moka/sync/cache.rs
1use super::{
2 base_cache::{BaseCache, HouseKeeperArc},
3 value_initializer::{InitResult, ValueInitializer},
4 CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector,
5};
6use crate::{
7 common::{
8 concurrent::{
9 constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp,
10 },
11 iter::ScanningGet,
12 time::{Clock, Instant},
13 HousekeeperConfig,
14 },
15 notification::EvictionListener,
16 ops::compute::{self, CompResult},
17 policy::{EvictionPolicy, ExpirationPolicy},
18 sync::{Iter, PredicateId},
19 Entry, Policy, PredicateError,
20};
21
22use crossbeam_channel::{Sender, TrySendError};
23use equivalent::Equivalent;
24use std::{
25 collections::hash_map::RandomState,
26 fmt,
27 hash::{BuildHasher, Hash},
28 sync::Arc,
29 time::Duration,
30};
31
32/// A thread-safe concurrent synchronous in-memory cache.
33///
34/// `Cache` supports full concurrency of retrievals and a high expected concurrency
35/// for updates.
36///
37/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
38/// storage. `Cache` performs a best-effort bounding of the map using an entry
39/// replacement algorithm to determine which entries to evict when the capacity is
40/// exceeded.
41///
42/// # Table of Contents
43///
44/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
45/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
46/// - [Sharing a cache across threads](#sharing-a-cache-across-threads)
47/// - [No lock is needed](#no-lock-is-needed)
48/// - [Hashing Algorithm](#hashing-algorithm)
49/// - [Example: Size-based Eviction](#example-size-based-eviction)
50/// - [Example: Time-based Expirations](#example-time-based-expirations)
51/// - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
52/// - [Per-entry expiration policy](#per-entry-expiration-policy)
53/// - [Example: Eviction Listener](#example-eviction-listener)
54/// - [You should avoid eviction listener to
55/// panic](#you-should-avoid-eviction-listener-to-panic)
56///
57/// # Example: `insert`, `get` and `invalidate`
58///
59/// Cache entries are manually added using [`insert`](#method.insert) or
60/// [`get_with`](#method.get_with) methods, and are stored in the cache until either
61/// evicted or manually invalidated.
62///
63/// Here's an example of reading and updating a cache by using multiple threads:
64///
65/// ```rust
66/// use moka::sync::Cache;
67///
68/// use std::thread;
69///
70/// fn value(n: usize) -> String {
71/// format!("value {n}")
72/// }
73///
74/// const NUM_THREADS: usize = 16;
75/// const NUM_KEYS_PER_THREAD: usize = 64;
76///
77/// // Create a cache that can store up to 10,000 entries.
78/// let cache = Cache::new(10_000);
79///
80/// // Spawn threads and read and update the cache simultaneously.
81/// let threads: Vec<_> = (0..NUM_THREADS)
82/// .map(|i| {
83/// // To share the same cache across the threads, clone it.
84/// // This is a cheap operation.
85/// let my_cache = cache.clone();
86/// let start = i * NUM_KEYS_PER_THREAD;
87/// let end = (i + 1) * NUM_KEYS_PER_THREAD;
88///
89/// thread::spawn(move || {
90/// // Insert 64 entries. (NUM_KEYS_PER_THREAD = 64)
91/// for key in start..end {
92/// my_cache.insert(key, value(key));
93/// // get() returns Option<String>, a clone of the stored value.
94/// assert_eq!(my_cache.get(&key), Some(value(key)));
95/// }
96///
/// // Invalidate every 4th element of the inserted entries.
98/// for key in (start..end).step_by(4) {
99/// my_cache.invalidate(&key);
100/// }
101/// })
102/// })
103/// .collect();
104///
105/// // Wait for all threads to complete.
106/// threads.into_iter().for_each(|t| t.join().expect("Failed"));
107///
108/// // Verify the result.
109/// for key in 0..(NUM_THREADS * NUM_KEYS_PER_THREAD) {
110/// if key % 4 == 0 {
111/// assert_eq!(cache.get(&key), None);
112/// } else {
113/// assert_eq!(cache.get(&key), Some(value(key)));
114/// }
115/// }
116/// ```
117///
118/// If you want to atomically initialize and insert a value when the key is not
119/// present, you might want to check other insertion methods
120/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
121///
122/// # Avoiding to clone the value at `get`
123///
124/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
125/// time `get` is called for an existing key, it creates a clone of the stored value
126/// `V` and returns it. This is because the `Cache` allows concurrent updates from
127/// threads so a value stored in the cache can be dropped or replaced at any time by
128/// any other thread. `get` cannot return a reference `&V` as it is impossible to
129/// guarantee the value outlives the reference.
130///
131/// If you want to store values that will be expensive to clone, wrap them by
132/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
133/// thread-safe reference-counted pointer and its `clone()` method is cheap.
134///
135/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
136///
137/// # Sharing a cache across threads
138///
/// To share a cache across threads, do one of the following:
140///
141/// - Create a clone of the cache by calling its `clone` method and pass it to other
142/// thread.
/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from the
///   [once_cell][once-cell-crate] crate, and set it to a `static` variable.
145///
146/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
147/// reference-counted pointers to the internal data structures.
148///
149/// ## No lock is needed
150///
151/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
152/// by the `Cache` are considered thread-safe, and can be safely called by multiple
153/// threads at the same time. No lock is needed.
154///
155/// [once-cell-crate]: https://crates.io/crates/once_cell
156///
157/// # Hashing Algorithm
158///
159/// By default, `Cache` uses a hashing algorithm selected to provide resistance
160/// against HashDoS attacks. It will be the same one used by
161/// `std::collections::HashMap`, which is currently SipHash 1-3.
162///
163/// While SipHash's performance is very competitive for medium sized keys, other
164/// hashing algorithms will outperform it for small keys such as integers as well as
165/// large keys such as long strings. However those algorithms will typically not
166/// protect against attacks such as HashDoS.
167///
168/// The hashing algorithm can be replaced on a per-`Cache` basis using the
169/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
170/// Many alternative algorithms are available on crates.io, such as the
171/// [AHash][ahash-crate] crate.
172///
173/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
174/// [ahash-crate]: https://crates.io/crates/ahash
175///
176/// # Example: Size-based Eviction
177///
178/// ```rust
179/// use moka::sync::Cache;
180///
181/// // Evict based on the number of entries in the cache.
182/// let cache = Cache::builder()
183/// // Up to 10,000 entries.
184/// .max_capacity(10_000)
185/// // Create the cache.
186/// .build();
187/// cache.insert(1, "one".to_string());
188///
189/// // Evict based on the byte length of strings in the cache.
190/// let cache = Cache::builder()
191/// // A weigher closure takes &K and &V and returns a u32
192/// // representing the relative size of the entry.
193/// .weigher(|_key, value: &String| -> u32 {
194/// value.len().try_into().unwrap_or(u32::MAX)
195/// })
196/// // This cache will hold up to 32MiB of values.
197/// .max_capacity(32 * 1024 * 1024)
198/// .build();
199/// cache.insert(2, "two".to_string());
200/// ```
201///
202/// If your cache should not grow beyond a certain size, use the `max_capacity`
203/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
204/// will try to evict entries that have not been used recently or very often.
205///
206/// At the cache creation time, a weigher closure can be set by the `weigher` method
207/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
208/// returns a `u32` representing the relative size of the entry:
209///
/// - If the `weigher` is _not_ set, the cache will treat each entry as having the
///   same size of `1`. This means the cache will be bounded by the number of entries.
212/// - If the `weigher` is set, the cache will call the weigher to calculate the
213/// weighted size (relative size) on an entry. This means the cache will be bounded
214/// by the total weighted size of entries.
215///
216/// Note that weighted sizes are not used when making eviction selections.
217///
218/// [builder-struct]: ./struct.CacheBuilder.html
219///
220/// # Example: Time-based Expirations
221///
222/// ## Cache-level TTL and TTI policies
223///
224/// `Cache` supports the following cache-level expiration policies:
225///
/// - **Time to live (TTL)**: A cached entry will expire once the specified
///   duration has passed since `insert`.
/// - **Time to idle (TTI)**: A cached entry will expire once the specified
///   duration has passed since the last `get` or `insert`.
230///
/// These are cache-level expiration policies; all entries in the cache will have
/// the same TTL and/or TTI durations. If you want to set different expiration
/// durations for different entries, see the next section.
234///
235/// ```rust
236/// use moka::sync::Cache;
237/// use std::time::Duration;
238///
239/// let cache = Cache::builder()
240/// // Time to live (TTL): 30 minutes
241/// .time_to_live(Duration::from_secs(30 * 60))
242/// // Time to idle (TTI): 5 minutes
243/// .time_to_idle(Duration::from_secs( 5 * 60))
244/// // Create the cache.
245/// .build();
246///
247/// // This entry will expire after 5 minutes (TTI) if there is no get().
248/// cache.insert(0, "zero");
249///
250/// // This get() will extend the entry life for another 5 minutes.
251/// cache.get(&0);
252///
253/// // Even though we keep calling get(), the entry will expire
254/// // after 30 minutes (TTL) from the insert().
255/// ```
256///
257/// ## Per-entry expiration policy
258///
259/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
260///
261/// `Expiry` trait provides three callback methods:
262/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
263/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
264/// updated, one of these methods is called. These methods return an
265/// `Option<Duration>`, which is used as the expiration duration of the entry.
266///
267/// `Expiry` trait provides the default implementations of these methods, so you will
268/// implement only the methods you want to customize.
269///
270/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
271/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
272/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
273///
274/// ```rust
275/// use moka::{sync::Cache, Expiry};
276/// use std::time::{Duration, Instant};
277///
278/// // In this example, we will create a `sync::Cache` with `u32` as the key, and
279/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
280/// // expiration of the value, and `String` is the application data of the value.
281///
282/// /// An enum to represent the expiration of a value.
283/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
284/// pub enum Expiration {
285/// /// The value never expires.
286/// Never,
287/// /// The value expires after a short time. (5 seconds in this example)
288/// AfterShortTime,
289/// /// The value expires after a long time. (15 seconds in this example)
290/// AfterLongTime,
291/// }
292///
293/// impl Expiration {
294/// /// Returns the duration of this expiration.
295/// pub fn as_duration(&self) -> Option<Duration> {
296/// match self {
297/// Expiration::Never => None,
298/// Expiration::AfterShortTime => Some(Duration::from_secs(5)),
299/// Expiration::AfterLongTime => Some(Duration::from_secs(15)),
300/// }
301/// }
302/// }
303///
304/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
305/// /// default implementations of three callback methods `expire_after_create`,
306/// /// `expire_after_read`, and `expire_after_update`.
307/// ///
308/// /// In this example, we only override the `expire_after_create` method.
309/// pub struct MyExpiry;
310///
311/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
312/// /// Returns the duration of the expiration of the value that was just
313/// /// created.
314/// fn expire_after_create(
315/// &self,
316/// _key: &u32,
317/// value: &(Expiration, String),
318/// _current_time: Instant,
319/// ) -> Option<Duration> {
320/// let duration = value.0.as_duration();
321/// println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
322/// duration
323/// }
324/// }
325///
326/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
327/// // eviction listener.
328/// let expiry = MyExpiry;
329///
330/// let eviction_listener = |key, _value, cause| {
331/// println!("Evicted key {key}. Cause: {cause:?}");
332/// };
333///
334/// let cache = Cache::builder()
335/// .max_capacity(100)
336/// .expire_after(expiry)
337/// .eviction_listener(eviction_listener)
338/// .build();
339///
340/// // Insert some entries into the cache with different expirations.
341/// cache.get_with(0, || (Expiration::AfterShortTime, "a".to_string()));
342/// cache.get_with(1, || (Expiration::AfterLongTime, "b".to_string()));
343/// cache.get_with(2, || (Expiration::Never, "c".to_string()));
344///
345/// // Verify that all the inserted entries exist.
346/// assert!(cache.contains_key(&0));
347/// assert!(cache.contains_key(&1));
348/// assert!(cache.contains_key(&2));
349///
350/// // Sleep for 6 seconds. Key 0 should expire.
351/// println!("\nSleeping for 6 seconds...\n");
352/// std::thread::sleep(Duration::from_secs(6));
353/// println!("Entry count: {}", cache.entry_count());
354///
355/// // Verify that key 0 has been evicted.
356/// assert!(!cache.contains_key(&0));
357/// assert!(cache.contains_key(&1));
358/// assert!(cache.contains_key(&2));
359///
360/// // Sleep for 10 more seconds. Key 1 should expire.
361/// println!("\nSleeping for 10 seconds...\n");
362/// std::thread::sleep(Duration::from_secs(10));
363/// println!("Entry count: {}", cache.entry_count());
364///
365/// // Verify that key 1 has been evicted.
366/// assert!(!cache.contains_key(&1));
367/// assert!(cache.contains_key(&2));
368///
369/// // Manually invalidate key 2.
370/// cache.invalidate(&2);
371/// assert!(!cache.contains_key(&2));
372///
373/// println!("\nSleeping for a second...\n");
374/// std::thread::sleep(Duration::from_secs(1));
375/// println!("Entry count: {}", cache.entry_count());
376///
377/// println!("\nDone!");
378/// ```
379///
380/// # Example: Eviction Listener
381///
382/// A `Cache` can be configured with an eviction listener, a closure that is called
383/// every time there is a cache eviction. The listener takes three parameters: the
384/// key and value of the evicted entry, and the
385/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
386/// entry was evicted.
387///
388/// An eviction listener can be used to keep other data structures in sync with the
389/// cache, for example.
390///
391/// The following example demonstrates how to use an eviction listener with
392/// time-to-live expiration to manage the lifecycle of temporary files on a
393/// filesystem. The cache stores the paths of the files, and when one of them has
394/// expired, the eviction listener will be called with the path, so it can remove the
395/// file from the filesystem.
396///
397/// ```rust
398/// // Cargo.toml
399/// //
400/// // [dependencies]
401/// // anyhow = "1.0"
402/// // uuid = { version = "1.1", features = ["v4"] }
403///
404/// use moka::{sync::Cache, notification};
405///
406/// use anyhow::{anyhow, Context};
407/// use std::{
408/// fs, io,
409/// path::{Path, PathBuf},
410/// sync::{Arc, RwLock},
411/// time::Duration,
412/// };
413/// use uuid::Uuid;
414///
415/// /// The DataFileManager writes, reads and removes data files.
416/// struct DataFileManager {
417/// base_dir: PathBuf,
418/// file_count: usize,
419/// }
420///
421/// impl DataFileManager {
422/// fn new(base_dir: PathBuf) -> Self {
423/// Self {
424/// base_dir,
425/// file_count: 0,
426/// }
427/// }
428///
429/// fn write_data_file(
430/// &mut self,
431/// key: impl AsRef<str>,
432/// contents: String
433/// ) -> io::Result<PathBuf> {
434/// // Use the key as a part of the filename.
435/// let mut path = self.base_dir.to_path_buf();
436/// path.push(key.as_ref());
437///
438/// assert!(!path.exists(), "Path already exists: {path:?}");
439///
440/// // create the file at the path and write the contents to the file.
441/// fs::write(&path, contents)?;
442/// self.file_count += 1;
443/// println!("Created a data file at {path:?} (file count: {})", self.file_count);
444/// Ok(path)
445/// }
446///
447/// fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
448/// // Reads the contents of the file at the path, and return the contents.
449/// fs::read_to_string(path)
450/// }
451///
452/// fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
453/// // Remove the file at the path.
454/// fs::remove_file(path.as_ref())?;
455/// self.file_count -= 1;
456/// println!(
457/// "Removed a data file at {:?} (file count: {})",
458/// path.as_ref(),
459/// self.file_count
460/// );
461///
462/// Ok(())
463/// }
464/// }
465///
466/// fn main() -> anyhow::Result<()> {
467/// // Create an instance of the DataFileManager and wrap it with
468/// // Arc<RwLock<_>> so it can be shared across threads.
469/// let mut base_dir = std::env::temp_dir();
470/// base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
471/// println!("base_dir: {base_dir:?}");
472/// std::fs::create_dir(&base_dir)?;
473///
474/// let file_mgr = DataFileManager::new(base_dir);
475/// let file_mgr = Arc::new(RwLock::new(file_mgr));
476///
477/// let file_mgr1 = Arc::clone(&file_mgr);
478///
479/// // Create an eviction listener closure.
480/// let eviction_listener = move |k, v: PathBuf, cause| {
481/// // Try to remove the data file at the path `v`.
482/// println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
483///
484/// // Acquire the write lock of the DataFileManager. We must handle
485/// // error cases here to prevent the listener from panicking.
486/// match file_mgr1.write() {
487/// Err(_e) => {
488/// eprintln!("The lock has been poisoned");
489/// }
490/// Ok(mut mgr) => {
491/// // Remove the data file using the DataFileManager.
492/// if let Err(_e) = mgr.remove_data_file(v.as_path()) {
493/// eprintln!("Failed to remove a data file at {v:?}");
494/// }
495/// }
496/// }
497/// };
498///
499/// // Create the cache. Set time to live for two seconds and set the
500/// // eviction listener.
501/// let cache = Cache::builder()
502/// .max_capacity(100)
503/// .time_to_live(Duration::from_secs(2))
504/// .eviction_listener(eviction_listener)
505/// .build();
506///
507/// // Insert an entry to the cache.
508/// // This will create and write a data file for the key "user1", store the
509/// // path of the file to the cache, and return it.
510/// println!("== try_get_with()");
511/// let key = "user1";
512/// let path = cache
513/// .try_get_with(key, || -> anyhow::Result<_> {
514/// let mut mgr = file_mgr
515/// .write()
516/// .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
517/// let path = mgr
518/// .write_data_file(key, "user data".into())
519/// .with_context(|| format!("Failed to create a data file"))?;
520/// Ok(path)
521/// })
522/// .map_err(|e| anyhow!("{e}"))?;
523///
524/// // Read the data file at the path and print the contents.
525/// println!("\n== read_data_file()");
526/// {
527/// let mgr = file_mgr
528/// .read()
529/// .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
530/// let contents = mgr
531/// .read_data_file(path.as_path())
532/// .with_context(|| format!("Failed to read data from {path:?}"))?;
533/// println!("contents: {contents}");
534/// }
535///
536/// // Sleep for five seconds. While sleeping, the cache entry for key "user1"
537/// // will be expired and evicted, so the eviction listener will be called to
538/// // remove the file.
539/// std::thread::sleep(Duration::from_secs(5));
540///
541/// cache.run_pending_tasks();
542///
543/// Ok(())
544/// }
545/// ```
546///
547/// ## You should avoid eviction listener to panic
548///
/// It is very important that an eviction listener closure never panics.
550/// Otherwise, the cache will stop calling the listener after a panic. This is an
551/// intended behavior because the cache cannot know whether it is memory safe or not
552/// to call the panicked listener again.
553///
554/// When a listener panics, the cache will swallow the panic and disable the
555/// listener. If you want to know when a listener panics and the reason of the panic,
556/// you can enable an optional `logging` feature of Moka and check error-level logs.
557///
/// To enable the `logging` feature, do the following:
559///
560/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
561/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
562/// `info`, ...):
563/// - If you are using the `env_logger` crate, you can achieve this by setting
564/// `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one cache, you may want to set a distinct name for each
///    cache by using cache builder's [`name`][builder-name-method] method. The name
///    will appear in the log.
568///
569/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
570///
571pub struct Cache<K, V, S = RandomState> {
572 pub(crate) base: BaseCache<K, V, S>,
573 value_initializer: Arc<ValueInitializer<K, V, S>>,
574}
575
576unsafe impl<K, V, S> Send for Cache<K, V, S>
577where
578 K: Send + Sync,
579 V: Send + Sync,
580 S: Send,
581{
582}
583
584unsafe impl<K, V, S> Sync for Cache<K, V, S>
585where
586 K: Send + Sync,
587 V: Send + Sync,
588 S: Sync,
589{
590}
591
592// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
593impl<K, V, S> Clone for Cache<K, V, S> {
594 /// Makes a clone of this shared cache.
595 ///
596 /// This operation is cheap as it only creates thread-safe reference counted
597 /// pointers to the shared internal data structures.
598 fn clone(&self) -> Self {
599 Self {
600 base: self.base.clone(),
601 value_initializer: Arc::clone(&self.value_initializer),
602 }
603 }
604}
605
606impl<K, V, S> fmt::Debug for Cache<K, V, S>
607where
608 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
609 V: fmt::Debug + Clone + Send + Sync + 'static,
610 // TODO: Remove these bounds from S.
611 S: BuildHasher + Clone + Send + Sync + 'static,
612{
613 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614 let mut d_map = f.debug_map();
615
616 for (k, v) in self {
617 d_map.entry(&k, &v);
618 }
619
620 d_map.finish()
621 }
622}
623
624impl<K, V, S> Cache<K, V, S> {
625 /// Returns cache’s name.
626 pub fn name(&self) -> Option<&str> {
627 self.base.name()
628 }
629
630 /// Returns a read-only cache policy of this cache.
631 ///
632 /// At this time, cache policy cannot be modified after cache creation.
633 /// A future version may support to modify it.
634 pub fn policy(&self) -> Policy {
635 self.base.policy()
636 }
637
638 /// Returns an approximate number of entries in this cache.
639 ///
640 /// The value returned is _an estimate_; the actual count may differ if there are
641 /// concurrent insertions or removals, or if some entries are pending removal due
642 /// to expiration. This inaccuracy can be mitigated by performing a
643 /// `run_pending_tasks` first.
644 ///
645 /// # Example
646 ///
647 /// ```rust
648 /// use moka::sync::Cache;
649 ///
650 /// let cache = Cache::new(10);
651 /// cache.insert('n', "Netherland Dwarf");
652 /// cache.insert('l', "Lop Eared");
653 /// cache.insert('d', "Dutch");
654 ///
655 /// // Ensure an entry exists.
656 /// assert!(cache.contains_key(&'n'));
657 ///
658 /// // However, followings may print stale number zeros instead of threes.
659 /// println!("{}", cache.entry_count()); // -> 0
660 /// println!("{}", cache.weighted_size()); // -> 0
661 ///
662 /// // To mitigate the inaccuracy, Call `run_pending_tasks` method to run
663 /// // pending internal tasks.
664 /// cache.run_pending_tasks();
665 ///
666 /// // Followings will print the actual numbers.
667 /// println!("{}", cache.entry_count()); // -> 3
668 /// println!("{}", cache.weighted_size()); // -> 3
669 /// ```
670 ///
671 pub fn entry_count(&self) -> u64 {
672 self.base.entry_count()
673 }
674
675 /// Returns an approximate total weighted size of entries in this cache.
676 ///
677 /// The value returned is _an estimate_; the actual size may differ if there are
678 /// concurrent insertions or removals, or if some entries are pending removal due
679 /// to expiration. This inaccuracy can be mitigated by performing a
680 /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for a
681 /// sample code.
682 pub fn weighted_size(&self) -> u64 {
683 self.base.weighted_size()
684 }
685}
686
687impl<K, V> Cache<K, V, RandomState>
688where
689 K: Hash + Eq + Send + Sync + 'static,
690 V: Clone + Send + Sync + 'static,
691{
692 /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
693 ///
694 /// To adjust various configuration knobs such as `initial_capacity` or
695 /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
696 ///
697 /// [builder-struct]: ./struct.CacheBuilder.html
698 pub fn new(max_capacity: u64) -> Self {
699 let build_hasher = RandomState::default();
700 Self::with_everything(
701 None,
702 Some(max_capacity),
703 None,
704 build_hasher,
705 None,
706 EvictionPolicy::default(),
707 None,
708 ExpirationPolicy::default(),
709 HousekeeperConfig::default(),
710 false,
711 Clock::default(),
712 )
713 }
714
715 /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` or
716 /// `SegmentedCache` with various configuration knobs.
717 ///
718 /// [builder-struct]: ./struct.CacheBuilder.html
719 pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
720 CacheBuilder::default()
721 }
722}
723
724impl<K, V, S> Cache<K, V, S>
725where
726 K: Hash + Eq + Send + Sync + 'static,
727 V: Clone + Send + Sync + 'static,
728 S: BuildHasher + Clone + Send + Sync + 'static,
729{
730 // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
731 #[allow(clippy::too_many_arguments)]
732 pub(crate) fn with_everything(
733 name: Option<String>,
734 max_capacity: Option<u64>,
735 initial_capacity: Option<usize>,
736 build_hasher: S,
737 weigher: Option<Weigher<K, V>>,
738 eviction_policy: EvictionPolicy,
739 eviction_listener: Option<EvictionListener<K, V>>,
740 expiration_policy: ExpirationPolicy<K, V>,
741 housekeeper_config: HousekeeperConfig,
742 invalidator_enabled: bool,
743 clock: Clock,
744 ) -> Self {
745 Self {
746 base: BaseCache::new(
747 name,
748 max_capacity,
749 initial_capacity,
750 build_hasher.clone(),
751 weigher,
752 eviction_policy,
753 eviction_listener,
754 expiration_policy,
755 housekeeper_config,
756 invalidator_enabled,
757 clock,
758 ),
759 value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
760 }
761 }
762
763 /// Returns `true` if the cache contains a value for the key.
764 ///
765 /// Unlike the `get` method, this method is not considered a cache read operation,
766 /// so it does not update the historic popularity estimator or reset the idle
767 /// timer for the key.
768 ///
769 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
770 /// on the borrowed form _must_ match those for the key type.
771 pub fn contains_key<Q>(&self, key: &Q) -> bool
772 where
773 Q: Equivalent<K> + Hash + ?Sized,
774 {
775 self.base.contains_key_with_hash(key, self.base.hash(key))
776 }
777
778 pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
779 where
780 Q: Equivalent<K> + Hash + ?Sized,
781 {
782 self.base.contains_key_with_hash(key, hash)
783 }
784
785 /// Returns a _clone_ of the value corresponding to the key.
786 ///
787 /// If you want to store values that will be expensive to clone, wrap them by
788 /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
789 /// thread-safe reference-counted pointer and its `clone()` method is cheap.
790 ///
791 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
792 /// on the borrowed form _must_ match those for the key type.
793 ///
794 /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
795 pub fn get<Q>(&self, key: &Q) -> Option<V>
796 where
797 Q: Equivalent<K> + Hash + ?Sized,
798 {
799 self.base
800 .get_with_hash(key, self.base.hash(key), false)
801 .map(Entry::into_value)
802 }
803
804 pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
805 where
806 Q: Equivalent<K> + Hash + ?Sized,
807 {
808 self.base.get_with_hash(key, hash, need_key)
809 }
810
811 /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
812 /// select or insert an entry.
813 ///
814 /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
815 ///
816 /// # Example
817 ///
818 /// ```rust
819 /// use moka::sync::Cache;
820 ///
821 /// let cache: Cache<String, u32> = Cache::new(100);
822 /// let key = "key1".to_string();
823 ///
824 /// let entry = cache.entry(key.clone()).or_insert(3);
825 /// assert!(entry.is_fresh());
826 /// assert_eq!(entry.key(), &key);
827 /// assert_eq!(entry.into_value(), 3);
828 ///
829 /// let entry = cache.entry(key).or_insert(6);
830 /// // Not fresh because the value was already in the cache.
831 /// assert!(!entry.is_fresh());
832 /// assert_eq!(entry.into_value(), 3);
833 /// ```
834 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
835 where
836 K: Hash + Eq,
837 {
838 let hash = self.base.hash(&key);
839 OwnedKeyEntrySelector::new(key, hash, self)
840 }
841
842 /// Takes a reference `&Q` of a key and returns an [`RefKeyEntrySelector`] that
843 /// can be used to select or insert an entry.
844 ///
845 /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
846 ///
847 /// # Example
848 ///
849 /// ```rust
850 /// use moka::sync::Cache;
851 ///
852 /// let cache: Cache<String, u32> = Cache::new(100);
853 /// let key = "key1".to_string();
854 ///
855 /// let entry = cache.entry_by_ref(&key).or_insert(3);
856 /// assert!(entry.is_fresh());
857 /// assert_eq!(entry.key(), &key);
858 /// assert_eq!(entry.into_value(), 3);
859 ///
860 /// let entry = cache.entry_by_ref(&key).or_insert(6);
861 /// // Not fresh because the value was already in the cache.
862 /// assert!(!entry.is_fresh());
863 /// assert_eq!(entry.into_value(), 3);
864 /// ```
865 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
866 where
867 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
868 {
869 let hash = self.base.hash(key);
870 RefKeyEntrySelector::new(key, hash, self)
871 }
872
873 /// Returns a _clone_ of the value corresponding to the key. If the value does
874 /// not exist, evaluates the `init` closure and inserts the output.
875 ///
876 /// # Concurrent calls on the same key
877 ///
878 /// This method guarantees that concurrent calls on the same not-existing key are
879 /// coalesced into one evaluation of the `init` closure. Only one of the calls
880 /// evaluates its closure, and other calls wait for that closure to complete.
881 ///
882 /// The following code snippet demonstrates this behavior:
883 ///
884 /// ```rust
885 /// use moka::sync::Cache;
886 /// use std::{sync::Arc, thread};
887 ///
888 /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
889 /// let cache = Cache::new(100);
890 ///
891 /// // Spawn four threads.
892 /// let threads: Vec<_> = (0..4_u8)
893 /// .map(|task_id| {
894 /// let my_cache = cache.clone();
895 /// thread::spawn(move || {
896 /// println!("Thread {task_id} started.");
897 ///
898 /// // Try to insert and get the value for key1. Although all four
899 /// // threads will call `get_with` at the same time, the `init` closure
900 /// // must be evaluated only once.
901 /// let value = my_cache.get_with("key1", || {
902 /// println!("Thread {task_id} inserting a value.");
903 /// Arc::new(vec![0u8; TEN_MIB])
904 /// });
905 ///
906 /// // Ensure the value exists now.
907 /// assert_eq!(value.len(), TEN_MIB);
908 /// assert!(my_cache.get(&"key1").is_some());
909 ///
910 /// println!("Thread {task_id} got the value. (len: {})", value.len());
911 /// })
912 /// })
913 /// .collect();
914 ///
915 /// // Wait all threads to complete.
916 /// threads
917 /// .into_iter()
918 /// .for_each(|t| t.join().expect("Thread failed"));
919 /// ```
920 ///
921 /// **Result**
922 ///
923 /// - The `init` closure was called exactly once by thread 1.
924 /// - Other threads were blocked until thread 1 inserted the value.
925 ///
926 /// ```console
927 /// Thread 1 started.
928 /// Thread 0 started.
929 /// Thread 3 started.
930 /// Thread 2 started.
931 /// Thread 1 inserting a value.
932 /// Thread 2 got the value. (len: 10485760)
933 /// Thread 1 got the value. (len: 10485760)
934 /// Thread 0 got the value. (len: 10485760)
935 /// Thread 3 got the value. (len: 10485760)
936 /// ```
937 ///
938 /// # Panics
939 ///
/// This method panics when the `init` closure has panicked. When it happens,
/// only the caller whose `init` closure panicked will get the panic (e.g. only
/// thread 1 in the above sample). If there are other calls in progress (e.g.
/// threads 0, 2 and 3 above), this method will restart and resolve one of the
/// remaining `init` closures.
945 ///
946 pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
947 let hash = self.base.hash(&key);
948 let key = Arc::new(key);
949 let replace_if = None as Option<fn(&V) -> bool>;
950 self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
951 .into_value()
952 }
953
954 /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
955 /// key, you can pass a reference to the key. If the key does not exist in the
956 /// cache, the key will be cloned to create new entry in the cache.
957 pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
958 where
959 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
960 {
961 let hash = self.base.hash(key);
962 let replace_if = None as Option<fn(&V) -> bool>;
963
964 self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
965 .into_value()
966 }
967
968 /// TODO: Remove this in v0.13.0.
969 /// Deprecated, replaced with
970 /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
971 #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
972 pub fn get_with_if(
973 &self,
974 key: K,
975 init: impl FnOnce() -> V,
976 replace_if: impl FnMut(&V) -> bool,
977 ) -> V {
978 let hash = self.base.hash(&key);
979 let key = Arc::new(key);
980 self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
981 .into_value()
982 }
983
984 pub(crate) fn get_or_insert_with_hash_and_fun(
985 &self,
986 key: Arc<K>,
987 hash: u64,
988 init: impl FnOnce() -> V,
989 mut replace_if: Option<impl FnMut(&V) -> bool>,
990 need_key: bool,
991 ) -> Entry<K, V> {
992 self.base
993 .get_with_hash_and_ignore_if(&*key, hash, replace_if.as_mut(), need_key)
994 .unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key))
995 }
996
997 // Need to create new function instead of using the existing
998 // `get_or_insert_with_hash_and_fun`. The reason is `by_ref` function will
999 // require key reference to have `ToOwned` trait. If we modify the existing
1000 // `get_or_insert_with_hash_and_fun` function, it will require all the existing
1001 // apis that depends on it to make the `K` to have `ToOwned` trait.
1002 pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1003 &self,
1004 key: &Q,
1005 hash: u64,
1006 init: impl FnOnce() -> V,
1007 mut replace_if: Option<impl FnMut(&V) -> bool>,
1008 need_key: bool,
1009 ) -> Entry<K, V>
1010 where
1011 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1012 {
1013 self.base
1014 .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key)
1015 .unwrap_or_else(|| {
1016 let key = Arc::new(key.to_owned());
1017 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1018 })
1019 }
1020
1021 pub(crate) fn insert_with_hash_and_fun(
1022 &self,
1023 key: Arc<K>,
1024 hash: u64,
1025 init: impl FnOnce() -> V,
1026 mut replace_if: Option<impl FnMut(&V) -> bool>,
1027 need_key: bool,
1028 ) -> Entry<K, V> {
1029 let get = || {
1030 self.base
1031 .get_with_hash_without_recording(&*key, hash, replace_if.as_mut())
1032 };
1033 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1034
1035 let k = if need_key {
1036 Some(Arc::clone(&key))
1037 } else {
1038 None
1039 };
1040
1041 let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1042 let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1043
1044 match self
1045 .value_initializer
1046 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1047 {
1048 InitResult::Initialized(v) => {
1049 crossbeam_epoch::pin().flush();
1050 Entry::new(k, v, true, false)
1051 }
1052 InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1053 InitResult::InitErr(_) => unreachable!(),
1054 }
1055 }
1056
1057 pub(crate) fn get_or_insert_with_hash(
1058 &self,
1059 key: Arc<K>,
1060 hash: u64,
1061 init: impl FnOnce() -> V,
1062 ) -> Entry<K, V> {
1063 match self.base.get_with_hash(&*key, hash, true) {
1064 Some(entry) => entry,
1065 None => {
1066 let value = init();
1067 self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1068 Entry::new(Some(key), value, true, false)
1069 }
1070 }
1071 }
1072
1073 pub(crate) fn get_or_insert_with_hash_by_ref<Q>(
1074 &self,
1075 key: &Q,
1076 hash: u64,
1077 init: impl FnOnce() -> V,
1078 ) -> Entry<K, V>
1079 where
1080 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1081 {
1082 match self.base.get_with_hash(key, hash, true) {
1083 Some(entry) => entry,
1084 None => {
1085 let key = Arc::new(key.to_owned());
1086 let value = init();
1087 self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1088 Entry::new(Some(key), value, true, false)
1089 }
1090 }
1091 }
1092
1093 /// Returns a _clone_ of the value corresponding to the key. If the value does
1094 /// not exist, evaluates the `init` closure, and inserts the value if
1095 /// `Some(value)` was returned. If `None` was returned from the closure, this
1096 /// method does not insert a value and returns `None`.
1097 ///
1098 /// # Concurrent calls on the same key
1099 ///
1100 /// This method guarantees that concurrent calls on the same not-existing key are
1101 /// coalesced into one evaluation of the `init` closure. Only one of the calls
1102 /// evaluates its closure, and other calls wait for that closure to complete.
1103 ///
1104 /// The following code snippet demonstrates this behavior:
1105 ///
1106 /// ```rust
1107 /// use moka::sync::Cache;
1108 /// use std::{path::Path, thread};
1109 ///
1110 /// /// This function tries to get the file size in bytes.
1111 /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Option<u64> {
1112 /// println!("get_file_size() called by thread {thread_id}.");
1113 /// std::fs::metadata(path).ok().map(|m| m.len())
1114 /// }
1115 ///
1116 /// let cache = Cache::new(100);
1117 ///
1118 /// // Spawn four threads.
1119 /// let threads: Vec<_> = (0..4_u8)
1120 /// .map(|thread_id| {
1121 /// let my_cache = cache.clone();
1122 /// thread::spawn(move || {
1123 /// println!("Thread {thread_id} started.");
1124 ///
1125 /// // Try to insert and get the value for key1. Although all four
1126 /// // threads will call `optionally_get_with` at the same time,
1127 /// // get_file_size() must be called only once.
1128 /// let value = my_cache.optionally_get_with(
1129 /// "key1",
1130 /// || get_file_size(thread_id, "./Cargo.toml"),
1131 /// );
1132 ///
1133 /// // Ensure the value exists now.
1134 /// assert!(value.is_some());
1135 /// assert!(my_cache.get(&"key1").is_some());
1136 ///
1137 /// println!(
1138 /// "Thread {thread_id} got the value. (len: {})",
1139 /// value.unwrap()
1140 /// );
1141 /// })
1142 /// })
1143 /// .collect();
1144 ///
1145 /// // Wait all threads to complete.
1146 /// threads
1147 /// .into_iter()
1148 /// .for_each(|t| t.join().expect("Thread failed"));
1149 /// ```
1150 ///
1151 /// **Result**
1152 ///
1153 /// - `get_file_size()` was called exactly once by thread 0.
1154 /// - Other threads were blocked until thread 0 inserted the value.
1155 ///
1156 /// ```console
1157 /// Thread 0 started.
1158 /// Thread 1 started.
1159 /// Thread 2 started.
1160 /// get_file_size() called by thread 0.
1161 /// Thread 3 started.
1162 /// Thread 2 got the value. (len: 1466)
1163 /// Thread 0 got the value. (len: 1466)
1164 /// Thread 1 got the value. (len: 1466)
1165 /// Thread 3 got the value. (len: 1466)
1166 /// ```
1167 ///
1168 /// # Panics
1169 ///
/// This method panics when the `init` closure has panicked. When it happens,
/// only the caller whose `init` closure panicked will get the panic (e.g. only
/// thread 0 in the above sample). If there are other calls in progress (e.g.
/// threads 1, 2 and 3 above), this method will restart and resolve one of the
/// remaining `init` closures.
1175 ///
1176 pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1177 where
1178 F: FnOnce() -> Option<V>,
1179 {
1180 let hash = self.base.hash(&key);
1181 let key = Arc::new(key);
1182
1183 self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1184 .map(Entry::into_value)
1185 }
1186
1187 /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1188 /// of passing an owned key, you can pass a reference to the key. If the key does
1189 /// not exist in the cache, the key will be cloned to create new entry in the
1190 /// cache.
1191 pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1192 where
1193 F: FnOnce() -> Option<V>,
1194 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1195 {
1196 let hash = self.base.hash(key);
1197 self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1198 .map(Entry::into_value)
1199 }
1200
1201 pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
1202 &self,
1203 key: Arc<K>,
1204 hash: u64,
1205 init: F,
1206 need_key: bool,
1207 ) -> Option<Entry<K, V>>
1208 where
1209 F: FnOnce() -> Option<V>,
1210 {
1211 let entry = self.get_with_hash(&*key, hash, need_key);
1212 if entry.is_some() {
1213 return entry;
1214 }
1215
1216 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1217 }
1218
1219 pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1220 &self,
1221 key: &Q,
1222 hash: u64,
1223 init: F,
1224 need_key: bool,
1225 ) -> Option<Entry<K, V>>
1226 where
1227 F: FnOnce() -> Option<V>,
1228 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1229 {
1230 let entry = self.get_with_hash(key, hash, need_key);
1231 if entry.is_some() {
1232 return entry;
1233 }
1234
1235 let key = Arc::new(key.to_owned());
1236 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1237 }
1238
1239 pub(super) fn optionally_insert_with_hash_and_fun<F>(
1240 &self,
1241 key: Arc<K>,
1242 hash: u64,
1243 init: F,
1244 need_key: bool,
1245 ) -> Option<Entry<K, V>>
1246 where
1247 F: FnOnce() -> Option<V>,
1248 {
1249 let get = || {
1250 let ignore_if = None as Option<&mut fn(&V) -> bool>;
1251 self.base
1252 .get_with_hash_without_recording(&*key, hash, ignore_if)
1253 };
1254 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1255
1256 let k = if need_key {
1257 Some(Arc::clone(&key))
1258 } else {
1259 None
1260 };
1261
1262 let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1263 let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1264
1265 match self
1266 .value_initializer
1267 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1268 {
1269 InitResult::Initialized(v) => {
1270 crossbeam_epoch::pin().flush();
1271 Some(Entry::new(k, v, true, false))
1272 }
1273 InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1274 InitResult::InitErr(_) => {
1275 crossbeam_epoch::pin().flush();
1276 None
1277 }
1278 }
1279 }
1280
1281 /// Returns a _clone_ of the value corresponding to the key. If the value does
1282 /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
1283 /// was returned. If `Err(_)` was returned from the closure, this method does not
1284 /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1285 ///
1286 /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1287 ///
1288 /// # Concurrent calls on the same key
1289 ///
1290 /// This method guarantees that concurrent calls on the same not-existing key are
1291 /// coalesced into one evaluation of the `init` closure (as long as these
1292 /// closures return the same error type). Only one of the calls evaluates its
1293 /// closure, and other calls wait for that closure to complete.
1294 ///
1295 /// The following code snippet demonstrates this behavior:
1296 ///
1297 /// ```rust
1298 /// use moka::sync::Cache;
1299 /// use std::{path::Path, thread};
1300 ///
1301 /// /// This function tries to get the file size in bytes.
1302 /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Result<u64, std::io::Error> {
1303 /// println!("get_file_size() called by thread {thread_id}.");
1304 /// Ok(std::fs::metadata(path)?.len())
1305 /// }
1306 ///
1307 /// let cache = Cache::new(100);
1308 ///
1309 /// // Spawn four threads.
1310 /// let threads: Vec<_> = (0..4_u8)
1311 /// .map(|thread_id| {
1312 /// let my_cache = cache.clone();
1313 /// thread::spawn(move || {
1314 /// println!("Thread {thread_id} started.");
1315 ///
1316 /// // Try to insert and get the value for key1. Although all four
1317 /// // threads will call `try_get_with` at the same time,
1318 /// // get_file_size() must be called only once.
1319 /// let value = my_cache.try_get_with(
1320 /// "key1",
1321 /// || get_file_size(thread_id, "./Cargo.toml"),
1322 /// );
1323 ///
1324 /// // Ensure the value exists now.
1325 /// assert!(value.is_ok());
1326 /// assert!(my_cache.get(&"key1").is_some());
1327 ///
1328 /// println!(
1329 /// "Thread {thread_id} got the value. (len: {})",
1330 /// value.unwrap()
1331 /// );
1332 /// })
1333 /// })
1334 /// .collect();
1335 ///
1336 /// // Wait all threads to complete.
1337 /// threads
1338 /// .into_iter()
1339 /// .for_each(|t| t.join().expect("Thread failed"));
1340 /// ```
1341 ///
1342 /// **Result**
1343 ///
1344 /// - `get_file_size()` was called exactly once by thread 1.
1345 /// - Other threads were blocked until thread 1 inserted the value.
1346 ///
1347 /// ```console
1348 /// Thread 1 started.
1349 /// Thread 2 started.
1350 /// get_file_size() called by thread 1.
1351 /// Thread 3 started.
1352 /// Thread 0 started.
1353 /// Thread 2 got the value. (len: 1466)
1354 /// Thread 0 got the value. (len: 1466)
1355 /// Thread 1 got the value. (len: 1466)
1356 /// Thread 3 got the value. (len: 1466)
1357 /// ```
1358 ///
1359 /// # Panics
1360 ///
/// This method panics when the `init` closure has panicked. When it happens,
/// only the caller whose `init` closure panicked will get the panic (e.g. only
/// thread 1 in the above sample). If there are other calls in progress (e.g.
/// threads 0, 2 and 3 above), this method will restart and resolve one of the
/// remaining `init` closures.
1366 ///
1367 pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1368 where
1369 F: FnOnce() -> Result<V, E>,
1370 E: Send + Sync + 'static,
1371 {
1372 let hash = self.base.hash(&key);
1373 let key = Arc::new(key);
1374 self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1375 .map(Entry::into_value)
1376 }
1377
1378 /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1379 /// owned key, you can pass a reference to the key. If the key does not exist in
1380 /// the cache, the key will be cloned to create new entry in the cache.
1381 pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1382 where
1383 F: FnOnce() -> Result<V, E>,
1384 E: Send + Sync + 'static,
1385 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1386 {
1387 let hash = self.base.hash(key);
1388 self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1389 .map(Entry::into_value)
1390 }
1391
1392 pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
1393 &self,
1394 key: Arc<K>,
1395 hash: u64,
1396 init: F,
1397 need_key: bool,
1398 ) -> Result<Entry<K, V>, Arc<E>>
1399 where
1400 F: FnOnce() -> Result<V, E>,
1401 E: Send + Sync + 'static,
1402 {
1403 if let Some(entry) = self.get_with_hash(&*key, hash, need_key) {
1404 return Ok(entry);
1405 }
1406
1407 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1408 }
1409
1410 pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
1411 &self,
1412 key: &Q,
1413 hash: u64,
1414 init: F,
1415 need_key: bool,
1416 ) -> Result<Entry<K, V>, Arc<E>>
1417 where
1418 F: FnOnce() -> Result<V, E>,
1419 E: Send + Sync + 'static,
1420 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1421 {
1422 if let Some(entry) = self.get_with_hash(key, hash, false) {
1423 return Ok(entry);
1424 }
1425
1426 let key = Arc::new(key.to_owned());
1427 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1428 }
1429
1430 pub(crate) fn try_insert_with_hash_and_fun<F, E>(
1431 &self,
1432 key: Arc<K>,
1433 hash: u64,
1434 init: F,
1435 need_key: bool,
1436 ) -> Result<Entry<K, V>, Arc<E>>
1437 where
1438 F: FnOnce() -> Result<V, E>,
1439 E: Send + Sync + 'static,
1440 {
1441 let get = || {
1442 let ignore_if = None as Option<&mut fn(&V) -> bool>;
1443 self.base
1444 .get_with_hash_without_recording(&*key, hash, ignore_if)
1445 };
1446 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1447
1448 let k = if need_key {
1449 Some(Arc::clone(&key))
1450 } else {
1451 None
1452 };
1453
1454 let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1455 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1456
1457 match self
1458 .value_initializer
1459 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1460 {
1461 InitResult::Initialized(v) => {
1462 crossbeam_epoch::pin().flush();
1463 Ok(Entry::new(k, v, true, false))
1464 }
1465 InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1466 InitResult::InitErr(e) => {
1467 crossbeam_epoch::pin().flush();
1468 Err(e)
1469 }
1470 }
1471 }
1472
1473 /// Inserts a key-value pair into the cache.
1474 ///
1475 /// If the cache has this key present, the value is updated.
1476 pub fn insert(&self, key: K, value: V) {
1477 let hash = self.base.hash(&key);
1478 let key = Arc::new(key);
1479 self.insert_with_hash(key, hash, value);
1480 }
1481
1482 pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1483 if self.base.is_map_disabled() {
1484 return;
1485 }
1486
1487 let (op, now) = self.base.do_insert_with_hash(key, hash, value);
1488 let hk = self.base.housekeeper.as_ref();
1489 Self::schedule_write_op(
1490 self.base.inner.as_ref(),
1491 &self.base.write_op_ch,
1492 op,
1493 now,
1494 hk,
1495 )
1496 .expect("Failed to insert");
1497 }
1498
1499 pub(crate) fn compute_with_hash_and_fun<F>(
1500 &self,
1501 key: Arc<K>,
1502 hash: u64,
1503 f: F,
1504 ) -> compute::CompResult<K, V>
1505 where
1506 F: FnOnce(Option<Entry<K, V>>) -> compute::Op<V>,
1507 {
1508 let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1509 match self
1510 .value_initializer
1511 .try_compute(key, hash, self, f, post_init, true)
1512 {
1513 Ok(result) => result,
1514 Err(_) => unreachable!(),
1515 }
1516 }
1517
1518 pub(crate) fn try_compute_with_hash_and_fun<F, E>(
1519 &self,
1520 key: Arc<K>,
1521 hash: u64,
1522 f: F,
1523 ) -> Result<compute::CompResult<K, V>, E>
1524 where
1525 F: FnOnce(Option<Entry<K, V>>) -> Result<compute::Op<V>, E>,
1526 E: Send + Sync + 'static,
1527 {
1528 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1529 self.value_initializer
1530 .try_compute(key, hash, self, f, post_init, true)
1531 }
1532
1533 pub(crate) fn upsert_with_hash_and_fun<F>(&self, key: Arc<K>, hash: u64, f: F) -> Entry<K, V>
1534 where
1535 F: FnOnce(Option<Entry<K, V>>) -> V,
1536 {
1537 let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1538 match self
1539 .value_initializer
1540 .try_compute(key, hash, self, f, post_init, false)
1541 {
1542 Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1543 _ => unreachable!(),
1544 }
1545 }
1546
1547 /// Discards any cached value for the key.
1548 ///
1549 /// If you need to get a the value that has been discarded, use the
1550 /// [`remove`](#method.remove) method instead.
1551 ///
1552 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1553 /// on the borrowed form _must_ match those for the key type.
1554 pub fn invalidate<Q>(&self, key: &Q)
1555 where
1556 Q: Equivalent<K> + Hash + ?Sized,
1557 {
1558 let hash = self.base.hash(key);
1559 self.invalidate_with_hash(key, hash, false);
1560 }
1561
1562 /// Discards any cached value for the key and returns a _clone_ of the value.
1563 ///
1564 /// If you do not need to get the value that has been discarded, use the
1565 /// [`invalidate`](#method.invalidate) method instead.
1566 ///
1567 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1568 /// on the borrowed form _must_ match those for the key type.
1569 pub fn remove<Q>(&self, key: &Q) -> Option<V>
1570 where
1571 Q: Equivalent<K> + Hash + ?Sized,
1572 {
1573 let hash = self.base.hash(key);
1574 self.invalidate_with_hash(key, hash, true)
1575 }
1576
1577 pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
1578 where
1579 Q: Equivalent<K> + Hash + ?Sized,
1580 {
1581 // Lock the key for removal if blocking removal notification is enabled.
1582 let mut kl = None;
1583 let mut klg = None;
1584 if self.base.is_removal_notifier_enabled() {
1585 // To lock the key, we have to get Arc<K> for key (&Q).
1586 //
1587 // TODO: Enhance this if possible. This is rather hack now because
1588 // it cannot prevent race conditions like this:
1589 //
1590 // 1. We miss the key because it does not exist. So we do not lock
1591 // the key.
1592 // 2. Somebody else (other thread) inserts the key.
1593 // 3. We remove the entry for the key, but without the key lock!
1594 //
1595 if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1596 kl = self.base.maybe_key_lock(&arc_key);
1597 klg = kl.as_ref().map(|kl| kl.lock());
1598 }
1599 }
1600
1601 match self.base.remove_entry(key, hash) {
1602 None => None,
1603 Some(kv) => {
1604 let now = self.base.current_time();
1605
1606 let info = kv.entry.entry_info();
1607 let entry_gen = info.incr_entry_gen();
1608
1609 if self.base.is_removal_notifier_enabled() {
1610 self.base.notify_invalidate(&kv.key, &kv.entry);
1611 }
1612 // Drop the locks before scheduling write op to avoid a potential
1613 // dead lock. (Scheduling write can do spin lock when the queue is
1614 // full, and queue will be drained by the housekeeping thread that
1615 // can lock the same key)
1616 std::mem::drop(klg);
1617 std::mem::drop(kl);
1618
1619 let maybe_v = if need_value {
1620 Some(kv.entry.value.clone())
1621 } else {
1622 None
1623 };
1624
1625 let op = WriteOp::Remove {
1626 kv_entry: kv,
1627 entry_gen,
1628 };
1629 let hk = self.base.housekeeper.as_ref();
1630 Self::schedule_write_op(
1631 self.base.inner.as_ref(),
1632 &self.base.write_op_ch,
1633 op,
1634 now,
1635 hk,
1636 )
1637 .expect("Failed to remove");
1638 crossbeam_epoch::pin().flush();
1639 maybe_v
1640 }
1641 }
1642 }
1643
1644 /// Discards all cached values.
1645 ///
1646 /// This method returns immediately by just setting the current time as the
1647 /// invalidation time. `get` and other retrieval methods are guaranteed not to
1648 /// return the entries inserted before or at the invalidation time.
1649 ///
1650 /// The actual removal of the invalidated entries is done as a maintenance task
1651 /// driven by a user thread. For more details, see
1652 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1653 /// level documentation.
1654 ///
1655 /// Like the `invalidate` method, this method does not clear the historic
1656 /// popularity estimator of keys so that it retains the client activities of
1657 /// trying to retrieve an item.
1658 pub fn invalidate_all(&self) {
1659 self.base.invalidate_all();
1660 }
1661
1662 /// Discards cached values that satisfy a predicate.
1663 ///
1664 /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1665 /// closure is called against each cached entry inserted before or at the time
1666 /// when this method was called. If the closure returns `true` that entry will be
1667 /// evicted from the cache.
1668 ///
1669 /// This method returns immediately by not actually removing the invalidated
1670 /// entries. Instead, it just sets the predicate to the cache with the time when
1671 /// this method was called. The actual removal of the invalidated entries is done
1672 /// as a maintenance task driven by a user thread. For more details, see
1673 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1674 /// level documentation.
1675 ///
1676 /// Also the `get` and other retrieval methods will apply the closure to a cached
1677 /// entry to determine if it should have been invalidated. Therefore, it is
1678 /// guaranteed that these methods must not return invalidated values.
1679 ///
1680 /// Note that you must call
1681 /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1682 /// at the cache creation time as the cache needs to maintain additional internal
1683 /// data structures to support this method. Otherwise, calling this method will
1684 /// fail with a
1685 /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1686 ///
1687 /// Like the `invalidate` method, this method does not clear the historic
1688 /// popularity estimator of keys so that it retains the client activities of
1689 /// trying to retrieve an item.
1690 ///
1691 /// [support-invalidation-closures]:
1692 /// ./struct.CacheBuilder.html#method.support_invalidation_closures
1693 /// [invalidation-disabled-error]:
1694 /// ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
1695 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1696 where
1697 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1698 {
1699 self.base.invalidate_entries_if(Arc::new(predicate))
1700 }
1701
1702 pub(crate) fn invalidate_entries_with_arc_fun<F>(
1703 &self,
1704 predicate: Arc<F>,
1705 ) -> Result<PredicateId, PredicateError>
1706 where
1707 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1708 {
1709 self.base.invalidate_entries_if(predicate)
1710 }
1711
1712 /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1713 /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1714 /// value.
1715 ///
1716 /// Iterators do not block concurrent reads and writes on the cache. An entry can
1717 /// be inserted to, invalidated or evicted from a cache while iterators are alive
1718 /// on the same cache.
1719 ///
1720 /// Unlike the `get` method, visiting entries via an iterator do not update the
1721 /// historic popularity estimator or reset idle timers for keys.
1722 ///
1723 /// # Guarantees
1724 ///
1725 /// In order to allow concurrent access to the cache, iterator's `next` method
1726 /// does _not_ guarantee the following:
1727 ///
1728 /// - It does not guarantee to return a key-value pair (an entry) if its key has
1729 /// been inserted to the cache _after_ the iterator was created.
1730 /// - Such an entry may or may not be returned depending on key's hash and
1731 /// timing.
1732 ///
/// and the `next` method guarantees the following:
1734 ///
1735 /// - It guarantees not to return the same entry more than once.
1736 /// - It guarantees not to return an entry if it has been removed from the cache
1737 /// after the iterator was created.
1738 /// - Note: An entry can be removed by following reasons:
1739 /// - Manually invalidated.
1740 /// - Expired (e.g. time-to-live).
1741 /// - Evicted as the cache capacity exceeded.
1742 ///
1743 /// # Examples
1744 ///
1745 /// ```rust
1746 /// use moka::sync::Cache;
1747 ///
1748 /// let cache = Cache::new(100);
1749 /// cache.insert("Julia", 14);
1750 ///
1751 /// let mut iter = cache.iter();
1752 /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1753 /// assert_eq!(*k, "Julia");
1754 /// assert_eq!(v, 14);
1755 ///
1756 /// assert!(iter.next().is_none());
1757 /// ```
1758 ///
1759 pub fn iter(&self) -> Iter<'_, K, V> {
1760 Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
1761 }
1762
1763 /// Performs any pending maintenance operations needed by the cache.
1764 pub fn run_pending_tasks(&self) {
1765 if let Some(hk) = &self.base.housekeeper {
1766 hk.run_pending_tasks(&*self.base.inner);
1767 }
1768 }
1769}
1770
1771impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
1772where
1773 K: Hash + Eq + Send + Sync + 'static,
1774 V: Clone + Send + Sync + 'static,
1775 S: BuildHasher + Clone + Send + Sync + 'static,
1776{
1777 type Item = (Arc<K>, V);
1778
1779 type IntoIter = Iter<'a, K, V>;
1780
1781 fn into_iter(self) -> Self::IntoIter {
1782 self.iter()
1783 }
1784}
1785
1786//
1787// Iterator support
1788//
1789impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
1790where
1791 K: Hash + Eq + Send + Sync + 'static,
1792 V: Clone + Send + Sync + 'static,
1793 S: BuildHasher + Clone + Send + Sync + 'static,
1794{
1795 fn num_cht_segments(&self) -> usize {
1796 self.base.num_cht_segments()
1797 }
1798
1799 fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
1800 self.base.scanning_get(key)
1801 }
1802
1803 fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
1804 self.base.keys(cht_segment)
1805 }
1806}
1807
1808//
1809// private methods
1810//
1811impl<K, V, S> Cache<K, V, S>
1812where
1813 K: Hash + Eq + Send + Sync + 'static,
1814 V: Clone + Send + Sync + 'static,
1815 S: BuildHasher + Clone + Send + Sync + 'static,
1816{
1817 // TODO: Like future::Cache, move this method to BaseCache.
1818 #[inline]
1819 fn schedule_write_op(
1820 inner: &impl InnerSync,
1821 ch: &Sender<WriteOp<K, V>>,
1822 op: WriteOp<K, V>,
1823 now: Instant,
1824 housekeeper: Option<&HouseKeeperArc>,
1825 ) -> Result<(), TrySendError<WriteOp<K, V>>> {
1826 let mut op = op;
1827
1828 // NOTES:
1829 // - This will block when the channel is full.
1830 // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`,
1831 // but we got a notable performance degradation.
1832 loop {
1833 BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
1834 match ch.try_send(op) {
1835 Ok(()) => break,
1836 Err(TrySendError::Full(op1)) => {
1837 op = op1;
1838 std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
1839 }
1840 Err(e @ TrySendError::Disconnected(_)) => return Err(e),
1841 }
1842 }
1843 Ok(())
1844 }
1845}
1846
1847// For unit tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
    /// Returns `true` when `entry_count()` reports zero entries.
    pub(crate) fn is_table_empty(&self) -> bool {
        self.entry_count() == 0
    }

    /// Returns `true` when the value initializer reports zero waiters.
    pub(crate) fn is_waiter_map_empty(&self) -> bool {
        self.value_initializer.waiter_count() == 0
    }
}
1858
// Test-only helpers that forward to the same-named methods on the underlying
// `BaseCache`.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the invalidation predicate count reported by the base cache.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.base.invalidation_predicate_count()
    }

    /// Reconfigures the base cache for testing.
    // NOTE(review): what exactly gets reconfigured is defined by
    // `BaseCache::reconfigure_for_testing` — see that method.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        self.base.reconfigure_for_testing();
    }

    /// Returns whether the base cache reports its key-locks map as empty.
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.base.key_locks_map_is_empty()
    }
}
1878
1879// To see the debug prints, run test as `cargo test -- --nocapture`
1880#[cfg(test)]
1881mod tests {
1882 use super::Cache;
1883 use crate::{
1884 common::{time::Clock, HousekeeperConfig},
1885 notification::RemovalCause,
1886 policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
1887 Expiry,
1888 };
1889
1890 use parking_lot::Mutex;
1891 use std::{
1892 convert::Infallible,
1893 sync::{
1894 atomic::{AtomicU8, Ordering},
1895 Arc,
1896 },
1897 time::{Duration, Instant as StdInstant},
1898 };
1899
1900 #[test]
1901 fn max_capacity_zero() {
1902 let mut cache = Cache::new(0);
1903 cache.reconfigure_for_testing();
1904
1905 // Make the cache exterior immutable.
1906 let cache = cache;
1907
1908 cache.insert(0, ());
1909
1910 assert!(!cache.contains_key(&0));
1911 assert!(cache.get(&0).is_none());
1912 cache.run_pending_tasks();
1913 assert!(!cache.contains_key(&0));
1914 assert!(cache.get(&0).is_none());
1915 assert_eq!(cache.entry_count(), 0)
1916 }
1917
    /// Single-threaded walk through `insert`, `get`, `contains_key`,
    /// `invalidate` and `remove` on a cache built with `max_capacity(3)` and no
    /// explicit eviction policy, checking both the visible cache contents and
    /// the notifications received by the eviction listener.
    #[test]
    fn basic_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // counts: a -> 1, b -> 1

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        // counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // counts: a -> 2, b -> 2, c -> 1

        // The cache is now full (3 entries), so each new key has to win
        // admission against an existing entry.

        // "d" should not be admitted because its frequency is too low.
        cache.insert("d", "david"); // count: d -> 0
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); // d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); // d -> 2

        // "d" should be admitted and "c" should be evicted
        // because d's frequency is higher than c's.
        cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Explicit removal via `invalidate`.
        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // `remove` returns the removed value, or `None` when the key is absent
        // ("b" was invalidated just above).
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2003
    /// Same scenario as a basic single-threaded walk-through, but on a cache
    /// built with `EvictionPolicy::lru()` and `max_capacity(3)`: new keys are
    /// always admitted and the least recently used entry is the one evicted.
    /// The `x -> y -> z` comments list the expected order from LRU to MRU.
    #[test]
    fn basic_lru_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_policy(EvictionPolicy::lru())
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // a -> b

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();
        // a -> b -> c

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // c -> a -> b

        // "d" should be admitted because the cache uses the LRU strategy.
        cache.insert("d", "david");
        // "c" is the LRU and should have been evicted.
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));
        cache.run_pending_tasks();
        // a -> b -> d

        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a -> d
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // `remove` returns `None` for the already-invalidated "b" and the
        // removed value for "d".
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("david"));
        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("e", "emily");
        cache.insert("f", "frank");
        // "a" should be evicted because it is the LRU.
        cache.insert("g", "gina");
        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
        cache.run_pending_tasks();
        // e -> f -> g
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"e"), Some("emily"));
        assert_eq!(cache.get(&"f"), Some("frank"));
        assert_eq!(cache.get(&"g"), Some("gina"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"e"));
        assert!(cache.contains_key(&"f"));
        assert!(cache.contains_key(&"g"));

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2098
    /// Exercises weight-based eviction: the cache is built with
    /// `max_capacity(31)` and a weigher that uses the `u32` in each value tuple
    /// as the entry's weight. Checks admission, eviction, weight changes on
    /// update, the final `entry_count`/`weighted_size`, and the notifications
    /// received by the eviction listener.
    #[test]
    fn size_aware_eviction() {
        // An entry's weight is the second element of its value tuple.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();
        // order (LRU -> MRU) and counts: a -> 1, b -> 1

        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        // order and counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // order and counts: c -> 1, a -> 2, b -> 2

        // Total weight is now 10 + 15 + 5 = 30 out of 31.

        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
        // "d" must have higher count than 3, which is the aggregated count
        // of "a" and "c".
        cache.insert("d", david); // count: d -> 0
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); // d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); // d -> 2

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); // d -> 3
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); // d -> 4

        // Finally "d" should be admitted by evicting "c" and "a".
        cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Verify the sizes.
        // Two entries remain: "a" (w: 10) and "b" (w: 15), 25 in total.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2221
2222 #[test]
2223 fn basic_multi_threads() {
2224 let num_threads = 4;
2225 let cache = Cache::new(100);
2226
2227 // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
2228 #[allow(clippy::needless_collect)]
2229 let handles = (0..num_threads)
2230 .map(|id| {
2231 let cache = cache.clone();
2232 std::thread::spawn(move || {
2233 cache.insert(10, format!("{id}-100"));
2234 cache.get(&10);
2235 cache.insert(20, format!("{id}-200"));
2236 cache.invalidate(&10);
2237 })
2238 })
2239 .collect::<Vec<_>>();
2240
2241 handles.into_iter().for_each(|h| h.join().expect("Failed"));
2242
2243 assert!(cache.get(&10).is_none());
2244 assert!(cache.get(&20).is_some());
2245 assert!(!cache.contains_key(&10));
2246 assert!(cache.contains_key(&20));
2247 }
2248
    /// Checks that `invalidate_all` removes every entry present at the time of
    /// the call, that an entry inserted afterwards survives, and that the
    /// eviction listener is notified with `RemovalCause::Explicit` for each
    /// removed entry.
    #[test]
    fn invalidate_all() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(cache.contains_key(&"c"));

        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
        // modified timestamp of the entries were updated when they were inserted.
        // https://github.com/moka-rs/moka/issues/155

        cache.invalidate_all();
        expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
        cache.run_pending_tasks();

        // "d" is inserted after `invalidate_all`, so it must not be affected.
        cache.insert("d", "david");
        cache.run_pending_tasks();

        assert!(cache.get(&"a").is_none());
        assert!(cache.get(&"b").is_none());
        assert!(cache.get(&"c").is_none());
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
    }
2303
2304 #[test]
2305 fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2306 use std::collections::HashSet;
2307
2308 // The following `Vec`s will hold actual and expected notifications.
2309 let actual = Arc::new(Mutex::new(Vec::new()));
2310 let mut expected = Vec::new();
2311
2312 // Create an eviction listener.
2313 let a1 = Arc::clone(&actual);
2314 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2315
2316 let (clock, mock) = Clock::mock();
2317
2318 // Create a cache with the eviction listener.
2319 let mut cache = Cache::builder()
2320 .max_capacity(100)
2321 .support_invalidation_closures()
2322 .eviction_listener(listener)
2323 .clock(clock)
2324 .build();
2325 cache.reconfigure_for_testing();
2326
2327 // Make the cache exterior immutable.
2328 let cache = cache;
2329
2330 cache.insert(0, "alice");
2331 cache.insert(1, "bob");
2332 cache.insert(2, "alex");
2333 cache.run_pending_tasks();
2334
2335 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2336 cache.run_pending_tasks();
2337
2338 assert_eq!(cache.get(&0), Some("alice"));
2339 assert_eq!(cache.get(&1), Some("bob"));
2340 assert_eq!(cache.get(&2), Some("alex"));
2341 assert!(cache.contains_key(&0));
2342 assert!(cache.contains_key(&1));
2343 assert!(cache.contains_key(&2));
2344
2345 let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2346 cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2347 assert_eq!(cache.base.invalidation_predicate_count(), 1);
2348 expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2349 expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2350
2351 mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2352
2353 cache.insert(3, "alice");
2354
2355 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2356 cache.run_pending_tasks(); // To submit the invalidation task.
2357 std::thread::sleep(Duration::from_millis(200));
2358 cache.run_pending_tasks(); // To process the task result.
2359 std::thread::sleep(Duration::from_millis(200));
2360
2361 assert!(cache.get(&0).is_none());
2362 assert!(cache.get(&2).is_none());
2363 assert_eq!(cache.get(&1), Some("bob"));
2364 // This should survive as it was inserted after calling invalidate_entries_if.
2365 assert_eq!(cache.get(&3), Some("alice"));
2366
2367 assert!(!cache.contains_key(&0));
2368 assert!(cache.contains_key(&1));
2369 assert!(!cache.contains_key(&2));
2370 assert!(cache.contains_key(&3));
2371
2372 assert_eq!(cache.entry_count(), 2);
2373 assert_eq!(cache.invalidation_predicate_count(), 0);
2374
2375 mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2376
2377 cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2378 cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2379 assert_eq!(cache.invalidation_predicate_count(), 2);
2380 // key 1 was inserted before key 3.
2381 expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2382 expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2383
2384 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2385 cache.run_pending_tasks(); // To submit the invalidation task.
2386 std::thread::sleep(Duration::from_millis(200));
2387 cache.run_pending_tasks(); // To process the task result.
2388 std::thread::sleep(Duration::from_millis(200));
2389
2390 assert!(cache.get(&1).is_none());
2391 assert!(cache.get(&3).is_none());
2392
2393 assert!(!cache.contains_key(&1));
2394 assert!(!cache.contains_key(&3));
2395
2396 assert_eq!(cache.entry_count(), 0);
2397 assert_eq!(cache.invalidation_predicate_count(), 0);
2398
2399 verify_notification_vec(&cache, actual, &expected);
2400
2401 Ok(())
2402 }
2403
    /// Checks the cache-level time-to-live policy using a mock clock: the cache
    /// is built with `time_to_live` of 10 seconds, and entries must become
    /// invisible to `get`, `contains_key` and `iter` once 10 seconds have
    /// passed since they were inserted or last updated. Also checks the
    /// notifications received by the eviction listener.
    #[test]
    fn time_to_live() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_live(Duration::from_secs(10))
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        // "a" must already be invisible here, before `run_pending_tasks` is
        // called below.
        assert_eq!(cache.get(&"a"), None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        // "b" is inserted at 10 secs.
        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Updating "b" at 15 secs; the assertions below expect it to still be
        // alive at 20 secs and to be expired at 25 secs.
        cache.insert("b", "bill");
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
    }
2485
    /// Checks the cache-level time-to-idle policy using a mock clock: the cache
    /// is built with `time_to_idle` of 10 seconds, and entries must become
    /// invisible to `get`, `contains_key` and `iter` once they have not been
    /// read via `get` for 10 seconds. Also checks the notifications received by
    /// the eviction listener.
    #[test]
    fn time_to_idle() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_idle(Duration::from_secs(10))
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        // This read at 5 secs is the last `get` of "a" before it is expected to
        // expire at 15 secs.
        assert_eq!(cache.get(&"a"), Some("alice"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks();

        // "b" is inserted at 10 secs.
        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks();

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        // "a" was last read at 5 secs, so it has been idle for 10 secs. "b" was
        // inserted at 10 secs and is read here at 15 secs.
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        // NOTE(review): `verify_notification_vec` is defined elsewhere in this
        // module; presumably it compares `actual` against `expected`.
        verify_notification_vec(&cache, actual, &expected);
    }
2564
    // https://github.com/moka-rs/moka/issues/359
    /// Regression test: with a 5-second time-to-idle, a `get` must refresh the
    /// entry's access time right away, without `run_pending_tasks` having to be
    /// called first.
    #[test]
    fn ensure_access_time_is_updated_immediately_after_read() {
        let (clock, mock) = Clock::mock();
        let mut cache = Cache::builder()
            .max_capacity(10)
            .time_to_idle(Duration::from_secs(5))
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(1, 1);

        // 4 secs after the insert: still within the 5-second idle period.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&1), Some(1));

        // 6 secs after the insert, but only 2 secs after the read above. No
        // `run_pending_tasks` call has happened in between, so this passes only
        // if that read updated the access time immediately.
        mock.increment(Duration::from_secs(2));
        assert_eq!(cache.get(&1), Some(1));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&1), Some(1));
    }
2589
    /// Same timeline as a cache-level 10-second time-to-live test, but the
    /// expiration is provided per entry by an `Expiry` implementation passed to
    /// `expire_after`. Also verifies, via `ExpiryCallCounters`, how many times
    /// the expiry's create and update callbacks were called.
    #[test]
    fn time_to_live_by_expiry_type() {
        // Define an expiry type.
        // It returns a fixed 10-second duration on create and on update, and
        // records each call in the shared counters.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_creations();
                Some(Duration::from_secs(10))
            }

            fn expire_after_update(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
            ) -> Option<Duration> {
                self.counters.incl_actual_updates();
                Some(Duration::from_secs(10))
            }
        }

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // Each `incl_expected_*` call below records a callback invocation the
        // test expects the cache to have made on the expiry.
        cache.insert("a", "alice");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        assert_eq!(cache.get(&"a"), None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        cache.insert("b", "bob");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Replacing an existing entry is expected to call `expire_after_update`.
        cache.insert("b", "bill");
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        expiry_counters.incl_expected_updates();
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        // NOTE(review): `ExpiryCallCounters::verify` and
        // `verify_notification_vec` are defined elsewhere; presumably they
        // compare the actual counts/notifications against the expected ones.
        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected);
    }
2713
    /// Same timeline as a cache-level 10-second time-to-idle test, but the
    /// expiration is provided per entry by an `Expiry` implementation passed to
    /// `expire_after`. Also verifies, via `ExpiryCallCounters`, how many times
    /// the expiry's read callback was called.
    #[test]
    fn time_to_idle_by_expiry_type() {
        // Define an expiry type.
        // It returns a fixed 10-second duration after each read and records the
        // call in the shared counters. The other `Expiry` methods are not
        // overridden here.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            fn expire_after_read(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
                _last_modified_at: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_reads();
                Some(Duration::from_secs(10))
            }
        }

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        // A successful `get` is expected to call `expire_after_read` once.
        assert_eq!(cache.get(&"a"), Some("alice"));
        expiry_counters.incl_expected_reads();

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks();

        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks();

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        // Only the successful read of "b" is counted as an expected read; the
        // `get` of "a" returns `None`.
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some("bob"));
        expiry_counters.incl_expected_reads();
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        // NOTE(review): `ExpiryCallCounters::verify` and
        // `verify_notification_vec` are defined elsewhere; presumably they
        // compare the actual counts/notifications against the expected ones.
        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected);
    }
2824
/// Checks which `Expiry` callbacks are triggered by `get_with`:
/// `expire_after_create()` when the key is absent, and `expire_after_read()`
/// only when the key was already present in the cache.
#[test]
fn test_expiry_using_get_with() {
    // An expiry whose create/read callbacks never set an expiration (they
    // return `None`) and only record that they were invoked. The update
    // callback must never run in this test.
    struct NoExpiry(Arc<ExpiryCallCounters>);

    impl Expiry<&str, &str> for NoExpiry {
        fn expire_after_create(
            &self,
            _k: &&str,
            _v: &&str,
            _now: StdInstant,
        ) -> Option<Duration> {
            self.0.incl_actual_creations();
            None
        }

        fn expire_after_read(
            &self,
            _k: &&str,
            _v: &&str,
            _now: StdInstant,
            _remaining: Option<Duration>,
            _modified_at: StdInstant,
        ) -> Option<Duration> {
            self.0.incl_actual_reads();
            None
        }

        fn expire_after_update(
            &self,
            _k: &&str,
            _v: &&str,
            _now: StdInstant,
            _remaining: Option<Duration>,
        ) -> Option<Duration> {
            unreachable!("The `expire_after_update()` method should not be called.");
        }
    }

    // Shared counters: the expiry bumps the "actual" side, the test body bumps
    // the "expected" side.
    let counters = Arc::new(ExpiryCallCounters::default());

    // Build a cache that uses the expiry (no eviction listener is installed),
    // then hand it out as an immutable binding.
    let cache = {
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(NoExpiry(Arc::clone(&counters)))
            .build();
        cache.reconfigure_for_testing();
        cache
    };

    // Miss: the key does not exist yet, so this is a creation.
    cache.get_with("a", || "alice");
    counters.incl_expected_creations();
    cache.run_pending_tasks();

    // Hit: the key exists, so this is a read.
    cache.get_with("a", || "alex");
    counters.incl_expected_reads();
    cache.run_pending_tasks();

    // Miss again after invalidation: another creation.
    cache.invalidate("a");
    cache.get_with("a", || "amanda");
    counters.incl_expected_creations();
    cache.run_pending_tasks();

    counters.verify();
}
2906
/// Checks that an already expired entry becomes readable again once it is
/// updated and `expire_after_update` answers `None` ("no expiration").
///
/// Expected behavior: after that update, `get()` returns the new value.
#[test]
fn expire_after_update_none_on_expired_entry() {
    use std::sync::atomic::{AtomicBool, Ordering};

    struct TestExpiry {
        expire_now: Arc<AtomicBool>,
    }

    impl TestExpiry {
        // `Some(Duration::ZERO)` (expire immediately) while the flag is set,
        // `None` (no expiration) once it has been cleared.
        fn ttl(&self) -> Option<Duration> {
            self.expire_now
                .load(Ordering::SeqCst)
                .then_some(Duration::ZERO)
        }
    }

    impl crate::Expiry<String, String> for TestExpiry {
        fn expire_after_create(
            &self,
            _k: &String,
            _v: &String,
            _now: StdInstant,
        ) -> Option<Duration> {
            self.ttl()
        }

        fn expire_after_update(
            &self,
            _k: &String,
            _v: &String,
            _now: StdInstant,
            _remaining: Option<Duration>,
        ) -> Option<Duration> {
            // Per the docs, `None` here means "no expiration", which should
            // make the entry accessible again.
            self.ttl()
        }
    }

    // Switch controlling whether entries expire immediately.
    let should_expire = Arc::new(AtomicBool::new(true));

    let cache: Cache<String, String> = Cache::builder()
        .max_capacity(100)
        .expire_after(TestExpiry {
            expire_now: Arc::clone(&should_expire),
        })
        .build();

    let key = String::from("test_key");

    // First insert: the entry expires right away.
    cache.insert(key.clone(), String::from("first_value"));
    cache.run_pending_tasks();

    // It is still counted, but reads must not see it.
    assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
    assert_eq!(
        cache.get(&key),
        None,
        "Entry should not be accessible (expired)"
    );
    cache.run_pending_tasks();

    // Flip the switch and update the entry; `expire_after_update` now
    // returns `None`.
    should_expire.store(false, Ordering::SeqCst);
    cache.insert(key.clone(), String::from("second_value"));
    cache.run_pending_tasks();

    assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");

    // The expiration must have been cleared by the `None` result.
    assert_eq!(
        cache.get(&key),
        Some(String::from("second_value")),
        "Entry should be accessible after clearing expiration"
    );
}
2997
// https://github.com/moka-rs/moka/issues/575
//
// Regression test for a custom `Expiry` that overrides only
// `expire_after_create`: re-inserting a key whose entry has expired must go
// through `expire_after_create` (not `expire_after_update`), in line with the
// documented rule that `expire_after_create` handles new entries.
//
// Before the fix, the expired entry still sitting in the hash table sent the
// insert down the update path, which called `expire_after_update` (whose
// default returns `duration_until_expiry`). The `else if` branch then cleared
// the per-entry expiration, leaving an immortal (stale) entry.
#[test]
fn test_expire_after_create_only_no_stale_entries() {
    use std::sync::atomic::{AtomicBool, Ordering};

    struct TestExpiry {
        enabled: Arc<AtomicBool>,
    }

    // Only `expire_after_create` is overridden; `expire_after_read` and
    // `expire_after_update` keep their defaults (return
    // duration_until_expiry unchanged).
    impl crate::Expiry<String, String> for TestExpiry {
        fn expire_after_create(
            &self,
            _k: &String,
            _v: &String,
            _now: StdInstant,
        ) -> Option<Duration> {
            // One second of life while enabled, no expiry otherwise.
            self.enabled
                .load(Ordering::SeqCst)
                .then_some(Duration::from_secs(1))
        }
    }

    let (clock, mock) = Clock::mock();
    let should_expire = Arc::new(AtomicBool::new(true));

    let cache: Cache<String, String> = {
        let mut cache: Cache<String, String> = Cache::builder()
            .max_capacity(100)
            .expire_after(TestExpiry {
                enabled: Arc::clone(&should_expire),
            })
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();
        cache
    };

    let two_secs = Duration::from_secs(2);

    // --- Part 1: re-insert an expired key with expiration disabled. ---
    let key = String::from("key1");

    // The first insert gets a 1 second lifetime.
    cache.insert(key.clone(), String::from("value1"));
    cache.run_pending_tasks();
    assert_eq!(cache.get(&key), Some(String::from("value1")));

    // Move the clock beyond that lifetime. `run_pending_tasks` is not called,
    // so the expired entry stays in the hash table.
    mock.increment(two_secs);
    assert_eq!(cache.get(&key), None, "Entry should be expired");

    // Re-insert with expiration turned off. Because the old entry is expired,
    // `expire_after_create` (not `expire_after_update`) is consulted and
    // returns `None`, so the entry never expires.
    should_expire.store(false, Ordering::SeqCst);
    cache.insert(key.clone(), String::from("value2"));
    cache.run_pending_tasks();

    assert_eq!(
        cache.get(&key),
        Some(String::from("value2")),
        "Re-inserted entry with no expiry should be accessible"
    );

    // --- Part 2: re-insert an expired key with expiration enabled. ---
    // The new entry must receive a fresh TTL from `expire_after_create`.
    should_expire.store(true, Ordering::SeqCst);

    let key2 = String::from("key2");
    cache.insert(key2.clone(), String::from("v1"));
    cache.run_pending_tasks();
    assert_eq!(cache.get(&key2), Some(String::from("v1")));

    // Let it expire.
    mock.increment(two_secs);
    assert_eq!(cache.get(&key2), None, "key2 should be expired");

    // Re-insert: `expire_after_create` hands out a fresh 1 second TTL.
    cache.insert(key2.clone(), String::from("v2"));
    cache.run_pending_tasks();
    assert_eq!(
        cache.get(&key2),
        Some(String::from("v2")),
        "Re-inserted key2 should be accessible with fresh TTL"
    );

    // Go beyond the fresh TTL; the entry must expire again.
    mock.increment(two_secs);
    assert_eq!(
        cache.get(&key2),
        None,
        "key2 should expire with fresh TTL from expire_after_create"
    );
}
3105
// https://github.com/moka-rs/moka/issues/345
//
// Regression test: an entry is replaced and then removed before the
// `WriteOp` of the replacing insert has been processed. After the pending
// tasks have run, the cache must report zero entries.
#[test]
fn test_race_between_updating_entry_and_processing_its_write_ops() {
    let (clock, mock) = Clock::mock();
    let cache = Cache::builder()
        .max_capacity(2)
        .time_to_idle(Duration::from_secs(1))
        .clock(clock)
        .build();

    cache.insert("a", "alice");
    cache.insert("b", "bob");
    cache.insert("c", "cathy"); // c1
    // Advance the mock clock by 2 secs, which is longer than the 1 sec
    // time-to-idle configured above.
    mock.increment(Duration::from_secs(2));

    // The following `insert` will do the following:
    // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
    // 2. Runs the pending tasks implicitly.
    //    (1) "a" will be admitted.
    //    (2) "b" will be admitted.
    //    (3) c1 will be evicted by size constraint.
    //    (4) "a" will be evicted due to expiration.
    //    (5) "b" will be evicted due to expiration.
    // 3. Sends its `WriteOp` log to the channel.
    cache.insert("c", "cindy"); // c2

    // Remove "c" (c2) from the cht.
    assert_eq!(cache.remove(&"c"), Some("cindy")); // c-remove

    mock.increment(Duration::from_secs(2));

    // The following `run_pending_tasks` will do the following:
    // 1. Admits "c" (c2) to the cache. (Create a node in the LRU deque)
    // 2. Because of c-remove, removes c2's node from the LRU deque.
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 0);
}
3143
/// Ensures that processing a stale insert `WriteOp` of a removed entry does not
/// remove a newer entry that was re-created under the same key.
#[test]
fn test_race_between_recreating_entry_and_processing_its_write_ops() {
    let cache = Cache::builder().max_capacity(2).build();

    // Fill the cache up to its max capacity (2).
    cache.insert('a', "a");
    cache.insert('b', "b");
    cache.run_pending_tasks();

    // Queue up the write operations below without running the pending tasks
    // in between.
    cache.insert('c', "c1"); // (a) `EntryInfo` 1, gen: 1
    assert!(cache.remove(&'a').is_some()); // (b)
    assert!(cache.remove(&'b').is_some()); // (c)
    assert!(cache.remove(&'c').is_some()); // (d) `EntryInfo` 1, gen: 2
    cache.insert('c', "c2"); // (e) `EntryInfo` 2, gen: 1

    // Now the `write_op_ch` channel contains the following `WriteOp`s:
    //
    // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
    // - 1: (b) remove "a"
    // - 2: (c) remove "b"
    // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
    // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
    //
    // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
    // that processing 0 does not remove "c2" from the concurrent hash table.
    // (Their generations are the same, but their `EntryInfo`s are different.)
    cache.run_pending_tasks();
    assert_eq!(cache.get(&'c'), Some("c2"));
}
3172
/// Inserts `NUM_KEYS` entries and checks that iterating the cache yields each
/// key exactly once, paired with the value that was stored for it.
#[test]
fn test_iter() {
    use std::collections::HashSet;

    const NUM_KEYS: usize = 50;

    // The value stored for a given key.
    let make_value = |key: usize| -> String { format!("val: {key}") };

    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_idle(Duration::from_secs(10))
        .build();

    (0..NUM_KEYS).for_each(|key| cache.insert(key, make_value(key)));

    // Walk the cache once, validating every value and gathering the keys.
    let seen_keys: HashSet<usize> = cache
        .iter()
        .map(|(key, value)| {
            assert_eq!(value, make_value(*key));
            *key
        })
        .collect();

    // Ensure there are no missing or duplicate keys in the iteration.
    assert_eq!(seen_keys.len(), NUM_KEYS);
}
3201
/// Runs 16 threads at the same time and ensures no deadlock occurs.
///
/// - Eight of the threads will update key-values in the cache.
/// - Eight others will iterate the cache.
///
/// Every iterating thread, and the main thread at the end, also checks that
/// the iteration yields exactly `NUM_KEYS` distinct keys.
#[test]
fn test_iter_multi_threads() {
    use std::collections::HashSet;

    const NUM_KEYS: usize = 1024;
    const NUM_THREADS: usize = 16;

    fn make_value(key: usize) -> String {
        format!("val: {key}")
    }

    let cache = Cache::builder()
        .max_capacity(2048)
        .time_to_idle(Duration::from_secs(10))
        .build();

    // Initialize the cache.
    for key in 0..NUM_KEYS {
        cache.insert(key, make_value(key));
    }

    // Hold the write lock while spawning, so that every spawned thread blocks
    // on acquiring its read lock until the write lock is released below.
    let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
    let write_lock = rw_lock.write().unwrap();

    // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
    #[allow(clippy::needless_collect)]
    let handles = (0..NUM_THREADS)
        .map(|n| {
            let cache = cache.clone();
            let rw_lock = Arc::clone(&rw_lock);

            // Even-numbered threads update, odd-numbered threads iterate.
            if n % 2 == 0 {
                // This thread will update the cache.
                std::thread::spawn(move || {
                    let read_lock = rw_lock.read().unwrap();
                    for key in 0..NUM_KEYS {
                        // TODO: Update keys in a random order?
                        cache.insert(key, make_value(key));
                    }
                    std::mem::drop(read_lock);
                })
            } else {
                // This thread will iterate the cache.
                std::thread::spawn(move || {
                    let read_lock = rw_lock.read().unwrap();
                    let mut key_set = HashSet::new();
                    for (key, value) in &cache {
                        assert_eq!(value, make_value(*key));
                        key_set.insert(*key);
                    }
                    // Ensure there are no missing or duplicate keys in the iteration.
                    assert_eq!(key_set.len(), NUM_KEYS);
                    std::mem::drop(read_lock);
                })
            }
        })
        .collect::<Vec<_>>();

    // Let these threads run by releasing the write lock.
    std::mem::drop(write_lock);

    // Wait for all threads; a panic in any of them fails the test here.
    handles.into_iter().for_each(|h| h.join().expect("Failed"));

    // Ensure there are no missing or duplicate keys in the iteration.
    let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
    assert_eq!(key_set.len(), NUM_KEYS);
}
3274
/// Checks how concurrent `get_with` and `get` calls on the same key behave
/// while the first caller's init closure is still running and after it has
/// finished. The threads are ordered in time by sleeping for different
/// durations.
#[test]
fn get_with() {
    use std::thread::{sleep, spawn};

    let cache = Cache::new(100);
    const KEY: u32 = 0;

    // This test will run five threads:
    //
    // Thread1 will be the first thread to call `get_with` for a key, so its init
    // closure will be evaluated and then a &str value "thread1" will be inserted
    // to the cache.
    let thread1 = {
        let cache1 = cache.clone();
        spawn(move || {
            // Call `get_with` immediately.
            let v = cache1.get_with(KEY, || {
                // Wait for 300 ms and return a &str value.
                sleep(Duration::from_millis(300));
                "thread1"
            });
            assert_eq!(v, "thread1");
        })
    };

    // Thread2 will be the second thread to call `get_with` for the same key, so
    // its init closure will not be evaluated. Once thread1's init closure
    // finishes, it will get the value inserted by thread1's init closure.
    let thread2 = {
        let cache2 = cache.clone();
        spawn(move || {
            // Wait for 100 ms before calling `get_with`.
            sleep(Duration::from_millis(100));
            let v = cache2.get_with(KEY, || unreachable!());
            assert_eq!(v, "thread1");
        })
    };

    // Thread3 will be the third thread to call `get_with` for the same key. By
    // the time it calls, thread1's init closure should have finished already and
    // the value should be already inserted to the cache. So its init closure
    // will not be evaluated and it will get the value inserted by thread1's init
    // closure immediately.
    let thread3 = {
        let cache3 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `get_with`.
            sleep(Duration::from_millis(400));
            let v = cache3.get_with(KEY, || unreachable!());
            assert_eq!(v, "thread1");
        })
    };

    // Thread4 will call `get` for the same key. It will call when thread1's init
    // closure is still running, so it will get none for the key.
    let thread4 = {
        let cache4 = cache.clone();
        spawn(move || {
            // Wait for 200 ms before calling `get`.
            sleep(Duration::from_millis(200));
            let maybe_v = cache4.get(&KEY);
            assert!(maybe_v.is_none());
        })
    };

    // Thread5 will call `get` for the same key. It will call after thread1's init
    // closure finished, so it will get the value inserted by thread1's init
    // closure.
    let thread5 = {
        let cache5 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `get`.
            sleep(Duration::from_millis(400));
            let maybe_v = cache5.get(&KEY);
            assert_eq!(maybe_v, Some("thread1"));
        })
    };

    // A failed assertion in any thread surfaces here as a join error.
    for t in [thread1, thread2, thread3, thread4, thread5] {
        t.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
3358
/// Same scenario as the `get_with` test, but using `get_with_by_ref` with a
/// borrowed key: checks how concurrent `get_with_by_ref` and `get` calls on the
/// same key behave while the first caller's init closure is still running and
/// after it has finished.
#[test]
fn get_with_by_ref() {
    use std::thread::{sleep, spawn};

    let cache = Cache::new(100);
    const KEY: &u32 = &0;

    // This test will run five threads:
    //
    // Thread1 will be the first thread to call `get_with_by_ref` for a key, so
    // its init closure will be evaluated and then a &str value "thread1" will be
    // inserted to the cache.
    let thread1 = {
        let cache1 = cache.clone();
        spawn(move || {
            // Call `get_with_by_ref` immediately.
            let v = cache1.get_with_by_ref(KEY, || {
                // Wait for 300 ms and return a &str value.
                sleep(Duration::from_millis(300));
                "thread1"
            });
            assert_eq!(v, "thread1");
        })
    };

    // Thread2 will be the second thread to call `get_with_by_ref` for the same
    // key, so its init closure will not be evaluated. Once thread1's init
    // closure finishes, it will get the value inserted by thread1's init
    // closure.
    let thread2 = {
        let cache2 = cache.clone();
        spawn(move || {
            // Wait for 100 ms before calling `get_with_by_ref`.
            sleep(Duration::from_millis(100));
            let v = cache2.get_with_by_ref(KEY, || unreachable!());
            assert_eq!(v, "thread1");
        })
    };

    // Thread3 will be the third thread to call `get_with_by_ref` for the same
    // key. By the time it calls, thread1's init closure should have finished
    // already and the value should be already inserted to the cache. So its init
    // closure will not be evaluated and it will get the value inserted by
    // thread1's init closure immediately.
    let thread3 = {
        let cache3 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `get_with_by_ref`.
            sleep(Duration::from_millis(400));
            let v = cache3.get_with_by_ref(KEY, || unreachable!());
            assert_eq!(v, "thread1");
        })
    };

    // Thread4 will call `get` for the same key. It will call when thread1's init
    // closure is still running, so it will get none for the key.
    let thread4 = {
        let cache4 = cache.clone();
        spawn(move || {
            // Wait for 200 ms before calling `get`.
            sleep(Duration::from_millis(200));
            let maybe_v = cache4.get(KEY);
            assert!(maybe_v.is_none());
        })
    };

    // Thread5 will call `get` for the same key. It will call after thread1's init
    // closure finished, so it will get the value inserted by thread1's init
    // closure.
    let thread5 = {
        let cache5 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `get`.
            sleep(Duration::from_millis(400));
            let maybe_v = cache5.get(KEY);
            assert_eq!(maybe_v, Some("thread1"));
        })
    };

    // A failed assertion in any thread surfaces here as a join error.
    for t in [thread1, thread2, thread3, thread4, thread5] {
        t.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
3443
/// Checks how concurrent `entry(KEY).or_insert_with_if(init, replace_if)` and
/// `get` calls on the same key behave, including whether the returned entry is
/// reported as fresh. The threads are ordered in time by sleeping for different
/// durations.
#[test]
fn entry_or_insert_with_if() {
    use std::thread::{sleep, spawn};

    let cache = Cache::new(100);
    const KEY: u32 = 0;

    // This test will run seven threads:
    //
    // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
    // its init closure will be evaluated and then a &str value "thread1" will be
    // inserted to the cache.
    let thread1 = {
        let cache1 = cache.clone();
        spawn(move || {
            // Call `or_insert_with_if` immediately.
            let entry = cache1.entry(KEY).or_insert_with_if(
                || {
                    // Wait for 300 ms and return a &str value.
                    sleep(Duration::from_millis(300));
                    "thread1"
                },
                |_v| unreachable!(),
            );
            // Entry should be fresh because our init closure should have been
            // evaluated.
            assert!(entry.is_fresh());
            assert_eq!(entry.into_value(), "thread1");
        })
    };

    // Thread2 will be the second thread to call `or_insert_with_if` for the same
    // key, so its init closure will not be evaluated. Once thread1's init
    // closure finishes, it will get the value inserted by thread1's init
    // closure.
    let thread2 = {
        let cache2 = cache.clone();
        spawn(move || {
            // Wait for 100 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(100));
            let entry = cache2
                .entry(KEY)
                .or_insert_with_if(|| unreachable!(), |_v| unreachable!());
            // Entry should not be fresh because thread1's init closure should
            // have been evaluated instead of ours.
            assert!(!entry.is_fresh());
            assert_eq!(entry.into_value(), "thread1");
        })
    };

    // Thread3 will be the third thread to call `or_insert_with_if` for the same
    // key. By the time it calls, thread1's init closure should have finished
    // already and the value should be already inserted to the cache. Also
    // thread3's `replace_if` closure returns `false`. So its init closure will
    // not be evaluated and it will get the value inserted by thread1's init
    // closure immediately.
    let thread3 = {
        let cache3 = cache.clone();
        spawn(move || {
            // Wait for 350 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(350));
            let entry = cache3.entry(KEY).or_insert_with_if(
                || unreachable!(),
                |v| {
                    assert_eq!(v, &"thread1");
                    false
                },
            );
            assert!(!entry.is_fresh());
            assert_eq!(entry.into_value(), "thread1");
        })
    };

    // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
    // key. The value should have been already inserted to the cache by thread1.
    // However thread4's `replace_if` closure returns `true`. So its init closure
    // will be evaluated to replace the current value.
    let thread4 = {
        let cache4 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(400));
            let entry = cache4.entry(KEY).or_insert_with_if(
                || "thread4",
                |v| {
                    assert_eq!(v, &"thread1");
                    true
                },
            );
            assert!(entry.is_fresh());
            assert_eq!(entry.into_value(), "thread4");
        })
    };

    // Thread5 will call `get` for the same key. It will call when thread1's init
    // closure is still running, so it will get none for the key.
    let thread5 = {
        let cache5 = cache.clone();
        spawn(move || {
            // Wait for 200 ms before calling `get`.
            sleep(Duration::from_millis(200));
            let maybe_v = cache5.get(&KEY);
            assert!(maybe_v.is_none());
        })
    };

    // Thread6 will call `get` for the same key. It will call after thread1's init
    // closure finished but before thread4 replaces the value, so it will get the
    // value inserted by thread1's init closure.
    let thread6 = {
        let cache6 = cache.clone();
        spawn(move || {
            // Wait for 350 ms before calling `get`.
            sleep(Duration::from_millis(350));
            let maybe_v = cache6.get(&KEY);
            assert_eq!(maybe_v, Some("thread1"));
        })
    };

    // Thread7 will call `get` for the same key. It will call after thread4
    // replaced the value, so it will get the value inserted by thread4's init
    // closure.
    let thread7 = {
        let cache7 = cache.clone();
        spawn(move || {
            // Wait for 450 ms before calling `get`.
            sleep(Duration::from_millis(450));
            let maybe_v = cache7.get(&KEY);
            assert_eq!(maybe_v, Some("thread4"));
        })
    };

    // A failed assertion in any thread surfaces here as a join error.
    for t in [
        thread1, thread2, thread3, thread4, thread5, thread6, thread7,
    ] {
        t.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
3582
/// Checks how concurrent `entry_by_ref(KEY).or_insert_with_if(init, replace_if)`
/// and `get` calls on the same key behave when the key is passed by reference.
/// The threads are ordered in time by sleeping for different durations.
#[test]
fn entry_by_ref_or_insert_with_if() {
    use std::thread::{sleep, spawn};

    let cache: Cache<u32, &str> = Cache::new(100);
    const KEY: &u32 = &0;

    // This test will run seven threads:
    //
    // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
    // its init closure will be evaluated and then a &str value "thread1" will be
    // inserted to the cache.
    let thread1 = {
        let cache1 = cache.clone();
        spawn(move || {
            // Call `or_insert_with_if` immediately.
            let v = cache1
                .entry_by_ref(KEY)
                .or_insert_with_if(
                    || {
                        // Wait for 300 ms and return a &str value.
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                )
                .into_value();
            assert_eq!(v, "thread1");
        })
    };

    // Thread2 will be the second thread to call `or_insert_with_if` for the same
    // key, so its init closure will not be evaluated. Once thread1's init
    // closure finishes, it will get the value inserted by thread1's init
    // closure.
    let thread2 = {
        let cache2 = cache.clone();
        spawn(move || {
            // Wait for 100 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(100));
            let v = cache2
                .entry_by_ref(KEY)
                .or_insert_with_if(|| unreachable!(), |_v| unreachable!())
                .into_value();
            assert_eq!(v, "thread1");
        })
    };

    // Thread3 will be the third thread to call `or_insert_with_if` for the same
    // key. By the time it calls, thread1's init closure should have finished
    // already and the value should be already inserted to the cache. Also
    // thread3's `replace_if` closure returns `false`. So its init closure will
    // not be evaluated and it will get the value inserted by thread1's init
    // closure immediately.
    let thread3 = {
        let cache3 = cache.clone();
        spawn(move || {
            // Wait for 350 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(350));
            let v = cache3
                .entry_by_ref(KEY)
                .or_insert_with_if(
                    || unreachable!(),
                    |v| {
                        assert_eq!(v, &"thread1");
                        false
                    },
                )
                .into_value();
            assert_eq!(v, "thread1");
        })
    };

    // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
    // key. The value should have been already inserted to the cache by
    // thread1. However thread4's `replace_if` closure returns `true`. So its
    // init closure will be evaluated to replace the current value.
    let thread4 = {
        let cache4 = cache.clone();
        spawn(move || {
            // Wait for 400 ms before calling `or_insert_with_if`.
            sleep(Duration::from_millis(400));
            let v = cache4
                .entry_by_ref(KEY)
                .or_insert_with_if(
                    || "thread4",
                    |v| {
                        assert_eq!(v, &"thread1");
                        true
                    },
                )
                .into_value();
            assert_eq!(v, "thread4");
        })
    };

    // Thread5 will call `get` for the same key. It will call when thread1's init
    // closure is still running, so it will get none for the key.
    let thread5 = {
        let cache5 = cache.clone();
        spawn(move || {
            // Wait for 200 ms before calling `get`.
            sleep(Duration::from_millis(200));
            let maybe_v = cache5.get(KEY);
            assert!(maybe_v.is_none());
        })
    };

    // Thread6 will call `get` for the same key. It will call after thread1's init
    // closure finished but before thread4 replaces the value, so it will get the
    // value inserted by thread1's init closure.
    let thread6 = {
        let cache6 = cache.clone();
        spawn(move || {
            // Wait for 350 ms before calling `get`.
            sleep(Duration::from_millis(350));
            let maybe_v = cache6.get(KEY);
            assert_eq!(maybe_v, Some("thread1"));
        })
    };

    // Thread7 will call `get` for the same key. It will call after thread4
    // replaced the value, so it will get the value inserted by thread4's init
    // closure.
    let thread7 = {
        let cache7 = cache.clone();
        spawn(move || {
            // Wait for 450 ms before calling `get`.
            sleep(Duration::from_millis(450));
            let maybe_v = cache7.get(KEY);
            assert_eq!(maybe_v, Some("thread4"));
        })
    };

    // A failed assertion in any thread surfaces here as a join error.
    for t in [
        thread1, thread2, thread3, thread4, thread5, thread6, thread7,
    ] {
        t.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
3723
3724 #[test]
3725 fn try_get_with() {
3726 use std::{
3727 sync::Arc,
3728 thread::{sleep, spawn},
3729 };
3730
3731 // Note that MyError does not implement std::error::Error trait like
3732 // anyhow::Error.
3733 #[derive(Debug)]
3734 pub struct MyError(#[allow(dead_code)] String);
3735
3736 type MyResult<T> = Result<T, Arc<MyError>>;
3737
3738 let cache = Cache::new(100);
3739 const KEY: u32 = 0;
3740
3741 // This test will run eight threads:
3742 //
3743 // Thread1 will be the first thread to call `try_get_with` for a key, so its
3744 // init closure will be evaluated and then an error will be returned. Nothing
3745 // will be inserted to the cache.
3746 let thread1 = {
3747 let cache1 = cache.clone();
3748 spawn(move || {
3749 // Call `try_get_with` immediately.
3750 let v = cache1.try_get_with(KEY, || {
3751 // Wait for 300 ms and return an error.
3752 sleep(Duration::from_millis(300));
3753 Err(MyError("thread1 error".into()))
3754 });
3755 assert!(v.is_err());
3756 })
3757 };
3758
3759 // Thread2 will be the second thread to call `try_get_with` for the same key,
3760 // so its init closure will not be evaluated. Once thread1's init closure
3761 // finishes, it will get the same error value returned by thread1's init
3762 // closure.
3763 let thread2 = {
3764 let cache2 = cache.clone();
3765 spawn(move || {
3766 // Wait for 100 ms before calling `try_get_with`.
3767 sleep(Duration::from_millis(100));
3768 let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
3769 assert!(v.is_err());
3770 })
3771 };
3772
3773 // Thread3 will be the third thread to call `get_with` for the same key. By
3774 // the time it calls, thread1's init closure should have finished already,
3775 // but the key still does not exist in the cache. So its init closure will be
3776 // evaluated and then an okay &str value will be returned. That value will be
3777 // inserted to the cache.
3778 let thread3 = {
3779 let cache3 = cache.clone();
3780 spawn(move || {
3781 // Wait for 400 ms before calling `try_get_with`.
3782 sleep(Duration::from_millis(400));
3783 let v: MyResult<_> = cache3.try_get_with(KEY, || {
3784 // Wait for 300 ms and return an Ok(&str) value.
3785 sleep(Duration::from_millis(300));
3786 Ok("thread3")
3787 });
3788 assert_eq!(v.unwrap(), "thread3");
3789 })
3790 };
3791
3792 // thread4 will be the fourth thread to call `try_get_with` for the same
3793 // key. So its init closure will not be evaluated. Once thread3's init
3794 // closure finishes, it will get the same okay &str value.
3795 let thread4 = {
3796 let cache4 = cache.clone();
3797 spawn(move || {
3798 // Wait for 500 ms before calling `try_get_with`.
3799 sleep(Duration::from_millis(500));
3800 let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
3801 assert_eq!(v.unwrap(), "thread3");
3802 })
3803 };
3804
3805 // Thread5 will be the fifth thread to call `try_get_with` for the same
3806 // key. So its init closure will not be evaluated. By the time it calls,
3807 // thread3's init closure should have finished already, so its init closure
3808 // will not be evaluated and will get the value insert by thread3's init
3809 // closure immediately.
3810 let thread5 = {
3811 let cache5 = cache.clone();
3812 spawn(move || {
3813 // Wait for 800 ms before calling `try_get_with`.
3814 sleep(Duration::from_millis(800));
3815 let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
3816 assert_eq!(v.unwrap(), "thread3");
3817 })
3818 };
3819
3820 // Thread6 will call `get` for the same key. It will call when thread1's init
3821 // closure is still running, so it will get none for the key.
3822 let thread6 = {
3823 let cache6 = cache.clone();
3824 spawn(move || {
3825 // Wait for 200 ms before calling `get`.
3826 sleep(Duration::from_millis(200));
3827 let maybe_v = cache6.get(&KEY);
3828 assert!(maybe_v.is_none());
3829 })
3830 };
3831
3832 // Thread7 will call `get` for the same key. It will call after thread1's init
3833 // closure finished with an error. So it will get none for the key.
3834 let thread7 = {
3835 let cache7 = cache.clone();
3836 spawn(move || {
3837 // Wait for 400 ms before calling `get`.
3838 sleep(Duration::from_millis(400));
3839 let maybe_v = cache7.get(&KEY);
3840 assert!(maybe_v.is_none());
3841 })
3842 };
3843
3844 // Thread8 will call `get` for the same key. It will call after thread3's init
3845 // closure finished, so it will get the value insert by thread3's init closure.
3846 let thread8 = {
3847 let cache8 = cache.clone();
3848 spawn(move || {
3849 // Wait for 800 ms before calling `get`.
3850 sleep(Duration::from_millis(800));
3851 let maybe_v = cache8.get(&KEY);
3852 assert_eq!(maybe_v, Some("thread3"));
3853 })
3854 };
3855
3856 for t in [
3857 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3858 ] {
3859 t.join().expect("Failed to join");
3860 }
3861
3862 assert!(cache.is_waiter_map_empty());
3863 }
3864
#[test]
fn try_get_with_by_ref() {
    use std::{
        sync::Arc,
        thread::{sleep, spawn},
    };

    // `MyError` intentionally does not implement the `std::error::Error`
    // trait, just like `anyhow::Error`.
    #[derive(Debug)]
    pub struct MyError(#[allow(dead_code)] String);

    type MyResult<T> = Result<T, Arc<MyError>>;

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: &u32 = &0;
    let cache = Cache::new(100);

    // Eight threads work on the same key. Expected schedule:
    //
    //   0 ms  t1 calls `try_get_with_by_ref`; its init closure starts.
    // 100 ms  t2 calls `try_get_with_by_ref`; it waits for t1.
    // 200 ms  t6 calls `get` and sees nothing.
    // 300 ms  t1's init closure fails; t1 and t2 both observe the error.
    // 400 ms  t3 calls `try_get_with_by_ref`; its init closure starts.
    //         t7 calls `get` and sees nothing.
    // 500 ms  t4 calls `try_get_with_by_ref`; it waits for t3.
    // 700 ms  t3's init closure succeeds; t3 and t4 both get "thread3".
    // 800 ms  t5 (`try_get_with_by_ref`) and t8 (`get`) read the stored value.

    // t1: the first caller for the key. Its init closure is evaluated and
    // returns an error after 300 ms, so nothing is inserted into the cache.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        let outcome = cache_t1.try_get_with_by_ref(KEY, || {
            pause_ms(300);
            Err(MyError("thread1 error".into()))
        });
        assert!(outcome.is_err());
    });

    // t2: the second caller. Its init closure must never run; once t1's init
    // closure finishes, t2 receives the error produced by t1.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        let outcome: MyResult<_> = cache_t2.try_get_with_by_ref(KEY, || unreachable!());
        assert!(outcome.is_err());
    });

    // t3: the third `try_get_with_by_ref` caller. By now t1 has already
    // failed and the key is still absent, so t3's init closure is evaluated.
    // It returns an `Ok(&str)` after 300 ms and that value is inserted.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(400);
        let outcome: MyResult<_> = cache_t3.try_get_with_by_ref(KEY, || {
            pause_ms(300);
            Ok("thread3")
        });
        assert_eq!(outcome.unwrap(), "thread3");
    });

    // t4: calls while t3's init closure is still running. Its own init
    // closure must never run; it receives the value produced by t3.
    let cache_t4 = cache.clone();
    let t4 = spawn(move || {
        pause_ms(500);
        let outcome: MyResult<_> = cache_t4.try_get_with_by_ref(KEY, || unreachable!());
        assert_eq!(outcome.unwrap(), "thread3");
    });

    // t5: calls after t3's init closure has finished, so it reads the value
    // inserted by t3 immediately and its init closure must never run.
    let cache_t5 = cache.clone();
    let t5 = spawn(move || {
        pause_ms(800);
        let outcome: MyResult<_> = cache_t5.try_get_with_by_ref(KEY, || unreachable!());
        assert_eq!(outcome.unwrap(), "thread3");
    });

    // t6: plain `get` while t1's init closure is still running; the key is
    // not in the cache yet.
    let cache_t6 = cache.clone();
    let t6 = spawn(move || {
        pause_ms(200);
        let found = cache_t6.get(KEY);
        assert!(found.is_none());
    });

    // t7: plain `get` after t1's init closure failed; the key is still not in
    // the cache.
    let cache_t7 = cache.clone();
    let t7 = spawn(move || {
        pause_ms(400);
        let found = cache_t7.get(KEY);
        assert!(found.is_none());
    });

    // t8: plain `get` after t3's init closure finished; it sees the value
    // inserted by t3.
    let cache_t8 = cache.clone();
    let t8 = spawn(move || {
        pause_ms(800);
        let found = cache_t8.get(KEY);
        assert_eq!(found, Some("thread3"));
    });

    let handles = [t1, t2, t3, t4, t5, t6, t7, t8];
    for handle in handles {
        handle.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
4005
#[test]
fn optionally_get_with() {
    use std::thread::{sleep, spawn};

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Eight threads work on the same key. Expected schedule:
    //
    //   0 ms  t1 calls `optionally_get_with`; its init closure starts.
    // 100 ms  t2 calls `optionally_get_with`; it waits for t1.
    // 200 ms  t6 calls `get` and sees nothing.
    // 300 ms  t1's init closure returns `None`; t1 and t2 both get `None`.
    // 400 ms  t3 calls `optionally_get_with`; its init closure starts.
    //         t7 calls `get` and sees nothing.
    // 500 ms  t4 calls `optionally_get_with`; it waits for t3.
    // 700 ms  t3's init closure returns `Some`; t3 and t4 both get "thread3".
    // 800 ms  t5 (`optionally_get_with`) and t8 (`get`) read the stored value.

    // t1: the first caller for the key. Its init closure is evaluated and
    // returns `None` after 300 ms, so nothing is inserted into the cache.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        let outcome = cache_t1.optionally_get_with(KEY, || {
            pause_ms(300);
            None
        });
        assert!(outcome.is_none());
    });

    // t2: the second caller. Its init closure must never run; once t1's init
    // closure finishes, t2 receives the same `None`.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        let outcome = cache_t2.optionally_get_with(KEY, || unreachable!());
        assert!(outcome.is_none());
    });

    // t3: the third `optionally_get_with` caller. By now t1 has already
    // finished and the key is still absent, so t3's init closure is
    // evaluated. It returns `Some(&str)` after 300 ms and that value is
    // inserted.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(400);
        let outcome = cache_t3.optionally_get_with(KEY, || {
            pause_ms(300);
            Some("thread3")
        });
        assert_eq!(outcome, Some("thread3"));
    });

    // t4: calls while t3's init closure is still running. Its own init
    // closure must never run; it receives the value produced by t3.
    let cache_t4 = cache.clone();
    let t4 = spawn(move || {
        pause_ms(500);
        let outcome = cache_t4.optionally_get_with(KEY, || unreachable!());
        assert_eq!(outcome, Some("thread3"));
    });

    // t5: calls after t3's init closure has finished, so it reads the value
    // inserted by t3 immediately and its init closure must never run.
    let cache_t5 = cache.clone();
    let t5 = spawn(move || {
        pause_ms(800);
        let outcome = cache_t5.optionally_get_with(KEY, || unreachable!());
        assert_eq!(outcome, Some("thread3"));
    });

    // t6: plain `get` while t1's init closure is still running; the key is
    // not in the cache yet.
    let cache_t6 = cache.clone();
    let t6 = spawn(move || {
        pause_ms(200);
        let found = cache_t6.get(&KEY);
        assert!(found.is_none());
    });

    // t7: plain `get` after t1's init closure returned `None`; the key is
    // still not in the cache.
    let cache_t7 = cache.clone();
    let t7 = spawn(move || {
        pause_ms(400);
        let found = cache_t7.get(&KEY);
        assert!(found.is_none());
    });

    // t8: plain `get` after t3's init closure finished; it sees the value
    // inserted by t3.
    let cache_t8 = cache.clone();
    let t8 = spawn(move || {
        pause_ms(800);
        let found = cache_t8.get(&KEY);
        assert_eq!(found, Some("thread3"));
    });

    let handles = [t1, t2, t3, t4, t5, t6, t7, t8];
    for handle in handles {
        handle.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
4136
#[test]
fn optionally_get_with_by_ref() {
    use std::thread::{sleep, spawn};

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: &u32 = &0;
    let cache = Cache::new(100);

    // Eight threads work on the same key. Expected schedule:
    //
    //   0 ms  t1 calls `optionally_get_with_by_ref`; its init closure starts.
    // 100 ms  t2 calls `optionally_get_with_by_ref`; it waits for t1.
    // 200 ms  t6 calls `get` and sees nothing.
    // 300 ms  t1's init closure returns `None`; t1 and t2 both get `None`.
    // 400 ms  t3 calls `optionally_get_with_by_ref`; its init closure starts.
    //         t7 calls `get` and sees nothing.
    // 500 ms  t4 calls `optionally_get_with_by_ref`; it waits for t3.
    // 700 ms  t3's init closure returns `Some`; t3 and t4 both get "thread3".
    // 800 ms  t5 (`optionally_get_with_by_ref`) and t8 (`get`) read the
    //         stored value.

    // t1: the first caller for the key. Its init closure is evaluated and
    // returns `None` after 300 ms, so nothing is inserted into the cache.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        let outcome = cache_t1.optionally_get_with_by_ref(KEY, || {
            pause_ms(300);
            None
        });
        assert!(outcome.is_none());
    });

    // t2: the second caller. Its init closure must never run; once t1's init
    // closure finishes, t2 receives the same `None`.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        let outcome = cache_t2.optionally_get_with_by_ref(KEY, || unreachable!());
        assert!(outcome.is_none());
    });

    // t3: the third `optionally_get_with_by_ref` caller. By now t1 has
    // already finished and the key is still absent, so t3's init closure is
    // evaluated. It returns `Some(&str)` after 300 ms and that value is
    // inserted.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(400);
        let outcome = cache_t3.optionally_get_with_by_ref(KEY, || {
            pause_ms(300);
            Some("thread3")
        });
        assert_eq!(outcome, Some("thread3"));
    });

    // t4: calls while t3's init closure is still running. Its own init
    // closure must never run; it receives the value produced by t3.
    let cache_t4 = cache.clone();
    let t4 = spawn(move || {
        pause_ms(500);
        let outcome = cache_t4.optionally_get_with_by_ref(KEY, || unreachable!());
        assert_eq!(outcome, Some("thread3"));
    });

    // t5: calls after t3's init closure has finished, so it reads the value
    // inserted by t3 immediately and its init closure must never run.
    let cache_t5 = cache.clone();
    let t5 = spawn(move || {
        pause_ms(800);
        let outcome = cache_t5.optionally_get_with_by_ref(KEY, || unreachable!());
        assert_eq!(outcome, Some("thread3"));
    });

    // t6: plain `get` while t1's init closure is still running; the key is
    // not in the cache yet.
    let cache_t6 = cache.clone();
    let t6 = spawn(move || {
        pause_ms(200);
        let found = cache_t6.get(KEY);
        assert!(found.is_none());
    });

    // t7: plain `get` after t1's init closure returned `None`; the key is
    // still not in the cache.
    let cache_t7 = cache.clone();
    let t7 = spawn(move || {
        pause_ms(400);
        let found = cache_t7.get(KEY);
        assert!(found.is_none());
    });

    // t8: plain `get` after t3's init closure finished; it sees the value
    // inserted by t3.
    let cache_t8 = cache.clone();
    let t8 = spawn(move || {
        pause_ms(800);
        let found = cache_t8.get(KEY);
        assert_eq!(found, Some("thread3"));
    });

    let handles = [t1, t2, t3, t4, t5, t6, t7, t8];
    for handle in handles {
        handle.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
4267
#[test]
fn upsert_with() {
    use std::thread::{sleep, spawn};

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Three threads call `and_upsert_with` on the same key, each one adding 1
    // to whatever value it observes. The key-level lock is working if the
    // final value is 3. Expected schedule:
    //
    //   0 ms  t1 observes no entry.
    // 100 ms  t2 starts and is blocked behind t1.
    // 200 ms  t1 inserts 1; t2 then observes 1.
    // 300 ms  t3 starts and is blocked behind t2.
    // 400 ms  t2 inserts 2; t3 then observes 2.
    // 500 ms  t3 inserts 3.

    // t1: starts immediately, holds the key for 200 ms, inserts 1.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        cache_t1.entry(KEY).and_upsert_with(|maybe_entry| {
            pause_ms(200);
            assert!(maybe_entry.is_none());
            1
        })
    });

    // t2: starts at 100 ms, holds the key for 200 ms, increments t1's value.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        cache_t2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
            pause_ms(200);
            let previous = maybe_entry.expect("The entry should exist").into_value();
            previous + 1
        })
    });

    // t3: starts at 300 ms, holds the key for 100 ms, increments t2's value.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(300);
        cache_t3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
            pause_ms(100);
            let previous = maybe_entry.expect("The entry should exist").into_value();
            previous + 1
        })
    });

    let first = t1.join().expect("Thread 1 should finish");
    let second = t2.join().expect("Thread 2 should finish");
    let third = t3.join().expect("Thread 3 should finish");

    // Each thread must have returned the entry holding the value it wrote.
    assert_eq!(first.into_value(), 1);
    assert_eq!(second.into_value(), 2);
    assert_eq!(third.into_value(), 3);

    // The last write wins.
    assert_eq!(cache.get(&KEY), Some(3));

    assert!(cache.is_waiter_map_empty());
}
4336
#[test]
fn compute_with() {
    use crate::ops::compute;
    use std::{
        sync::RwLock,
        thread::{sleep, spawn},
    };

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: u32 = 0;
    let cache = Cache::new(100);

    // Six threads call `and_compute_with` on the same key. The key-level lock
    // is working if every thread observes the state left by the previous one.
    // Expected schedule:
    //
    //    0 ms  t1 observes no entry.
    //  100 ms  t2 starts and is blocked behind t1.
    //  200 ms  t1 puts [1]; t2 then observes [1].
    //  300 ms  t3 starts and is blocked behind t2.
    //  400 ms  t2 puts [1, 2]; t3 then observes [1, 2].
    //  500 ms  t4 starts and is blocked behind t3.
    //  600 ms  t3 removes the entry; t4 then observes no entry.
    //  700 ms  t5 starts and is blocked behind t4.
    //  800 ms  t4 does nothing; t5 then observes no entry.
    //  900 ms  t6 starts and is blocked behind t5.
    // 1000 ms  t5 puts [5]; t6 then observes [5].
    // 1100 ms  t6 does nothing.

    // t1: inserts a fresh vector into the empty slot.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        cache_t1.entry(KEY).and_compute_with(|maybe_entry| {
            pause_ms(200);
            assert!(maybe_entry.is_none());
            compute::Op::Put(Arc::new(RwLock::new(vec![1])))
        })
    });

    // t2: appends to the vector inserted by t1 and puts the same `Arc` back.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        cache_t2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
            let shared = maybe_entry.expect("The entry should exist").into_value();
            assert_eq!(*shared.read().unwrap(), vec![1]);
            pause_ms(200);
            shared.write().unwrap().push(2);
            compute::Op::Put(shared)
        })
    });

    // t3: removes the entry after checking what t2 left behind.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(300);
        cache_t3.entry(KEY).and_compute_with(|maybe_entry| {
            let shared = maybe_entry.expect("The entry should exist").into_value();
            assert_eq!(*shared.read().unwrap(), vec![1, 2]);
            pause_ms(200);
            compute::Op::Remove
        })
    });

    // t4: finds nothing and leaves it that way.
    let cache_t4 = cache.clone();
    let t4 = spawn(move || {
        pause_ms(500);
        cache_t4.entry(KEY).and_compute_with(|maybe_entry| {
            assert!(maybe_entry.is_none());
            pause_ms(200);
            compute::Op::Nop
        })
    });

    // t5: finds nothing and inserts a new vector.
    let cache_t5 = cache.clone();
    let t5 = spawn(move || {
        pause_ms(700);
        cache_t5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
            assert!(maybe_entry.is_none());
            pause_ms(200);
            compute::Op::Put(Arc::new(RwLock::new(vec![5])))
        })
    });

    // t6: finds t5's vector and leaves it untouched.
    let cache_t6 = cache.clone();
    let t6 = spawn(move || {
        pause_ms(900);
        cache_t6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
            let shared = maybe_entry.expect("The entry should exist").into_value();
            assert_eq!(*shared.read().unwrap(), vec![5]);
            pause_ms(100);
            compute::Op::Nop
        })
    });

    let res1 = t1.join().expect("Thread 1 should finish");
    let res2 = t2.join().expect("Thread 2 should finish");
    let res3 = t3.join().expect("Thread 3 should finish");
    let res4 = t4.join().expect("Thread 4 should finish");
    let res5 = t5.join().expect("Thread 5 should finish");
    let res6 = t6.join().expect("Thread 6 should finish");

    let compute::CompResult::Inserted(entry) = res1 else {
        panic!("Expected `Inserted`. Got {res1:?}")
    };
    // Thread 2 pushed into the very same vector, so 2 is visible here too.
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    let compute::CompResult::ReplacedWith(entry) = res2 else {
        panic!("Expected `ReplacedWith`. Got {res2:?}")
    };
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    let compute::CompResult::Removed(entry) = res3 else {
        panic!("Expected `Removed`. Got {res3:?}")
    };
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    let compute::CompResult::StillNone(key) = res4 else {
        panic!("Expected `StillNone`. Got {res4:?}")
    };
    assert_eq!(*key, KEY);

    let compute::CompResult::Inserted(entry) = res5 else {
        panic!("Expected `Inserted`. Got {res5:?}")
    };
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![5]);

    let compute::CompResult::Unchanged(entry) = res6 else {
        panic!("Expected `Unchanged`. Got {res6:?}")
    };
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![5]);

    assert!(cache.is_waiter_map_empty());
}
4491
#[test]
fn try_compute_with() {
    use crate::ops::compute;
    use std::{
        sync::RwLock,
        thread::{sleep, spawn},
    };

    // Puts the calling thread to sleep for the given number of milliseconds.
    fn pause_ms(millis: u64) {
        sleep(Duration::from_millis(millis));
    }

    const KEY: u32 = 0;
    let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);

    // Four threads call `and_try_compute_with` on the same key. The key-level
    // lock is working if every thread observes the state left by the previous
    // one. Expected schedule:
    //
    //   0 ms  t1 observes no entry.
    // 100 ms  t2 starts and is blocked behind t1.
    // 200 ms  t1 puts [1]; t2 then observes [1].
    // 300 ms  t3 starts and is blocked behind t2.
    // 400 ms  t2 puts [1, 2]; t3 then observes [1, 2].
    // 500 ms  t4 starts and is blocked behind t3.
    // 600 ms  t3 returns an error; t4 then observes [1, 2].
    // 700 ms  t4 removes the entry.
    //
    // The `Nop` cases are left out here, which is why this test is shorter
    // than the `compute_with` test.

    // t1: inserts a fresh vector into the empty slot.
    let cache_t1 = cache.clone();
    let t1 = spawn(move || {
        cache_t1.entry(KEY).and_try_compute_with(|maybe_entry| {
            pause_ms(200);
            assert!(maybe_entry.is_none());
            let op: Result<_, ()> = Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1]))));
            op
        })
    });

    // t2: appends to the vector inserted by t1 and puts the same `Arc` back.
    let cache_t2 = cache.clone();
    let t2 = spawn(move || {
        pause_ms(100);
        cache_t2
            .entry_by_ref(&KEY)
            .and_try_compute_with(|maybe_entry| {
                let shared = maybe_entry.expect("The entry should exist").into_value();
                assert_eq!(*shared.read().unwrap(), vec![1]);
                pause_ms(200);
                shared.write().unwrap().push(2);
                let op: Result<_, ()> = Ok(compute::Op::Put(shared));
                op
            })
    });

    // t3: checks what t2 left behind, then bails out with an error.
    let cache_t3 = cache.clone();
    let t3 = spawn(move || {
        pause_ms(300);
        cache_t3.entry(KEY).and_try_compute_with(|maybe_entry| {
            let shared = maybe_entry.expect("The entry should exist").into_value();
            assert_eq!(*shared.read().unwrap(), vec![1, 2]);
            pause_ms(200);
            Err(())
        })
    });

    // t4: the entry must still hold [1, 2] after t3's error; remove it.
    let cache_t4 = cache.clone();
    let t4 = spawn(move || {
        pause_ms(500);
        cache_t4.entry(KEY).and_try_compute_with(|maybe_entry| {
            let shared = maybe_entry.expect("The entry should exist").into_value();
            assert_eq!(*shared.read().unwrap(), vec![1, 2]);
            pause_ms(100);
            let op: Result<_, ()> = Ok(compute::Op::Remove);
            op
        })
    });

    let res1 = t1.join().expect("Thread 1 should finish");
    let res2 = t2.join().expect("Thread 2 should finish");
    let res3 = t3.join().expect("Thread 3 should finish");
    let res4 = t4.join().expect("Thread 4 should finish");

    let Ok(compute::CompResult::Inserted(entry)) = res1 else {
        panic!("Expected `Inserted`. Got {res1:?}")
    };
    // Thread 2 pushed into the very same vector, so 2 is visible here too.
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
        panic!("Expected `ReplacedWith`. Got {res2:?}")
    };
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    assert!(res3.is_err());

    let Ok(compute::CompResult::Removed(entry)) = res4 else {
        panic!("Expected `Removed`. Got {res4:?}")
    };
    // This is the value that was removed from the cache.
    let stored = entry.into_value();
    assert_eq!(*stored.read().unwrap(), vec![1, 2]);

    assert!(cache.is_waiter_map_empty());
}
4610
#[test]
// https://github.com/moka-rs/moka/issues/43
fn handle_panic_in_get_with() {
    use std::{sync::Barrier, thread};

    let cache = Cache::new(16);
    // Two parties: the spawned thread and this test thread.
    let rendezvous = Arc::new(Barrier::new(2));

    // Spawn a thread whose init closure panics shortly after both threads
    // have met at the barrier. The thread is intentionally not joined.
    {
        let panicking_cache = cache.clone();
        let worker_gate = Arc::clone(&rendezvous);
        thread::spawn(move || {
            let _ = panicking_cache.get_with(1, || {
                worker_gate.wait();
                thread::sleep(Duration::from_millis(50));
                panic!("Panic during get_with");
            });
        });
    }

    // Once the barrier releases, the other thread is inside its init closure
    // for the key. This call must still come back with our own value.
    rendezvous.wait();
    let loaded = cache.get_with(1, || 5);
    assert_eq!(loaded, 5);

    assert!(cache.is_waiter_map_empty());
}
4635
#[test]
// https://github.com/moka-rs/moka/issues/43
fn handle_panic_in_try_get_with() {
    use std::{sync::Barrier, thread};

    let cache = Cache::new(16);
    // Two parties: the spawned thread and this test thread.
    let rendezvous = Arc::new(Barrier::new(2));

    // Spawn a thread whose init closure panics shortly after both threads
    // have met at the barrier. The thread is intentionally not joined.
    {
        let panicking_cache = cache.clone();
        let worker_gate = Arc::clone(&rendezvous);
        thread::spawn(move || {
            let _: Result<_, Arc<Infallible>> = panicking_cache.try_get_with(1, || {
                worker_gate.wait();
                thread::sleep(Duration::from_millis(50));
                panic!("Panic during try_get_with");
            });
        });
    }

    // Once the barrier releases, the other thread is inside its init closure
    // for the key. This call must still come back with our own value.
    rendezvous.wait();
    let loaded: Result<_, Arc<Infallible>> = cache.try_get_with(1, || Ok(5));
    assert_eq!(loaded, Ok(5));

    assert!(cache.is_waiter_map_empty());
}
4663
#[test]
fn test_removal_notifications() {
    // `actual` collects the notifications delivered to the listener, while
    // `expected` is filled in by the test as it goes.
    let actual = Arc::new(Mutex::new(Vec::new()));
    let mut expected = Vec::new();

    // The eviction listener appends every notification to `actual`.
    let sink = Arc::clone(&actual);
    let listener = move |k, v, cause| sink.lock().push((k, v, cause));

    // Build a cache with the listener attached. The mutable binding is
    // confined to this block, so the cache is immutable from here on.
    let cache = {
        let mut built = Cache::builder()
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        built.reconfigure_for_testing();
        built
    };

    // Explicit invalidation.
    cache.insert('a', "alice");
    cache.invalidate(&'a');
    expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));

    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 0);

    // Fill the cache up to its capacity.
    cache.insert('b', "bob");
    cache.insert('c', "cathy");
    cache.insert('d', "david");
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 3);

    // The cache is full, so this insert is rejected because of the size
    // constraint.
    cache.insert('e', "emily");
    expected.push((Arc::new('e'), "emily", RemovalCause::Size));
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 3);

    // Make 'e' more popular so that it is admitted on the next attempt.
    cache.get(&'e');
    cache.run_pending_tasks();

    // Second attempt: 'e' gets in, and the LRU entry is evicted to make room.
    cache.insert('e', "eliza");
    expected.push((Arc::new('b'), "bob", RemovalCause::Size));
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 3);

    // Overwriting an existing key reports the old value as replaced.
    cache.insert('d', "dennis");
    expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 3);

    verify_notification_vec(&cache, actual, &expected);
}
4722
#[test]
fn test_immediate_removal_notifications_with_updates() {
    // Shared log that the eviction listener appends every notification to.
    let actual = Arc::new(Mutex::new(Vec::new()));

    // The eviction listener records (key, value, cause) for each removal.
    let listener = {
        let log = Arc::clone(&actual);
        move |k, v, cause| log.lock().push((k, v, cause))
    };

    let (clock, mock) = Clock::mock();

    // Cache with the listener, a 7 s TTL and a 5 s TTI, driven by the mock clock.
    let mut cache = Cache::builder()
        .eviction_listener(listener)
        .time_to_live(Duration::from_secs(7))
        .time_to_idle(Duration::from_secs(5))
        .clock(clock)
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // Checks that the log holds exactly one notification, that it reports
    // `value` for "alice" as `Expired`, and then empties the log so the next
    // check starts from a clean slate.
    let expect_sole_expired_notification = |value: &'static str| {
        let mut log = actual.lock();
        assert_eq!(log.len(), 1);
        assert_eq!(log[0], (Arc::new("alice"), value, RemovalCause::Expired));
        log.clear();
    };

    cache.insert("alice", "a0");
    cache.run_pending_tasks();

    // 6 s without access exceeds the 5 s TTI, so alice (a0) is now expired.
    mock.increment(Duration::from_secs(6));
    assert_eq!(cache.get(&"alice"), None);

    // Pending tasks have not been run since the expiration, so the expired
    // entry is still counted.
    assert_eq!(cache.entry_count(), 1);

    // Because a0 is still physically present, this insert is really a
    // replacement. The notification for a0 must nevertheless say `Expired`,
    // not `Replaced`.
    cache.insert("alice", "a1");
    expect_sole_expired_notification("a0");

    cache.run_pending_tasks();

    // Read after 4 s (within the TTI) to keep a1 alive.
    mock.increment(Duration::from_secs(4));
    assert_eq!(cache.get(&"alice"), Some("a1"));
    cache.run_pending_tasks();

    // Another 4 s makes 8 s since the insert, which exceeds the 7 s TTL.
    mock.increment(Duration::from_secs(4));
    assert_eq!(cache.get(&"alice"), None);

    // Again, the expired entry has not been removed yet.
    assert_eq!(cache.entry_count(), 1);

    // Replacing the TTL-expired a1 must also be reported as `Expired`.
    cache.insert("alice", "a2");
    expect_sole_expired_notification("a1");

    cache.run_pending_tasks();

    assert_eq!(cache.entry_count(), 1);

    // Let alice (a2) expire by the idle timeout.
    mock.increment(Duration::from_secs(6));
    assert_eq!(cache.get(&"alice"), None);
    assert_eq!(cache.entry_count(), 1);

    // Invalidating the expired alice (a2) removes it; the cause reported to
    // the listener is still `Expired`.
    cache.invalidate(&"alice");
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), 0);

    expect_sole_expired_notification("a2");

    // Re-insert, and this time let the entry expire by the TTL.
    cache.insert("alice", "a3");
    cache.run_pending_tasks();
    mock.increment(Duration::from_secs(4));
    assert_eq!(cache.get(&"alice"), Some("a3"));
    cache.run_pending_tasks();
    mock.increment(Duration::from_secs(4));
    assert_eq!(cache.get(&"alice"), None);
    assert_eq!(cache.entry_count(), 1);

    // Invalidating the expired alice (a3) removes it.
    cache.invalidate(&"alice");
    cache.run_pending_tasks();

    assert_eq!(cache.entry_count(), 0);

    expect_sole_expired_notification("a3");

    assert!(cache.key_locks_map_is_empty());
}
4838
// Verifies the per-key lock used for immediate notification delivery:
// notifications for one key must be delivered in order and must never
// overlap, even when several client threads modify that key concurrently.
// (Three client threads are used here.)
//
// This test is ignored by default. It becomes unstable when run in parallel
// with other tests.
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn test_key_lock_used_by_immediate_removal_notifications() {
    use std::thread::{sleep, spawn};

    const KEY: &str = "alice";

    type Val = &'static str;

    #[derive(PartialEq, Eq, Debug)]
    enum Event {
        Insert(Val),
        Invalidate(Val),
        BeginNotify(Val, RemovalCause),
        EndNotify(Val, RemovalCause),
    }

    // The following `Vec` will hold the events in the order they happened.
    let actual = Arc::new(Mutex::new(Vec::new()));

    // A deliberately slow eviction listener: it logs the start of the
    // notification, sleeps for 300 ms, then logs the end.
    let listener = {
        let log = Arc::clone(&actual);
        move |_k, v, cause| {
            log.lock().push(Event::BeginNotify(v, cause));
            sleep(Duration::from_millis(300));
            log.lock().push(Event::EndNotify(v, cause));
        }
    };

    // Create a cache with the eviction listener and a TTL of 500 ms.
    let mut cache = Cache::builder()
        .eviction_listener(listener)
        .time_to_live(Duration::from_millis(500))
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // - Notifications for the same key must not overlap.

    // Time  Event
    // ----- -------------------------------------
    // 0000: Insert value a0
    // 0500: a0 expired
    // 0600: Insert value a1 -> expired a0 (N-A0)
    // 0800: Insert value a2 (waiting) (A-A2)
    // 0900: N-A0 processed
    //       A-A2 finished waiting -> replace a1 (N-A1)
    // 1100: Invalidate (waiting) (R-A2)
    // 1200: N-A1 processed
    //       R-A2 finished waiting -> explicit a2 (N-A2)
    // 1500: N-A2 processed

    let expected = vec![
        Event::Insert("a0"),
        Event::Insert("a1"),
        Event::BeginNotify("a0", RemovalCause::Expired),
        Event::Insert("a2"),
        Event::EndNotify("a0", RemovalCause::Expired),
        Event::BeginNotify("a1", RemovalCause::Replaced),
        Event::Invalidate("a2"),
        Event::EndNotify("a1", RemovalCause::Replaced),
        Event::BeginNotify("a2", RemovalCause::Explicit),
        Event::EndNotify("a2", RemovalCause::Explicit),
    ];

    // Spawns a client thread that sleeps for `delay_ms`, logs `event`, and then
    // either inserts `new_value` under KEY (`Some`) or invalidates KEY (`None`).
    let schedule = |delay_ms: u64, event: Event, new_value: Option<Val>| {
        let log = Arc::clone(&actual);
        let cache_handle = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(delay_ms));
            log.lock().push(event);
            match new_value {
                Some(v) => {
                    cache_handle.insert(KEY, v);
                }
                None => {
                    cache_handle.invalidate(&KEY);
                }
            }
        })
    };

    // 0000: Insert value a0
    actual.lock().push(Event::Insert("a0"));
    cache.insert(KEY, "a0");
    // Run the pending tasks to set the last modified time for the KEY
    // immediately, so that this entry should expire 500 ms from now.
    cache.run_pending_tasks();

    let workers = [
        // 0600: Insert value a1 -> expired a0 (N-A0)
        schedule(600, Event::Insert("a1"), Some("a1")),
        // 0800: Insert value a2 (waiting) (A-A2)
        schedule(800, Event::Insert("a2"), Some("a2")),
        // 1100: Invalidate (waiting) (R-A2)
        schedule(1100, Event::Invalidate("a2"), None),
    ];

    for worker in workers {
        worker.join().expect("Failed to join");
    }

    let actual = actual.lock();
    assert_eq!(actual.len(), expected.len());

    for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
        assert_eq!(actual, expected, "expected[{i}]");
    }

    assert!(cache.key_locks_map_is_empty());
}
4967
// Without an eviction listener, a single `run_pending_tasks` call is expected
// to evict every entry that can be removed, regardless of the housekeeper's
// batch-size and timeout settings.
#[test]
fn no_batch_size_limit_on_eviction() {
    const MAX_CAPACITY: u64 = 20;

    const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
    const MAX_LOG_SYNC_REPEATS: u32 = 1;
    const EVICTION_BATCH_SIZE: u32 = 1;

    // Keys at the boundaries of the two insertion rounds.
    const LAST_OLD_KEY: u64 = MAX_CAPACITY - 1;
    const LAST_NEW_KEY: u64 = MAX_CAPACITY * 2 - 1;

    // The timeout should be ignored when the eviction listener is not provided.
    let hk_conf = HousekeeperConfig::new(
        Some(EVICTION_TIMEOUT),
        Some(MAX_LOG_SYNC_REPEATS),
        Some(EVICTION_BATCH_SIZE),
    );

    // Create a cache with the LRU policy.
    let mut cache = Cache::builder()
        .max_capacity(MAX_CAPACITY)
        .eviction_policy(EvictionPolicy::lru())
        .housekeeper_config(hk_conf)
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // Inserts `key -> "v{key}"` for every key in the given range.
    let insert_range = |keys: std::ops::Range<u64>| {
        for key in keys {
            cache.insert(key, format!("v{key}"));
        }
    };

    // Fill the cache.
    insert_range(0..MAX_CAPACITY);
    // The entry count is still zero because `run_pending_tasks` has not been
    // called yet.
    assert_eq!(cache.entry_count(), 0);

    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), MAX_CAPACITY);

    // Insert another MAX_CAPACITY entries.
    insert_range(MAX_CAPACITY..(MAX_CAPACITY * 2));
    // The entry count should not change because `run_pending_tasks` has not
    // been called since these inserts.
    assert_eq!(cache.entry_count(), MAX_CAPACITY);
    // Both old and new keys should exist.
    assert!(cache.contains_key(&0)); // old
    assert!(cache.contains_key(&LAST_OLD_KEY)); // old
    assert!(cache.contains_key(&LAST_NEW_KEY)); // new

    // Process the remaining write op logs (there should be MAX_CAPACITY logs),
    // and evict the LRU entries.
    cache.run_pending_tasks();
    assert_eq!(cache.entry_count(), MAX_CAPACITY);

    // Now all the old keys should be gone.
    assert!(!cache.contains_key(&0));
    assert!(!cache.contains_key(&LAST_OLD_KEY));
    // And the new keys should exist.
    assert!(cache.contains_key(&LAST_NEW_KEY));
}
5032
// Exercises eviction with a listener that is slow relative to the housekeeper's
// eviction timeout: each `run_pending_tasks` call is expected to deliver only a
// few notifications, so the cache stays over capacity until enough runs have
// been made.
#[test]
fn slow_eviction_listener() {
    const MAX_CAPACITY: u64 = 20;

    const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
    const LISTENER_DELAY: Duration = Duration::from_millis(11);
    const MAX_LOG_SYNC_REPEATS: u32 = 1;
    const EVICTION_BATCH_SIZE: u32 = 1;

    let hk_conf = HousekeeperConfig::new(
        Some(EVICTION_TIMEOUT),
        Some(MAX_LOG_SYNC_REPEATS),
        Some(EVICTION_BATCH_SIZE),
    );

    let (clock, mock) = Clock::mock();
    let listener_call_count = Arc::new(AtomicU8::new(0));

    // A slow eviction listener: every notification advances the mock clock by
    // `LISTENER_DELAY` and bumps the call counter.
    let listener = {
        let counter = Arc::clone(&listener_call_count);
        move |_k, _v, _cause| {
            mock.increment(LISTENER_DELAY);
            counter.fetch_add(1, Ordering::AcqRel);
        }
    };

    // Create a cache with the LRU policy.
    let mut cache = Cache::builder()
        .max_capacity(MAX_CAPACITY)
        .eviction_policy(EvictionPolicy::lru())
        .eviction_listener(listener)
        .housekeeper_config(hk_conf)
        .clock(clock)
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // Number of listener invocations observed so far.
    let calls_so_far = || listener_call_count.load(Ordering::Acquire) as u64;

    // Fill the cache.
    for key in 0..MAX_CAPACITY {
        cache.insert(key, format!("v{key}"));
    }
    // The entry count is still zero because `run_pending_tasks` has not been
    // called yet.
    assert_eq!(cache.entry_count(), 0);

    cache.run_pending_tasks();
    assert_eq!(calls_so_far(), 0);
    assert_eq!(cache.entry_count(), MAX_CAPACITY);

    // Insert more items to the cache.
    for key in MAX_CAPACITY..(MAX_CAPACITY * 2) {
        cache.insert(key, format!("v{key}"));
    }
    assert_eq!(cache.entry_count(), MAX_CAPACITY);

    cache.run_pending_tasks();
    // Because of the slow listener, the cache should be over capacity: only
    // three notifications are expected per run (presumably the third 11 ms
    // delay pushes the elapsed mock time past the 30 ms eviction timeout).
    let mut expected_call_count: u64 = 3;
    assert_eq!(calls_so_far(), expected_call_count);
    assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);

    // Keep running the pending tasks; each run should add three more
    // notifications until all MAX_CAPACITY surplus entries are evicted.
    while expected_call_count < MAX_CAPACITY {
        cache.run_pending_tasks();

        expected_call_count = (expected_call_count + 3).min(MAX_CAPACITY);

        assert_eq!(calls_so_far(), expected_call_count);
        assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
    }

    assert_eq!(cache.entry_count(), MAX_CAPACITY);
}
5121
// Checks how the cache behaves once its eviction listener has panicked: the
// test expects only the notification delivered before the panic to be recorded.
//
// NOTE: To enable the panic logging, run the following command:
//
// RUST_LOG=moka=info cargo test --features 'logging' -- \
//   sync::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
//
#[test]
fn recover_from_panicking_eviction_listener() {
    #[cfg(feature = "logging")]
    let _ = env_logger::builder().is_test(true).try_init();

    // Notifications actually received by the listener.
    let actual = Arc::new(Mutex::new(Vec::new()));

    // The only notification expected: a0 being replaced by the poisonous value.
    let expected = vec![(Arc::new("alice"), "a0", RemovalCause::Replaced)];

    // An eviction listener that panics when it sees the value "panic now!"
    // and records every other notification.
    let listener = {
        let log = Arc::clone(&actual);
        move |k, v, cause| {
            if v == "panic now!" {
                panic!("Panic now!");
            }
            log.lock().push((k, v, cause))
        }
    };

    // Create a cache with the eviction listener.
    let mut cache = Cache::builder()
        .name("My Sync Cache")
        .eviction_listener(listener)
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // Insert an okay value.
    cache.insert("alice", "a0");
    cache.run_pending_tasks();

    // Replace it with the value that will later make the listener panic.
    // This replacement itself notifies about a0, which is harmless.
    cache.insert("alice", "panic now!");
    cache.run_pending_tasks();

    // Insert an okay value. This replaces "panic now!", so the listener is
    // called with that value and panics.
    cache.insert("alice", "a2");
    cache.run_pending_tasks();
    // No more removal notifications should be sent from here on.

    // Invalidate the okay value.
    cache.invalidate(&"alice");
    cache.run_pending_tasks();

    verify_notification_vec(&cache, actual, &expected);
}
5177
// Ensures that `contains_key`, `get` and `invalidate` accept the borrowed form
// `&[u8]` (as well as `&Vec<u8>`) when the key type is `Vec<u8>`.
// https://github.com/moka-rs/moka/issues/166
#[test]
fn borrowed_forms_of_key() {
    let cache: Cache<Vec<u8>, ()> = Cache::new(1);
    let key = vec![1_u8];

    // Round 1: look the entry up through `&Vec<u8>`.
    cache.insert(key.clone(), ());
    {
        let as_vec: &Vec<u8> = &key;
        assert!(cache.contains_key(as_vec));
        assert_eq!(cache.get(as_vec), Some(()));
        cache.invalidate(as_vec);
    }

    // Round 2: look the entry up through `&[u8]`.
    cache.insert(key, ());
    {
        let as_slice: &[u8] = &[1_u8];
        assert!(cache.contains_key(as_slice));
        assert_eq!(cache.get(as_slice), Some(()));
        cache.invalidate(as_slice);
    }
}
5202
// Checks, via drop-counting values, that a value is dropped as soon as its
// entry is evicted or invalidated rather than lingering until the cache is
// dropped.
//
// Ignored by default. This test becomes unstable when run in parallel with
// other tests.
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn drop_value_immediately_after_eviction() {
    use crate::common::test_utils::{Counters, Value};

    const MAX_CAPACITY: u32 = 500;
    const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

    let counters = Arc::new(Counters::default());

    // The listener tallies size-based evictions and explicit invalidations;
    // all other removal causes are ignored.
    let listener = {
        let tally = Arc::clone(&counters);
        move |_k, _v, cause| match cause {
            RemovalCause::Size => tally.incl_evicted(),
            RemovalCause::Explicit => tally.incl_invalidated(),
            _ => (),
        }
    };

    let mut cache = Cache::builder()
        .max_capacity(MAX_CAPACITY as u64)
        .eviction_listener(listener)
        .build();
    cache.reconfigure_for_testing();

    // Make the cache exterior immutable.
    let cache = cache;

    // Asserts the full set of counters. The inserted and created counts are
    // always expected to equal KEYS.
    let assert_counters = |evicted: u32, invalidated: u32, dropped: u32| {
        assert_eq!(counters.inserted(), KEYS, "inserted");
        assert_eq!(counters.value_created(), KEYS, "value_created");
        assert_eq!(counters.evicted(), evicted, "evicted");
        assert_eq!(counters.invalidated(), invalidated, "invalidated");
        assert_eq!(counters.value_dropped(), dropped, "value_dropped");
    };

    // Insert more keys than the cache can hold, running the pending tasks
    // after every insert.
    for key in 0..KEYS {
        cache.insert(key, Arc::new(Value::new(vec![0u8; 1024], &counters)));
        counters.incl_inserted();
        cache.run_pending_tasks();
    }

    let eviction_count = KEYS - MAX_CAPACITY;

    // Every evicted value must already have been dropped.
    cache.run_pending_tasks();
    assert_counters(eviction_count, 0, eviction_count);

    // Invalidate every key, including the ones that were already evicted.
    for key in 0..KEYS {
        cache.invalidate(&key);
        cache.run_pending_tasks();
    }

    // Now all values must have been dropped, before the cache itself goes away.
    cache.run_pending_tasks();
    assert_counters(eviction_count, MAX_CAPACITY, KEYS);

    // Dropping the cache must not change the drop count.
    std::mem::drop(cache);
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
5262
// Regression test for https://github.com/moka-rs/moka/issues/383: dropping the
// cache must release the values it holds.
//
// Ignored by default. This test becomes unstable when run in parallel with
// other tests.
#[test]
#[cfg_attr(not(run_flaky_tests), ignore)]
fn ensure_gc_runs_when_dropping_cache() {
    let cache = Cache::builder().build();

    // `witness` stays with the test; `cached` is handed over to the cache.
    let witness = Arc::new(0);
    let cached = Arc::clone(&witness);
    cache.get_with(1, move || cached);

    drop(cache);

    // Only the test's own handle should be left.
    assert_eq!(Arc::strong_count(&witness), 1);
}
5279
// The `Debug` output of a cache is expected to be brace-delimited and to list
// each entry as `key: value`.
#[test]
fn test_debug_format() {
    let cache = Cache::new(10);
    for (key, name) in [('a', "alice"), ('b', "bob"), ('c', "cindy")] {
        cache.insert(key, name);
    }

    let rendered = format!("{cache:?}");

    assert!(rendered.starts_with('{'));
    // Entry order is not asserted; each entry just has to appear somewhere.
    for fragment in [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#] {
        assert!(rendered.contains(fragment));
    }
    assert!(rendered.ends_with('}'));
}
5294
5295 type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5296
5297 fn verify_notification_vec<K, V, S>(
5298 cache: &Cache<K, V, S>,
5299 actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5300 expected: &[NotificationTuple<K, V>],
5301 ) where
5302 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5303 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5304 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5305 {
5306 // Retries will be needed when testing in a QEMU VM.
5307 const MAX_RETRIES: usize = 5;
5308 let mut retries = 0;
5309 loop {
5310 // Ensure all scheduled notifications have been processed.
5311 cache.run_pending_tasks();
5312 std::thread::sleep(Duration::from_millis(500));
5313
5314 let actual = &*actual.lock();
5315 if actual.len() != expected.len() {
5316 if retries <= MAX_RETRIES {
5317 retries += 1;
5318 continue;
5319 } else {
5320 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5321 }
5322 }
5323
5324 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5325 assert_eq!(actual, expected, "expected[{i}]");
5326 }
5327
5328 break;
5329 }
5330 }
5331
/// Regression test for a bug where calling `optionally_get_with` on an entry
/// that has expired but has not yet been evicted cleared the expiration of the
/// newly inserted value, so that it never expired.
///
/// Timeline (mock clock):
///
/// 1. t=0: `optionally_get_with` misses and inserts value 1 with a 2 s expiry.
/// 2. t=1.98s: `optionally_get_with` still hits value 1.
/// 3. t=2.01s: value 1 is expired but not yet evicted; `optionally_get_with`
///    should miss and create value 2.
/// 4. t=12.01s: value 2 should be expired as well, so `optionally_get_with`
///    should no longer return it.
#[test]
fn test_optionally_get_with_expired_entry_bug() {
    use std::sync::atomic::{AtomicU32, Ordering};

    /// An expiry policy that only overrides `expire_after_create`, giving every
    /// newly created entry a lifetime of two seconds. The other `Expiry`
    /// methods keep their default implementations.
    struct CreateOnlyExpiry;

    impl Expiry<&str, u32> for CreateOnlyExpiry {
        fn expire_after_create(
            &self,
            _key: &&str,
            _value: &u32,
            _current_time: StdInstant,
        ) -> Option<Duration> {
            Some(Duration::from_secs(2))
        }
    }

    let (clock, mock) = Clock::mock();

    let mut cache = Cache::builder()
        .max_capacity(100)
        .expire_after(CreateOnlyExpiry)
        .clock(clock)
        .build();
    cache.reconfigure_for_testing();
    let cache = cache;

    // Each invocation of the init closure yields the next value: 1, 2, 3, ...
    let counter = AtomicU32::new(0);
    let next_value = || Some(counter.fetch_add(1, Ordering::SeqCst) + 1);

    // Step 1: t=0 - the key is absent, so value 1 is created.
    let first = cache.optionally_get_with("key", next_value);
    assert_eq!(first, Some(1), "First access should return 1");
    cache.run_pending_tasks();

    // Step 2: t=1.98s - still within the 2 s lifetime.
    mock.increment(Duration::from_millis(1980));
    let second = cache.optionally_get_with("key", next_value);
    assert_eq!(second, Some(1), "Access at 1.98s should hit and return 1");

    // Step 3: t=2.01s - the entry is expired but NOT evicted, because no
    // housekeeping has run since the expiration.
    mock.increment(Duration::from_millis(30));
    let third = cache.optionally_get_with("key", next_value);
    assert_eq!(third, Some(2), "Access at 2.01s should miss and return 2");
    cache.run_pending_tasks();

    // Step 4: t=12.01s - ten seconds later, value 2 should have expired too.
    mock.increment(Duration::from_secs(10));
    cache.run_pending_tasks();

    let fourth = cache.optionally_get_with("key", next_value);
    assert_ne!(fourth, Some(2), "Access at 12.01s should not still be 2");
}
5397}