moka/sync/cache.rs
use super::{
    value_initializer::{InitResult, ValueInitializer},
    CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector,
};
use crate::{
    common::{
        concurrent::{
            constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp,
        },
        time::{Clock, Instant},
        HousekeeperConfig,
    },
    notification::EvictionListener,
    ops::compute::{self, CompResult},
    policy::{EvictionPolicy, ExpirationPolicy},
    sync::{Iter, PredicateId},
    sync_base::{
        base_cache::{BaseCache, HouseKeeperArc},
        iter::ScanningGet,
    },
    Entry, Policy, PredicateError,
};

use crossbeam_channel::{Sender, TrySendError};
use std::{
    borrow::Borrow,
    collections::hash_map::RandomState,
    fmt,
    hash::{BuildHasher, Hash},
    sync::Arc,
    time::Duration,
};

34/// A thread-safe concurrent synchronous in-memory cache.
35///
36/// `Cache` supports full concurrency of retrievals and a high expected concurrency
37/// for updates.
38///
39/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
40/// storage. `Cache` performs a best-effort bounding of the map using an entry
41/// replacement algorithm to determine which entries to evict when the capacity is
42/// exceeded.
43///
44/// # Table of Contents
45///
46/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
47/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
48/// - [Sharing a cache across threads](#sharing-a-cache-across-threads)
49/// - [No lock is needed](#no-lock-is-needed)
50/// - [Hashing Algorithm](#hashing-algorithm)
51/// - [Example: Size-based Eviction](#example-size-based-eviction)
52/// - [Example: Time-based Expirations](#example-time-based-expirations)
53/// - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
54/// - [Per-entry expiration policy](#per-entry-expiration-policy)
55/// - [Example: Eviction Listener](#example-eviction-listener)
/// - [Avoid panicking in the eviction listener](#avoid-panicking-in-the-eviction-listener)
58///
59/// # Example: `insert`, `get` and `invalidate`
60///
61/// Cache entries are manually added using [`insert`](#method.insert) or
62/// [`get_with`](#method.get_with) methods, and are stored in the cache until either
63/// evicted or manually invalidated.
64///
65/// Here's an example of reading and updating a cache by using multiple threads:
66///
67/// ```rust
68/// use moka::sync::Cache;
69///
70/// use std::thread;
71///
72/// fn value(n: usize) -> String {
73/// format!("value {n}")
74/// }
75///
76/// const NUM_THREADS: usize = 16;
77/// const NUM_KEYS_PER_THREAD: usize = 64;
78///
79/// // Create a cache that can store up to 10,000 entries.
80/// let cache = Cache::new(10_000);
81///
82/// // Spawn threads and read and update the cache simultaneously.
83/// let threads: Vec<_> = (0..NUM_THREADS)
84/// .map(|i| {
85/// // To share the same cache across the threads, clone it.
86/// // This is a cheap operation.
87/// let my_cache = cache.clone();
88/// let start = i * NUM_KEYS_PER_THREAD;
89/// let end = (i + 1) * NUM_KEYS_PER_THREAD;
90///
91/// thread::spawn(move || {
92/// // Insert 64 entries. (NUM_KEYS_PER_THREAD = 64)
93/// for key in start..end {
94/// my_cache.insert(key, value(key));
95/// // get() returns Option<String>, a clone of the stored value.
96/// assert_eq!(my_cache.get(&key), Some(value(key)));
97/// }
98///
/// // Invalidate every 4th element of the inserted entries.
100/// for key in (start..end).step_by(4) {
101/// my_cache.invalidate(&key);
102/// }
103/// })
104/// })
105/// .collect();
106///
107/// // Wait for all threads to complete.
108/// threads.into_iter().for_each(|t| t.join().expect("Failed"));
109///
110/// // Verify the result.
111/// for key in 0..(NUM_THREADS * NUM_KEYS_PER_THREAD) {
112/// if key % 4 == 0 {
113/// assert_eq!(cache.get(&key), None);
114/// } else {
115/// assert_eq!(cache.get(&key), Some(value(key)));
116/// }
117/// }
118/// ```
119///
120/// If you want to atomically initialize and insert a value when the key is not
121/// present, you might want to check other insertion methods
122/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
123///
/// # Avoiding cloning the value at `get`
125///
/// The return type of the `get` method is `Option<V>` instead of `Option<&V>`. Every
/// time `get` is called for an existing key, it creates a clone of the stored value
/// `V` and returns it. This is because the `Cache` allows concurrent updates from
/// other threads, so a value stored in the cache can be dropped or replaced at any
/// time by another thread. `get` cannot return a reference `&V` because it is
/// impossible to guarantee that the value outlives the reference.
132///
/// If you want to store values that will be expensive to clone, wrap them in
/// `std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
135/// thread-safe reference-counted pointer and its `clone()` method is cheap.
136///
137/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
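///
/// For example, here is a minimal sketch of storing large values behind an `Arc`:
///
/// ```rust
/// use moka::sync::Cache;
/// use std::sync::Arc;
///
/// let cache: Cache<String, Arc<Vec<u8>>> = Cache::new(100);
///
/// // The 1 MiB value is allocated once; `get` only clones the `Arc` pointer.
/// cache.insert("key".to_string(), Arc::new(vec![0u8; 1024 * 1024]));
/// let value = cache.get("key").unwrap();
/// assert_eq!(value.len(), 1024 * 1024);
/// ```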
138///
139/// # Sharing a cache across threads
140///
/// To share a cache across threads, do one of the following:
///
/// - Create a clone of the cache by calling its `clone` method and pass it to another
///   thread.
/// - Wrap the cache in a `sync::OnceCell` or `sync::Lazy` from the
///   [once_cell][once-cell-crate] crate, and set it to a `static` variable (see the
///   sketch below).
147///
148/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
149/// reference-counted pointers to the internal data structures.
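///
/// Below is a sketch of the `static` approach. It is not compiled here, and it
/// assumes the [once_cell][once-cell-crate] crate has been added to your
/// dependencies:
///
/// ```rust,ignore
/// use moka::sync::Cache;
/// use once_cell::sync::Lazy;
///
/// // A cache shared by the whole program. `Lazy` initializes it on first access.
/// static CACHE: Lazy<Cache<u32, String>> = Lazy::new(|| Cache::new(10_000));
///
/// // Any thread can use the cache through the `static` variable.
/// CACHE.insert(1, "one".to_string());
/// assert_eq!(CACHE.get(&1), Some("one".to_string()));
/// ```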
150///
151/// ## No lock is needed
152///
153/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
154/// by the `Cache` are considered thread-safe, and can be safely called by multiple
155/// threads at the same time. No lock is needed.
156///
157/// [once-cell-crate]: https://crates.io/crates/once_cell
158///
159/// # Hashing Algorithm
160///
161/// By default, `Cache` uses a hashing algorithm selected to provide resistance
162/// against HashDoS attacks. It will be the same one used by
163/// `std::collections::HashMap`, which is currently SipHash 1-3.
164///
/// While SipHash's performance is very competitive for medium-sized keys, other
/// hashing algorithms will outperform it for small keys such as integers as well as
/// large keys such as long strings. However, those algorithms will typically not
/// protect against attacks such as HashDoS.
169///
170/// The hashing algorithm can be replaced on a per-`Cache` basis using the
171/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
172/// Many alternative algorithms are available on crates.io, such as the
173/// [AHash][ahash-crate] crate.
174///
175/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
176/// [ahash-crate]: https://crates.io/crates/ahash
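///
/// For example, the following sketch switches the hasher to [AHash][ahash-crate].
/// It is not compiled here, and it assumes the `ahash` crate has been added to your
/// dependencies:
///
/// ```rust,ignore
/// use moka::sync::Cache;
///
/// // Build a cache that hashes its keys with `ahash` instead of SipHash 1-3.
/// let cache: Cache<u32, String, ahash::RandomState> = Cache::builder()
///     .max_capacity(10_000)
///     .build_with_hasher(ahash::RandomState::default());
///
/// cache.insert(1, "one".to_string());
/// assert_eq!(cache.get(&1), Some("one".to_string()));
/// ```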
177///
178/// # Example: Size-based Eviction
179///
180/// ```rust
181/// use moka::sync::Cache;
182///
183/// // Evict based on the number of entries in the cache.
184/// let cache = Cache::builder()
185/// // Up to 10,000 entries.
186/// .max_capacity(10_000)
187/// // Create the cache.
188/// .build();
189/// cache.insert(1, "one".to_string());
190///
191/// // Evict based on the byte length of strings in the cache.
192/// let cache = Cache::builder()
193/// // A weigher closure takes &K and &V and returns a u32
194/// // representing the relative size of the entry.
195/// .weigher(|_key, value: &String| -> u32 {
196/// value.len().try_into().unwrap_or(u32::MAX)
197/// })
198/// // This cache will hold up to 32MiB of values.
199/// .max_capacity(32 * 1024 * 1024)
200/// .build();
201/// cache.insert(2, "two".to_string());
202/// ```
203///
204/// If your cache should not grow beyond a certain size, use the `max_capacity`
205/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
206/// will try to evict entries that have not been used recently or very often.
207///
208/// At the cache creation time, a weigher closure can be set by the `weigher` method
209/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
210/// returns a `u32` representing the relative size of the entry:
211///
/// - If the `weigher` is _not_ set, the cache will treat each entry as having the
///   same size of `1`. This means the cache will be bounded by the number of entries.
/// - If the `weigher` is set, the cache will call the weigher to calculate the
///   weighted size (relative size) of an entry. This means the cache will be bounded
///   by the total weighted size of entries.
217///
218/// Note that weighted sizes are not used when making eviction selections.
219///
220/// [builder-struct]: ./struct.CacheBuilder.html
221///
222/// # Example: Time-based Expirations
223///
224/// ## Cache-level TTL and TTI policies
225///
226/// `Cache` supports the following cache-level expiration policies:
227///
/// - **Time to live (TTL)**: A cached entry will be expired after the specified
///   duration has elapsed since `insert`.
/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
///   duration has elapsed since `get` or `insert`.
///
/// These are cache-level expiration policies; all entries in the cache will have
/// the same TTL and/or TTI durations. If you want to set different expiration
/// durations for different entries, see the next section.
236///
237/// ```rust
238/// use moka::sync::Cache;
239/// use std::time::Duration;
240///
241/// let cache = Cache::builder()
242/// // Time to live (TTL): 30 minutes
243/// .time_to_live(Duration::from_secs(30 * 60))
244/// // Time to idle (TTI): 5 minutes
245/// .time_to_idle(Duration::from_secs( 5 * 60))
246/// // Create the cache.
247/// .build();
248///
249/// // This entry will expire after 5 minutes (TTI) if there is no get().
250/// cache.insert(0, "zero");
251///
252/// // This get() will extend the entry life for another 5 minutes.
253/// cache.get(&0);
254///
255/// // Even though we keep calling get(), the entry will expire
256/// // after 30 minutes (TTL) from the insert().
257/// ```
258///
259/// ## Per-entry expiration policy
260///
/// `Cache` supports a per-entry expiration policy through the `Expiry` trait.
///
/// The `Expiry` trait provides three callback methods:
/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
/// updated, one of these methods is called. These methods return an
/// `Option<Duration>`, which is used as the expiration duration of the entry.
///
/// The `Expiry` trait provides default implementations of these methods, so you only
/// need to implement the methods you want to customize.
271///
272/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
273/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
274/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
275///
276/// ```rust
277/// use moka::{sync::Cache, Expiry};
278/// use std::time::{Duration, Instant};
279///
280/// // In this example, we will create a `sync::Cache` with `u32` as the key, and
281/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
282/// // expiration of the value, and `String` is the application data of the value.
283///
284/// /// An enum to represent the expiration of a value.
285/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
286/// pub enum Expiration {
287/// /// The value never expires.
288/// Never,
289/// /// The value expires after a short time. (5 seconds in this example)
290/// AfterShortTime,
291/// /// The value expires after a long time. (15 seconds in this example)
292/// AfterLongTime,
293/// }
294///
295/// impl Expiration {
296/// /// Returns the duration of this expiration.
297/// pub fn as_duration(&self) -> Option<Duration> {
298/// match self {
299/// Expiration::Never => None,
300/// Expiration::AfterShortTime => Some(Duration::from_secs(5)),
301/// Expiration::AfterLongTime => Some(Duration::from_secs(15)),
302/// }
303/// }
304/// }
305///
306/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
307/// /// default implementations of three callback methods `expire_after_create`,
308/// /// `expire_after_read`, and `expire_after_update`.
309/// ///
310/// /// In this example, we only override the `expire_after_create` method.
311/// pub struct MyExpiry;
312///
313/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
314/// /// Returns the duration of the expiration of the value that was just
315/// /// created.
316/// fn expire_after_create(
317/// &self,
318/// _key: &u32,
319/// value: &(Expiration, String),
320/// _current_time: Instant,
321/// ) -> Option<Duration> {
322/// let duration = value.0.as_duration();
323/// println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
324/// duration
325/// }
326/// }
327///
328/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
329/// // eviction listener.
330/// let expiry = MyExpiry;
331///
332/// let eviction_listener = |key, _value, cause| {
333/// println!("Evicted key {key}. Cause: {cause:?}");
334/// };
335///
336/// let cache = Cache::builder()
337/// .max_capacity(100)
338/// .expire_after(expiry)
339/// .eviction_listener(eviction_listener)
340/// .build();
341///
342/// // Insert some entries into the cache with different expirations.
343/// cache.get_with(0, || (Expiration::AfterShortTime, "a".to_string()));
344/// cache.get_with(1, || (Expiration::AfterLongTime, "b".to_string()));
345/// cache.get_with(2, || (Expiration::Never, "c".to_string()));
346///
347/// // Verify that all the inserted entries exist.
348/// assert!(cache.contains_key(&0));
349/// assert!(cache.contains_key(&1));
350/// assert!(cache.contains_key(&2));
351///
352/// // Sleep for 6 seconds. Key 0 should expire.
353/// println!("\nSleeping for 6 seconds...\n");
354/// std::thread::sleep(Duration::from_secs(6));
355/// println!("Entry count: {}", cache.entry_count());
356///
357/// // Verify that key 0 has been evicted.
358/// assert!(!cache.contains_key(&0));
359/// assert!(cache.contains_key(&1));
360/// assert!(cache.contains_key(&2));
361///
362/// // Sleep for 10 more seconds. Key 1 should expire.
363/// println!("\nSleeping for 10 seconds...\n");
364/// std::thread::sleep(Duration::from_secs(10));
365/// println!("Entry count: {}", cache.entry_count());
366///
367/// // Verify that key 1 has been evicted.
368/// assert!(!cache.contains_key(&1));
369/// assert!(cache.contains_key(&2));
370///
371/// // Manually invalidate key 2.
372/// cache.invalidate(&2);
373/// assert!(!cache.contains_key(&2));
374///
375/// println!("\nSleeping for a second...\n");
376/// std::thread::sleep(Duration::from_secs(1));
377/// println!("Entry count: {}", cache.entry_count());
378///
379/// println!("\nDone!");
380/// ```
381///
382/// # Example: Eviction Listener
383///
384/// A `Cache` can be configured with an eviction listener, a closure that is called
385/// every time there is a cache eviction. The listener takes three parameters: the
386/// key and value of the evicted entry, and the
387/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
388/// entry was evicted.
389///
390/// An eviction listener can be used to keep other data structures in sync with the
391/// cache, for example.
392///
393/// The following example demonstrates how to use an eviction listener with
394/// time-to-live expiration to manage the lifecycle of temporary files on a
395/// filesystem. The cache stores the paths of the files, and when one of them has
396/// expired, the eviction listener will be called with the path, so it can remove the
397/// file from the filesystem.
398///
399/// ```rust
400/// // Cargo.toml
401/// //
402/// // [dependencies]
403/// // anyhow = "1.0"
404/// // uuid = { version = "1.1", features = ["v4"] }
405///
406/// use moka::{sync::Cache, notification};
407///
408/// use anyhow::{anyhow, Context};
409/// use std::{
410/// fs, io,
411/// path::{Path, PathBuf},
412/// sync::{Arc, RwLock},
413/// time::Duration,
414/// };
415/// use uuid::Uuid;
416///
417/// /// The DataFileManager writes, reads and removes data files.
418/// struct DataFileManager {
419/// base_dir: PathBuf,
420/// file_count: usize,
421/// }
422///
423/// impl DataFileManager {
424/// fn new(base_dir: PathBuf) -> Self {
425/// Self {
426/// base_dir,
427/// file_count: 0,
428/// }
429/// }
430///
431/// fn write_data_file(
432/// &mut self,
433/// key: impl AsRef<str>,
434/// contents: String
435/// ) -> io::Result<PathBuf> {
436/// // Use the key as a part of the filename.
437/// let mut path = self.base_dir.to_path_buf();
438/// path.push(key.as_ref());
439///
440/// assert!(!path.exists(), "Path already exists: {path:?}");
441///
442/// // create the file at the path and write the contents to the file.
443/// fs::write(&path, contents)?;
444/// self.file_count += 1;
445/// println!("Created a data file at {path:?} (file count: {})", self.file_count);
446/// Ok(path)
447/// }
448///
449/// fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
450/// // Reads the contents of the file at the path, and return the contents.
451/// fs::read_to_string(path)
452/// }
453///
454/// fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
455/// // Remove the file at the path.
456/// fs::remove_file(path.as_ref())?;
457/// self.file_count -= 1;
458/// println!(
459/// "Removed a data file at {:?} (file count: {})",
460/// path.as_ref(),
461/// self.file_count
462/// );
463///
464/// Ok(())
465/// }
466/// }
467///
468/// fn main() -> anyhow::Result<()> {
469/// // Create an instance of the DataFileManager and wrap it with
470/// // Arc<RwLock<_>> so it can be shared across threads.
471/// let mut base_dir = std::env::temp_dir();
472/// base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
473/// println!("base_dir: {base_dir:?}");
474/// std::fs::create_dir(&base_dir)?;
475///
476/// let file_mgr = DataFileManager::new(base_dir);
477/// let file_mgr = Arc::new(RwLock::new(file_mgr));
478///
479/// let file_mgr1 = Arc::clone(&file_mgr);
480///
481/// // Create an eviction listener closure.
482/// let eviction_listener = move |k, v: PathBuf, cause| {
483/// // Try to remove the data file at the path `v`.
484/// println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
485///
486/// // Acquire the write lock of the DataFileManager. We must handle
487/// // error cases here to prevent the listener from panicking.
488/// match file_mgr1.write() {
489/// Err(_e) => {
490/// eprintln!("The lock has been poisoned");
491/// }
492/// Ok(mut mgr) => {
493/// // Remove the data file using the DataFileManager.
494/// if let Err(_e) = mgr.remove_data_file(v.as_path()) {
495/// eprintln!("Failed to remove a data file at {v:?}");
496/// }
497/// }
498/// }
499/// };
500///
501/// // Create the cache. Set time to live for two seconds and set the
502/// // eviction listener.
503/// let cache = Cache::builder()
504/// .max_capacity(100)
505/// .time_to_live(Duration::from_secs(2))
506/// .eviction_listener(eviction_listener)
507/// .build();
508///
509/// // Insert an entry to the cache.
510/// // This will create and write a data file for the key "user1", store the
511/// // path of the file to the cache, and return it.
512/// println!("== try_get_with()");
513/// let key = "user1";
514/// let path = cache
515/// .try_get_with(key, || -> anyhow::Result<_> {
516/// let mut mgr = file_mgr
517/// .write()
518/// .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
519/// let path = mgr
520/// .write_data_file(key, "user data".into())
521/// .with_context(|| format!("Failed to create a data file"))?;
522/// Ok(path)
523/// })
524/// .map_err(|e| anyhow!("{e}"))?;
525///
526/// // Read the data file at the path and print the contents.
527/// println!("\n== read_data_file()");
528/// {
529/// let mgr = file_mgr
530/// .read()
531/// .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
532/// let contents = mgr
533/// .read_data_file(path.as_path())
534/// .with_context(|| format!("Failed to read data from {path:?}"))?;
535/// println!("contents: {contents}");
536/// }
537///
538/// // Sleep for five seconds. While sleeping, the cache entry for key "user1"
539/// // will be expired and evicted, so the eviction listener will be called to
540/// // remove the file.
541/// std::thread::sleep(Duration::from_secs(5));
542///
543/// cache.run_pending_tasks();
544///
545/// Ok(())
546/// }
547/// ```
548///
/// ## Avoid panicking in the eviction listener
///
/// It is very important that an eviction listener closure does not panic. If it
/// does, the cache will stop calling the listener after the panic. This is intended
/// behavior, because the cache cannot know whether it is memory safe to call the
/// panicked listener again.
///
/// When a listener panics, the cache will swallow the panic and disable the
/// listener. If you want to know when a listener panics and the reason for the
/// panic, you can enable the optional `logging` feature of Moka and check the
/// error-level logs.
559///
/// To enable the `logging` feature, do the following:
///
/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
/// 2. Set the logging level for `moka` to `error` or any lower level (`warn`,
///    `info`, ...):
///     - If you are using the `env_logger` crate, you can achieve this by setting
///       the `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one cache, you may want to set a distinct name for each
///    cache by using the cache builder's [`name`][builder-name-method] method. The
///    name will appear in the log.
570///
571/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
572///
573pub struct Cache<K, V, S = RandomState> {
574 pub(crate) base: BaseCache<K, V, S>,
575 value_initializer: Arc<ValueInitializer<K, V, S>>,
576}
577
578// TODO: https://github.com/moka-rs/moka/issues/54
579#[allow(clippy::non_send_fields_in_send_ty)]
580unsafe impl<K, V, S> Send for Cache<K, V, S>
581where
582 K: Send + Sync,
583 V: Send + Sync,
584 S: Send,
585{
586}
587
588unsafe impl<K, V, S> Sync for Cache<K, V, S>
589where
590 K: Send + Sync,
591 V: Send + Sync,
592 S: Sync,
593{
594}
595
596// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
597impl<K, V, S> Clone for Cache<K, V, S> {
598 /// Makes a clone of this shared cache.
599 ///
600 /// This operation is cheap as it only creates thread-safe reference counted
601 /// pointers to the shared internal data structures.
602 fn clone(&self) -> Self {
603 Self {
604 base: self.base.clone(),
605 value_initializer: Arc::clone(&self.value_initializer),
606 }
607 }
608}
609
610impl<K, V, S> fmt::Debug for Cache<K, V, S>
611where
612 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
613 V: fmt::Debug + Clone + Send + Sync + 'static,
614 // TODO: Remove these bounds from S.
615 S: BuildHasher + Clone + Send + Sync + 'static,
616{
617 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618 let mut d_map = f.debug_map();
619
620 for (k, v) in self {
621 d_map.entry(&k, &v);
622 }
623
624 d_map.finish()
625 }
626}
627
628impl<K, V, S> Cache<K, V, S> {
    /// Returns the name of this cache.
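    ///
    /// # Example
    ///
    /// A small sketch, assuming the cache builder's `name` method accepts a `&str`:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<u32, String> = Cache::builder().name("my-cache").build();
    /// assert_eq!(cache.name(), Some("my-cache"));
    ///
    /// // A cache created without a name has none.
    /// let unnamed: Cache<u32, String> = Cache::new(10);
    /// assert_eq!(unnamed.name(), None);
    /// ```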
630 pub fn name(&self) -> Option<&str> {
631 self.base.name()
632 }
633
634 /// Returns a read-only cache policy of this cache.
635 ///
    /// At this time, the cache policy cannot be modified after the cache is created.
    /// A future version may support modifying it.
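    ///
    /// # Example
    ///
    /// A minimal sketch of inspecting the policy, assuming `Policy::max_capacity`
    /// returns `Option<u64>`:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<u32, String> = Cache::new(100);
    /// // The policy reflects the configuration given at creation time.
    /// assert_eq!(cache.policy().max_capacity(), Some(100));
    /// ```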
638 pub fn policy(&self) -> Policy {
639 self.base.policy()
640 }
641
642 /// Returns an approximate number of entries in this cache.
643 ///
644 /// The value returned is _an estimate_; the actual count may differ if there are
645 /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling the
    /// `run_pending_tasks` method first.
648 ///
649 /// # Example
650 ///
651 /// ```rust
652 /// use moka::sync::Cache;
653 ///
654 /// let cache = Cache::new(10);
655 /// cache.insert('n', "Netherland Dwarf");
656 /// cache.insert('l', "Lop Eared");
657 /// cache.insert('d', "Dutch");
658 ///
659 /// // Ensure an entry exists.
660 /// assert!(cache.contains_key(&'n'));
661 ///
    /// // However, the following may print stale zeros instead of threes.
663 /// println!("{}", cache.entry_count()); // -> 0
664 /// println!("{}", cache.weighted_size()); // -> 0
665 ///
    /// // To mitigate the inaccuracy, call the `run_pending_tasks` method to run
    /// // pending internal tasks.
668 /// cache.run_pending_tasks();
669 ///
    /// // The following will print the actual numbers.
671 /// println!("{}", cache.entry_count()); // -> 3
672 /// println!("{}", cache.weighted_size()); // -> 3
673 /// ```
674 ///
675 pub fn entry_count(&self) -> u64 {
676 self.base.entry_count()
677 }
678
679 /// Returns an approximate total weighted size of entries in this cache.
680 ///
681 /// The value returned is _an estimate_; the actual size may differ if there are
682 /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling the
    /// `run_pending_tasks` method first. See [`entry_count`](#method.entry_count) for
    /// sample code.
685 pub fn weighted_size(&self) -> u64 {
686 self.base.weighted_size()
687 }
688}
689
690impl<K, V> Cache<K, V, RandomState>
691where
692 K: Hash + Eq + Send + Sync + 'static,
693 V: Clone + Send + Sync + 'static,
694{
    /// Constructs a new `Cache<K, V>` that will store up to `max_capacity` entries.
696 ///
697 /// To adjust various configuration knobs such as `initial_capacity` or
698 /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
699 ///
700 /// [builder-struct]: ./struct.CacheBuilder.html
701 pub fn new(max_capacity: u64) -> Self {
702 let build_hasher = RandomState::default();
703 Self::with_everything(
704 None,
705 Some(max_capacity),
706 None,
707 build_hasher,
708 None,
709 EvictionPolicy::default(),
710 None,
711 ExpirationPolicy::default(),
712 HousekeeperConfig::default(),
713 false,
714 Clock::default(),
715 )
716 }
717
    /// Returns a [`CacheBuilder`][builder-struct], which can build a `Cache` or
719 /// `SegmentedCache` with various configuration knobs.
720 ///
721 /// [builder-struct]: ./struct.CacheBuilder.html
722 pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
723 CacheBuilder::default()
724 }
725}
726
727impl<K, V, S> Cache<K, V, S>
728where
729 K: Hash + Eq + Send + Sync + 'static,
730 V: Clone + Send + Sync + 'static,
731 S: BuildHasher + Clone + Send + Sync + 'static,
732{
733 // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
734 #[allow(clippy::too_many_arguments)]
735 pub(crate) fn with_everything(
736 name: Option<String>,
737 max_capacity: Option<u64>,
738 initial_capacity: Option<usize>,
739 build_hasher: S,
740 weigher: Option<Weigher<K, V>>,
741 eviction_policy: EvictionPolicy,
742 eviction_listener: Option<EvictionListener<K, V>>,
743 expiration_policy: ExpirationPolicy<K, V>,
744 housekeeper_config: HousekeeperConfig,
745 invalidator_enabled: bool,
746 clock: Clock,
747 ) -> Self {
748 Self {
749 base: BaseCache::new(
750 name,
751 max_capacity,
752 initial_capacity,
753 build_hasher.clone(),
754 weigher,
755 eviction_policy,
756 eviction_listener,
757 expiration_policy,
758 housekeeper_config,
759 invalidator_enabled,
760 clock,
761 ),
762 value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
763 }
764 }
765
766 /// Returns `true` if the cache contains a value for the key.
767 ///
768 /// Unlike the `get` method, this method is not considered a cache read operation,
769 /// so it does not update the historic popularity estimator or reset the idle
770 /// timer for the key.
771 ///
772 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
773 /// on the borrowed form _must_ match those for the key type.
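    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(10);
    /// cache.insert(1, "one");
    ///
    /// // Unlike `get`, this check does not count as a read for the eviction policy.
    /// assert!(cache.contains_key(&1));
    /// assert!(!cache.contains_key(&2));
    /// ```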
774 pub fn contains_key<Q>(&self, key: &Q) -> bool
775 where
776 K: Borrow<Q>,
777 Q: Hash + Eq + ?Sized,
778 {
779 self.base.contains_key_with_hash(key, self.base.hash(key))
780 }
781
782 pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
783 where
784 K: Borrow<Q>,
785 Q: Hash + Eq + ?Sized,
786 {
787 self.base.contains_key_with_hash(key, hash)
788 }
789
790 /// Returns a _clone_ of the value corresponding to the key.
791 ///
    /// If you want to store values that will be expensive to clone, wrap them in
    /// `std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
794 /// thread-safe reference-counted pointer and its `clone()` method is cheap.
795 ///
796 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
797 /// on the borrowed form _must_ match those for the key type.
798 ///
799 /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
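    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(10);
    /// cache.insert(1, "one");
    ///
    /// // `get` returns a clone of the stored value.
    /// assert_eq!(cache.get(&1), Some("one"));
    /// assert_eq!(cache.get(&2), None);
    /// ```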
800 pub fn get<Q>(&self, key: &Q) -> Option<V>
801 where
802 K: Borrow<Q>,
803 Q: Hash + Eq + ?Sized,
804 {
805 self.base
806 .get_with_hash(key, self.base.hash(key), false)
807 .map(Entry::into_value)
808 }
809
810 pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
811 where
812 K: Borrow<Q>,
813 Q: Hash + Eq + ?Sized,
814 {
815 self.base.get_with_hash(key, hash, need_key)
816 }
817
818 /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
819 /// select or insert an entry.
820 ///
821 /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
822 ///
823 /// # Example
824 ///
825 /// ```rust
826 /// use moka::sync::Cache;
827 ///
828 /// let cache: Cache<String, u32> = Cache::new(100);
829 /// let key = "key1".to_string();
830 ///
831 /// let entry = cache.entry(key.clone()).or_insert(3);
832 /// assert!(entry.is_fresh());
833 /// assert_eq!(entry.key(), &key);
834 /// assert_eq!(entry.into_value(), 3);
835 ///
836 /// let entry = cache.entry(key).or_insert(6);
837 /// // Not fresh because the value was already in the cache.
838 /// assert!(!entry.is_fresh());
839 /// assert_eq!(entry.into_value(), 3);
840 /// ```
841 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
842 where
843 K: Hash + Eq,
844 {
845 let hash = self.base.hash(&key);
846 OwnedKeyEntrySelector::new(key, hash, self)
847 }
848
849 /// Takes a reference `&Q` of a key and returns an [`RefKeyEntrySelector`] that
850 /// can be used to select or insert an entry.
851 ///
852 /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
853 ///
854 /// # Example
855 ///
856 /// ```rust
857 /// use moka::sync::Cache;
858 ///
859 /// let cache: Cache<String, u32> = Cache::new(100);
860 /// let key = "key1".to_string();
861 ///
862 /// let entry = cache.entry_by_ref(&key).or_insert(3);
863 /// assert!(entry.is_fresh());
864 /// assert_eq!(entry.key(), &key);
865 /// assert_eq!(entry.into_value(), 3);
866 ///
867 /// let entry = cache.entry_by_ref(&key).or_insert(6);
868 /// // Not fresh because the value was already in the cache.
869 /// assert!(!entry.is_fresh());
870 /// assert_eq!(entry.into_value(), 3);
871 /// ```
872 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
873 where
874 K: Borrow<Q>,
875 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
876 {
877 let hash = self.base.hash(key);
878 RefKeyEntrySelector::new(key, hash, self)
879 }
880
881 /// Returns a _clone_ of the value corresponding to the key. If the value does
882 /// not exist, evaluates the `init` closure and inserts the output.
883 ///
884 /// # Concurrent calls on the same key
885 ///
886 /// This method guarantees that concurrent calls on the same not-existing key are
887 /// coalesced into one evaluation of the `init` closure. Only one of the calls
888 /// evaluates its closure, and other calls wait for that closure to complete.
889 ///
890 /// The following code snippet demonstrates this behavior:
891 ///
892 /// ```rust
893 /// use moka::sync::Cache;
894 /// use std::{sync::Arc, thread};
895 ///
896 /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
897 /// let cache = Cache::new(100);
898 ///
899 /// // Spawn four threads.
900 /// let threads: Vec<_> = (0..4_u8)
901 /// .map(|task_id| {
902 /// let my_cache = cache.clone();
903 /// thread::spawn(move || {
904 /// println!("Thread {task_id} started.");
905 ///
906 /// // Try to insert and get the value for key1. Although all four
907 /// // threads will call `get_with` at the same time, the `init` closure
908 /// // must be evaluated only once.
909 /// let value = my_cache.get_with("key1", || {
910 /// println!("Thread {task_id} inserting a value.");
911 /// Arc::new(vec![0u8; TEN_MIB])
912 /// });
913 ///
914 /// // Ensure the value exists now.
915 /// assert_eq!(value.len(), TEN_MIB);
916 /// assert!(my_cache.get(&"key1").is_some());
917 ///
918 /// println!("Thread {task_id} got the value. (len: {})", value.len());
919 /// })
920 /// })
921 /// .collect();
922 ///
    /// // Wait for all threads to complete.
924 /// threads
925 /// .into_iter()
926 /// .for_each(|t| t.join().expect("Thread failed"));
927 /// ```
928 ///
929 /// **Result**
930 ///
931 /// - The `init` closure was called exactly once by thread 1.
932 /// - Other threads were blocked until thread 1 inserted the value.
933 ///
934 /// ```console
935 /// Thread 1 started.
936 /// Thread 0 started.
937 /// Thread 3 started.
938 /// Thread 2 started.
939 /// Thread 1 inserting a value.
940 /// Thread 2 got the value. (len: 10485760)
941 /// Thread 1 got the value. (len: 10485760)
942 /// Thread 0 got the value. (len: 10485760)
943 /// Thread 3 got the value. (len: 10485760)
944 /// ```
945 ///
946 /// # Panics
947 ///
948 /// This method panics when the `init` closure has panicked. When it happens,
949 /// only the caller whose `init` closure panicked will get the panic (e.g. only
950 /// thread 1 in the above sample). If there are other calls in progress (e.g.
    /// threads 0, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
953 ///
954 pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
955 let hash = self.base.hash(&key);
956 let key = Arc::new(key);
957 let replace_if = None as Option<fn(&V) -> bool>;
958 self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
959 .into_value()
960 }
961
962 /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
963 /// key, you can pass a reference to the key. If the key does not exist in the
    /// cache, the key will be cloned to create a new entry in the cache.
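    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<String, u32> = Cache::new(100);
    ///
    /// // The `&str` key is cloned into a `String` only when the entry has to be
    /// // created.
    /// let value = cache.get_with_by_ref("key1", || 7);
    /// assert_eq!(value, 7);
    /// assert_eq!(cache.get("key1"), Some(7));
    /// ```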
965 pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
966 where
967 K: Borrow<Q>,
968 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
969 {
970 let hash = self.base.hash(key);
971 let replace_if = None as Option<fn(&V) -> bool>;
972
973 self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
974 .into_value()
975 }
976
977 /// TODO: Remove this in v0.13.0.
978 /// Deprecated, replaced with
979 /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
980 #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
981 pub fn get_with_if(
982 &self,
983 key: K,
984 init: impl FnOnce() -> V,
985 replace_if: impl FnMut(&V) -> bool,
986 ) -> V {
987 let hash = self.base.hash(&key);
988 let key = Arc::new(key);
989 self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
990 .into_value()
991 }
992
993 pub(crate) fn get_or_insert_with_hash_and_fun(
994 &self,
995 key: Arc<K>,
996 hash: u64,
997 init: impl FnOnce() -> V,
998 mut replace_if: Option<impl FnMut(&V) -> bool>,
999 need_key: bool,
1000 ) -> Entry<K, V> {
1001 self.base
1002 .get_with_hash_and_ignore_if(&key, hash, replace_if.as_mut(), need_key)
1003 .unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key))
1004 }
1005
    // We need a separate function instead of reusing the existing
    // `get_or_insert_with_hash_and_fun`. The reason is that the `by_ref` variant
    // requires the key reference to implement the `ToOwned` trait. If we modified
    // the existing `get_or_insert_with_hash_and_fun`, every existing API that
    // depends on it would require `K` to implement `ToOwned` as well.
1011 pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1012 &self,
1013 key: &Q,
1014 hash: u64,
1015 init: impl FnOnce() -> V,
1016 mut replace_if: Option<impl FnMut(&V) -> bool>,
1017 need_key: bool,
1018 ) -> Entry<K, V>
1019 where
1020 K: Borrow<Q>,
1021 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1022 {
1023 self.base
1024 .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key)
1025 .unwrap_or_else(|| {
1026 let key = Arc::new(key.to_owned());
1027 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1028 })
1029 }
1030
1031 pub(crate) fn insert_with_hash_and_fun(
1032 &self,
1033 key: Arc<K>,
1034 hash: u64,
1035 init: impl FnOnce() -> V,
1036 mut replace_if: Option<impl FnMut(&V) -> bool>,
1037 need_key: bool,
1038 ) -> Entry<K, V> {
1039 let get = || {
1040 self.base
1041 .get_with_hash_without_recording(&key, hash, replace_if.as_mut())
1042 };
1043 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1044
1045 let k = if need_key {
1046 Some(Arc::clone(&key))
1047 } else {
1048 None
1049 };
1050
1051 let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1052 let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1053
1054 match self
1055 .value_initializer
1056 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1057 {
1058 InitResult::Initialized(v) => {
1059 crossbeam_epoch::pin().flush();
1060 Entry::new(k, v, true, false)
1061 }
1062 InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1063 InitResult::InitErr(_) => unreachable!(),
1064 }
1065 }
1066
1067 pub(crate) fn get_or_insert_with_hash(
1068 &self,
1069 key: Arc<K>,
1070 hash: u64,
1071 init: impl FnOnce() -> V,
1072 ) -> Entry<K, V> {
1073 match self.base.get_with_hash(&key, hash, true) {
1074 Some(entry) => entry,
1075 None => {
1076 let value = init();
1077 self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1078 Entry::new(Some(key), value, true, false)
1079 }
1080 }
1081 }
1082
1083 pub(crate) fn get_or_insert_with_hash_by_ref<Q>(
1084 &self,
1085 key: &Q,
1086 hash: u64,
1087 init: impl FnOnce() -> V,
1088 ) -> Entry<K, V>
1089 where
1090 K: Borrow<Q>,
1091 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1092 {
1093 match self.base.get_with_hash(key, hash, true) {
1094 Some(entry) => entry,
1095 None => {
1096 let key = Arc::new(key.to_owned());
1097 let value = init();
1098 self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1099 Entry::new(Some(key), value, true, false)
1100 }
1101 }
1102 }
1103
1104 /// Returns a _clone_ of the value corresponding to the key. If the value does
1105 /// not exist, evaluates the `init` closure, and inserts the value if
1106 /// `Some(value)` was returned. If `None` was returned from the closure, this
1107 /// method does not insert a value and returns `None`.
1108 ///
1109 /// # Concurrent calls on the same key
1110 ///
1111 /// This method guarantees that concurrent calls on the same not-existing key are
1112 /// coalesced into one evaluation of the `init` closure. Only one of the calls
1113 /// evaluates its closure, and other calls wait for that closure to complete.
1114 ///
1115 /// The following code snippet demonstrates this behavior:
1116 ///
1117 /// ```rust
1118 /// use moka::sync::Cache;
1119 /// use std::{path::Path, thread};
1120 ///
1121 /// /// This function tries to get the file size in bytes.
1122 /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Option<u64> {
1123 /// println!("get_file_size() called by thread {thread_id}.");
1124 /// std::fs::metadata(path).ok().map(|m| m.len())
1125 /// }
1126 ///
1127 /// let cache = Cache::new(100);
1128 ///
1129 /// // Spawn four threads.
1130 /// let threads: Vec<_> = (0..4_u8)
1131 /// .map(|thread_id| {
1132 /// let my_cache = cache.clone();
1133 /// thread::spawn(move || {
1134 /// println!("Thread {thread_id} started.");
1135 ///
1136 /// // Try to insert and get the value for key1. Although all four
1137 /// // threads will call `optionally_get_with` at the same time,
1138 /// // get_file_size() must be called only once.
1139 /// let value = my_cache.optionally_get_with(
1140 /// "key1",
1141 /// || get_file_size(thread_id, "./Cargo.toml"),
1142 /// );
1143 ///
1144 /// // Ensure the value exists now.
1145 /// assert!(value.is_some());
1146 /// assert!(my_cache.get(&"key1").is_some());
1147 ///
1148 /// println!(
1149 /// "Thread {thread_id} got the value. (len: {})",
1150 /// value.unwrap()
1151 /// );
1152 /// })
1153 /// })
1154 /// .collect();
1155 ///
    /// // Wait for all threads to complete.
1157 /// threads
1158 /// .into_iter()
1159 /// .for_each(|t| t.join().expect("Thread failed"));
1160 /// ```
1161 ///
1162 /// **Result**
1163 ///
1164 /// - `get_file_size()` was called exactly once by thread 0.
1165 /// - Other threads were blocked until thread 0 inserted the value.
1166 ///
1167 /// ```console
1168 /// Thread 0 started.
1169 /// Thread 1 started.
1170 /// Thread 2 started.
1171 /// get_file_size() called by thread 0.
1172 /// Thread 3 started.
1173 /// Thread 2 got the value. (len: 1466)
1174 /// Thread 0 got the value. (len: 1466)
1175 /// Thread 1 got the value. (len: 1466)
1176 /// Thread 3 got the value. (len: 1466)
1177 /// ```
1178 ///
1179 /// # Panics
1180 ///
1181 /// This method panics when the `init` closure has panicked. When it happens,
1182 /// only the caller whose `init` closure panicked will get the panic (e.g. only
    /// thread 0 in the above sample). If there are other calls in progress (e.g.
    /// threads 1, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
1186 ///
1187 pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1188 where
1189 F: FnOnce() -> Option<V>,
1190 {
1191 let hash = self.base.hash(&key);
1192 let key = Arc::new(key);
1193
1194 self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1195 .map(Entry::into_value)
1196 }
1197
1198 /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1199 /// of passing an owned key, you can pass a reference to the key. If the key does
    /// not exist in the cache, the key will be cloned to create a new entry in the
    /// cache.
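    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<String, u32> = Cache::new(100);
    ///
    /// // The value is inserted only when the closure returns `Some`.
    /// assert_eq!(cache.optionally_get_with_by_ref("none", || None), None);
    /// assert!(!cache.contains_key("none"));
    ///
    /// assert_eq!(cache.optionally_get_with_by_ref("some", || Some(5)), Some(5));
    /// assert_eq!(cache.get("some"), Some(5));
    /// ```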
1202 pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1203 where
1204 F: FnOnce() -> Option<V>,
1205 K: Borrow<Q>,
1206 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1207 {
1208 let hash = self.base.hash(key);
1209 self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1210 .map(Entry::into_value)
1211 }
1212
1213 pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
1214 &self,
1215 key: Arc<K>,
1216 hash: u64,
1217 init: F,
1218 need_key: bool,
1219 ) -> Option<Entry<K, V>>
1220 where
1221 F: FnOnce() -> Option<V>,
1222 {
1223 let entry = self.get_with_hash(&key, hash, need_key);
1224 if entry.is_some() {
1225 return entry;
1226 }
1227
1228 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1229 }
1230
1231 pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1232 &self,
1233 key: &Q,
1234 hash: u64,
1235 init: F,
1236 need_key: bool,
1237 ) -> Option<Entry<K, V>>
1238 where
1239 F: FnOnce() -> Option<V>,
1240 K: Borrow<Q>,
1241 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1242 {
1243 let entry = self.get_with_hash(key, hash, need_key);
1244 if entry.is_some() {
1245 return entry;
1246 }
1247
1248 let key = Arc::new(key.to_owned());
1249 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1250 }
1251
1252 pub(super) fn optionally_insert_with_hash_and_fun<F>(
1253 &self,
1254 key: Arc<K>,
1255 hash: u64,
1256 init: F,
1257 need_key: bool,
1258 ) -> Option<Entry<K, V>>
1259 where
1260 F: FnOnce() -> Option<V>,
1261 {
1262 let get = || {
1263 let ignore_if = None as Option<&mut fn(&V) -> bool>;
1264 self.base
1265 .get_with_hash_without_recording(&key, hash, ignore_if)
1266 };
1267 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1268
1269 let k = if need_key {
1270 Some(Arc::clone(&key))
1271 } else {
1272 None
1273 };
1274
1275 let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1276 let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1277
1278 match self
1279 .value_initializer
1280 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1281 {
1282 InitResult::Initialized(v) => {
1283 crossbeam_epoch::pin().flush();
1284 Some(Entry::new(k, v, true, false))
1285 }
1286 InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1287 InitResult::InitErr(_) => {
1288 crossbeam_epoch::pin().flush();
1289 None
1290 }
1291 }
1292 }
1293
1294 /// Returns a _clone_ of the value corresponding to the key. If the value does
1295 /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
1296 /// was returned. If `Err(_)` was returned from the closure, this method does not
1297 /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1298 ///
1299 /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1300 ///
1301 /// # Concurrent calls on the same key
1302 ///
1303 /// This method guarantees that concurrent calls on the same not-existing key are
1304 /// coalesced into one evaluation of the `init` closure (as long as these
1305 /// closures return the same error type). Only one of the calls evaluates its
1306 /// closure, and other calls wait for that closure to complete.
1307 ///
1308 /// The following code snippet demonstrates this behavior:
1309 ///
1310 /// ```rust
1311 /// use moka::sync::Cache;
1312 /// use std::{path::Path, thread};
1313 ///
1314 /// /// This function tries to get the file size in bytes.
1315 /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Result<u64, std::io::Error> {
1316 /// println!("get_file_size() called by thread {thread_id}.");
1317 /// Ok(std::fs::metadata(path)?.len())
1318 /// }
1319 ///
1320 /// let cache = Cache::new(100);
1321 ///
1322 /// // Spawn four threads.
1323 /// let threads: Vec<_> = (0..4_u8)
1324 /// .map(|thread_id| {
1325 /// let my_cache = cache.clone();
1326 /// thread::spawn(move || {
1327 /// println!("Thread {thread_id} started.");
1328 ///
1329 /// // Try to insert and get the value for key1. Although all four
1330 /// // threads will call `try_get_with` at the same time,
1331 /// // get_file_size() must be called only once.
1332 /// let value = my_cache.try_get_with(
1333 /// "key1",
1334 /// || get_file_size(thread_id, "./Cargo.toml"),
1335 /// );
1336 ///
1337 /// // Ensure the value exists now.
1338 /// assert!(value.is_ok());
1339 /// assert!(my_cache.get(&"key1").is_some());
1340 ///
1341 /// println!(
1342 /// "Thread {thread_id} got the value. (len: {})",
1343 /// value.unwrap()
1344 /// );
1345 /// })
1346 /// })
1347 /// .collect();
1348 ///
    /// // Wait for all threads to complete.
1350 /// threads
1351 /// .into_iter()
1352 /// .for_each(|t| t.join().expect("Thread failed"));
1353 /// ```
1354 ///
1355 /// **Result**
1356 ///
1357 /// - `get_file_size()` was called exactly once by thread 1.
1358 /// - Other threads were blocked until thread 1 inserted the value.
1359 ///
1360 /// ```console
1361 /// Thread 1 started.
1362 /// Thread 2 started.
1363 /// get_file_size() called by thread 1.
1364 /// Thread 3 started.
1365 /// Thread 0 started.
1366 /// Thread 2 got the value. (len: 1466)
1367 /// Thread 0 got the value. (len: 1466)
1368 /// Thread 1 got the value. (len: 1466)
1369 /// Thread 3 got the value. (len: 1466)
1370 /// ```
1371 ///
1372 /// # Panics
1373 ///
1374 /// This method panics when the `init` closure has panicked. When it happens,
1375 /// only the caller whose `init` closure panicked will get the panic (e.g. only
1376 /// thread 1 in the above sample). If there are other calls in progress (e.g.
    /// threads 0, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
1379 ///
1380 pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1381 where
1382 F: FnOnce() -> Result<V, E>,
1383 E: Send + Sync + 'static,
1384 {
1385 let hash = self.base.hash(&key);
1386 let key = Arc::new(key);
1387 self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1388 .map(Entry::into_value)
1389 }
1390
1391 /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1392 /// owned key, you can pass a reference to the key. If the key does not exist in
    /// the cache, the key will be cloned to create a new entry in the cache.
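    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<String, u32> = Cache::new(100);
    ///
    /// // The value is inserted only when the closure returns `Ok`.
    /// let value = cache
    ///     .try_get_with_by_ref("key1", || -> Result<u32, std::io::Error> { Ok(5) });
    /// assert_eq!(value.unwrap(), 5);
    /// assert_eq!(cache.get("key1"), Some(5));
    /// ```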
1394 pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1395 where
1396 F: FnOnce() -> Result<V, E>,
1397 E: Send + Sync + 'static,
1398 K: Borrow<Q>,
1399 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1400 {
1401 let hash = self.base.hash(key);
1402 self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1403 .map(Entry::into_value)
1404 }
1405
1406 pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
1407 &self,
1408 key: Arc<K>,
1409 hash: u64,
1410 init: F,
1411 need_key: bool,
1412 ) -> Result<Entry<K, V>, Arc<E>>
1413 where
1414 F: FnOnce() -> Result<V, E>,
1415 E: Send + Sync + 'static,
1416 {
1417 if let Some(entry) = self.get_with_hash(&key, hash, need_key) {
1418 return Ok(entry);
1419 }
1420
1421 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1422 }
1423
1424 pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
1425 &self,
1426 key: &Q,
1427 hash: u64,
1428 init: F,
1429 need_key: bool,
1430 ) -> Result<Entry<K, V>, Arc<E>>
1431 where
1432 F: FnOnce() -> Result<V, E>,
1433 E: Send + Sync + 'static,
1434 K: Borrow<Q>,
1435 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1436 {
1437 if let Some(entry) = self.get_with_hash(key, hash, false) {
1438 return Ok(entry);
1439 }
1440
1441 let key = Arc::new(key.to_owned());
1442 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1443 }
1444
1445 pub(crate) fn try_insert_with_hash_and_fun<F, E>(
1446 &self,
1447 key: Arc<K>,
1448 hash: u64,
1449 init: F,
1450 need_key: bool,
1451 ) -> Result<Entry<K, V>, Arc<E>>
1452 where
1453 F: FnOnce() -> Result<V, E>,
1454 E: Send + Sync + 'static,
1455 {
1456 let get = || {
1457 let ignore_if = None as Option<&mut fn(&V) -> bool>;
1458 self.base
1459 .get_with_hash_without_recording(&key, hash, ignore_if)
1460 };
1461 let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1462
1463 let k = if need_key {
1464 Some(Arc::clone(&key))
1465 } else {
1466 None
1467 };
1468
1469 let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1470 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1471
1472 match self
1473 .value_initializer
1474 .try_init_or_read(&key, type_id, get, init, insert, post_init)
1475 {
1476 InitResult::Initialized(v) => {
1477 crossbeam_epoch::pin().flush();
1478 Ok(Entry::new(k, v, true, false))
1479 }
1480 InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1481 InitResult::InitErr(e) => {
1482 crossbeam_epoch::pin().flush();
1483 Err(e)
1484 }
1485 }
1486 }
1487
1488 /// Inserts a key-value pair into the cache.
1489 ///
1490 /// If the cache has this key present, the value is updated.
1491 pub fn insert(&self, key: K, value: V) {
1492 let hash = self.base.hash(&key);
1493 let key = Arc::new(key);
1494 self.insert_with_hash(key, hash, value);
1495 }
1496
1497 pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1498 if self.base.is_map_disabled() {
1499 return;
1500 }
1501
1502 let (op, now) = self.base.do_insert_with_hash(key, hash, value);
1503 let hk = self.base.housekeeper.as_ref();
1504 Self::schedule_write_op(
1505 self.base.inner.as_ref(),
1506 &self.base.write_op_ch,
1507 op,
1508 now,
1509 hk,
1510 )
1511 .expect("Failed to insert");
1512 }
1513
1514 pub(crate) fn compute_with_hash_and_fun<F>(
1515 &self,
1516 key: Arc<K>,
1517 hash: u64,
1518 f: F,
1519 ) -> compute::CompResult<K, V>
1520 where
1521 F: FnOnce(Option<Entry<K, V>>) -> compute::Op<V>,
1522 {
1523 let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1524 match self
1525 .value_initializer
1526 .try_compute(key, hash, self, f, post_init, true)
1527 {
1528 Ok(result) => result,
1529 Err(_) => unreachable!(),
1530 }
1531 }
1532
1533 pub(crate) fn try_compute_with_hash_and_fun<F, E>(
1534 &self,
1535 key: Arc<K>,
1536 hash: u64,
1537 f: F,
1538 ) -> Result<compute::CompResult<K, V>, E>
1539 where
1540 F: FnOnce(Option<Entry<K, V>>) -> Result<compute::Op<V>, E>,
1541 E: Send + Sync + 'static,
1542 {
1543 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1544 self.value_initializer
1545 .try_compute(key, hash, self, f, post_init, true)
1546 }
1547
1548 pub(crate) fn upsert_with_hash_and_fun<F>(&self, key: Arc<K>, hash: u64, f: F) -> Entry<K, V>
1549 where
1550 F: FnOnce(Option<Entry<K, V>>) -> V,
1551 {
1552 let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1553 match self
1554 .value_initializer
1555 .try_compute(key, hash, self, f, post_init, false)
1556 {
1557 Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1558 _ => unreachable!(),
1559 }
1560 }
1561
1562 /// Discards any cached value for the key.
1563 ///
    /// If you need to get the value that has been discarded, use the
1565 /// [`remove`](#method.remove) method instead.
1566 ///
1567 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1568 /// on the borrowed form _must_ match those for the key type.
1569 pub fn invalidate<Q>(&self, key: &Q)
1570 where
1571 K: Borrow<Q>,
1572 Q: Hash + Eq + ?Sized,
1573 {
1574 let hash = self.base.hash(key);
1575 self.invalidate_with_hash(key, hash, false);
1576 }
1577
1578 /// Discards any cached value for the key and returns a _clone_ of the value.
1579 ///
1580 /// If you do not need to get the value that has been discarded, use the
1581 /// [`invalidate`](#method.invalidate) method instead.
1582 ///
1583 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1584 /// on the borrowed form _must_ match those for the key type.
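    ///
    /// # Example
    ///
    /// A minimal usage sketch:
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(10);
    /// cache.insert(1, "one");
    ///
    /// // Removes the entry and returns a clone of its value.
    /// assert_eq!(cache.remove(&1), Some("one"));
    /// // A second call finds nothing to remove.
    /// assert_eq!(cache.remove(&1), None);
    /// ```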
1585 pub fn remove<Q>(&self, key: &Q) -> Option<V>
1586 where
1587 K: Borrow<Q>,
1588 Q: Hash + Eq + ?Sized,
1589 {
1590 let hash = self.base.hash(key);
1591 self.invalidate_with_hash(key, hash, true)
1592 }
1593
1594 pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
1595 where
1596 K: Borrow<Q>,
1597 Q: Hash + Eq + ?Sized,
1598 {
1599 // Lock the key for removal if blocking removal notification is enabled.
1600 let mut kl = None;
1601 let mut klg = None;
1602 if self.base.is_removal_notifier_enabled() {
1603 // To lock the key, we have to get Arc<K> for key (&Q).
1604 //
            // TODO: Enhance this if possible. This is rather hacky for now because
            // it cannot prevent race conditions like this:
1607 //
1608 // 1. We miss the key because it does not exist. So we do not lock
1609 // the key.
1610 // 2. Somebody else (other thread) inserts the key.
1611 // 3. We remove the entry for the key, but without the key lock!
1612 //
1613 if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1614 kl = self.base.maybe_key_lock(&arc_key);
1615 klg = kl.as_ref().map(|kl| kl.lock());
1616 }
1617 }
1618
1619 match self.base.remove_entry(key, hash) {
1620 None => None,
1621 Some(kv) => {
1622 let now = self.base.current_time();
1623
1624 let info = kv.entry.entry_info();
1625 let entry_gen = info.incr_entry_gen();
1626
1627 if self.base.is_removal_notifier_enabled() {
1628 self.base.notify_invalidate(&kv.key, &kv.entry);
1629 }
                // Drop the locks before scheduling the write op to avoid a potential
                // deadlock. (Scheduling a write can spin-lock when the queue is full,
                // and the queue will be drained by the housekeeping thread, which can
                // lock the same key.)
1634 std::mem::drop(klg);
1635 std::mem::drop(kl);
1636
1637 let maybe_v = if need_value {
1638 Some(kv.entry.value.clone())
1639 } else {
1640 None
1641 };
1642
1643 let op = WriteOp::Remove {
1644 kv_entry: kv,
1645 entry_gen,
1646 };
1647 let hk = self.base.housekeeper.as_ref();
1648 Self::schedule_write_op(
1649 self.base.inner.as_ref(),
1650 &self.base.write_op_ch,
1651 op,
1652 now,
1653 hk,
1654 )
1655 .expect("Failed to remove");
1656 crossbeam_epoch::pin().flush();
1657 maybe_v
1658 }
1659 }
1660 }
1661
1662 /// Discards all cached values.
1663 ///
1664 /// This method returns immediately; it just sets the current time as the
1665 /// invalidation time. `get` and other retrieval methods are guaranteed not to
1666 /// return the entries inserted before or at the invalidation time.
1667 ///
1668 /// The actual removal of the invalidated entries is done as a maintenance task
1669 /// driven by a user thread. For more details, see
1670 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1671 /// level documentation.
1672 ///
1673 /// Like the `invalidate` method, this method does not clear the historic
1674 /// popularity estimator of keys, so the record of clients' attempts to
1675 /// retrieve an item is retained.
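///
/// # Example
///
/// A short, illustrative sketch:
///
/// ```rust
/// use moka::sync::Cache;
///
/// let cache = Cache::new(10);
/// cache.insert(1, "one");
/// cache.insert(2, "two");
///
/// cache.invalidate_all();
///
/// // Retrieval methods will not return the invalidated entries, even though
/// // their actual removal happens later as a maintenance task.
/// assert!(cache.get(&1).is_none());
/// assert!(cache.get(&2).is_none());
/// ```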
1676 pub fn invalidate_all(&self) {
1677 self.base.invalidate_all();
1678 }
1679
1680 /// Discards cached values that satisfy a predicate.
1681 ///
1682 /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1683 /// closure is called against each cached entry inserted before or at the time
1684 /// when this method was called. If the closure returns `true`, that entry will be
1685 /// evicted from the cache.
1686 ///
1687 /// This method returns immediately; it does not actually remove the invalidated
1688 /// entries. Instead, it just registers the predicate with the cache and the time when
1689 /// this method was called. The actual removal of the invalidated entries is done
1690 /// as a maintenance task driven by a user thread. For more details, see
1691 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1692 /// level documentation.
1693 ///
1694 /// Also, the `get` and other retrieval methods will apply the closure to a cached
1695 /// entry to determine if it should have been invalidated. Therefore, these
1696 /// methods are guaranteed not to return invalidated values.
1697 ///
1698 /// Note that you must call
1699 /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1700 /// at the cache creation time as the cache needs to maintain additional internal
1701 /// data structures to support this method. Otherwise, calling this method will
1702 /// fail with a
1703 /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1704 ///
1705 /// Like the `invalidate` method, this method does not clear the historic
1706 /// popularity estimator of keys, so the record of clients' attempts to
1707 /// retrieve an item is retained.
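///
/// # Example
///
/// A short, illustrative sketch (hypothetical keys and values; note that the
/// builder must enable invalidation closures first):
///
/// ```rust
/// use moka::sync::Cache;
///
/// let cache = Cache::builder()
///     .max_capacity(10)
///     .support_invalidation_closures()
///     .build();
///
/// cache.insert(1, "alice");
/// cache.insert(2, "bob");
///
/// // Register a predicate; matching entries will no longer be returned.
/// cache.invalidate_entries_if(|_key, value| *value == "alice")
///     .expect("invalidation closures were enabled by the builder");
///
/// assert!(cache.get(&1).is_none());
/// assert_eq!(cache.get(&2), Some("bob"));
/// ```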
1708 ///
1709 /// [support-invalidation-closures]:
1710 /// ./struct.CacheBuilder.html#method.support_invalidation_closures
1711 /// [invalidation-disabled-error]:
1712 /// ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
1713 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1714 where
1715 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1716 {
1717 self.base.invalidate_entries_if(Arc::new(predicate))
1718 }
1719
1720 pub(crate) fn invalidate_entries_with_arc_fun<F>(
1721 &self,
1722 predicate: Arc<F>,
1723 ) -> Result<PredicateId, PredicateError>
1724 where
1725 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1726 {
1727 self.base.invalidate_entries_if(predicate)
1728 }
1729
1730 /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1731 /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1732 /// value.
1733 ///
1734 /// Iterators do not block concurrent reads and writes on the cache. An entry can
1735 /// be inserted into, invalidated, or evicted from the cache while iterators are alive
1736 /// on the same cache.
1737 ///
1738 /// Unlike the `get` method, visiting entries via an iterator does not update the
1739 /// historic popularity estimator or reset idle timers for keys.
1740 ///
1741 /// # Guarantees
1742 ///
1743 /// In order to allow concurrent access to the cache, the iterator's `next`
1744 /// method does _not_ guarantee the following:
1745 ///
1746 /// - It does not guarantee to return a key-value pair (an entry) if its key has
1747 ///   been inserted into the cache _after_ the iterator was created.
1748 ///   - Such an entry may or may not be returned depending on the key's hash and
1749 ///     timing.
1750 ///
1751 /// and the `next` method guarantees the following:
1752 ///
1753 /// - It guarantees not to return the same entry more than once.
1754 /// - It guarantees not to return an entry if it has been removed from the cache
1755 ///   after the iterator was created.
1756 ///   - Note: An entry can be removed for the following reasons:
1757 ///     - Manually invalidated.
1758 ///     - Expired (e.g. exceeded its time-to-live).
1759 ///     - Evicted because the cache capacity was exceeded.
1760 ///
1761 /// # Examples
1762 ///
1763 /// ```rust
1764 /// use moka::sync::Cache;
1765 ///
1766 /// let cache = Cache::new(100);
1767 /// cache.insert("Julia", 14);
1768 ///
1769 /// let mut iter = cache.iter();
1770 /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1771 /// assert_eq!(*k, "Julia");
1772 /// assert_eq!(v, 14);
1773 ///
1774 /// assert!(iter.next().is_none());
1775 /// ```
1776 ///
1777 pub fn iter(&self) -> Iter<'_, K, V> {
1778 Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
1779 }
1780
1781 /// Performs any pending maintenance operations needed by the cache.
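///
/// A short, illustrative sketch: calling this processes internal bookkeeping
/// (such as applying the recorded read/write operations) right away instead of
/// waiting for it to be triggered by later cache calls.
///
/// ```rust
/// use moka::sync::Cache;
///
/// let cache = Cache::new(10);
/// cache.insert(1, "one");
/// cache.insert(2, "two");
///
/// // `entry_count` may lag behind until the pending tasks are processed.
/// cache.run_pending_tasks();
/// assert_eq!(cache.entry_count(), 2);
/// ```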
1782 pub fn run_pending_tasks(&self) {
1783 if let Some(hk) = &self.base.housekeeper {
1784 hk.run_pending_tasks(&*self.base.inner);
1785 }
1786 }
1787}
1788
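// A `&Cache` can be iterated directly with a `for` loop through this impl.
// A short, illustrative sketch:
//
//     let cache: Cache<u32, String> = Cache::new(10);
//     for (key, value) in &cache {
//         println!("{key} => {value}");
//     }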
1789impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
1790where
1791 K: Hash + Eq + Send + Sync + 'static,
1792 V: Clone + Send + Sync + 'static,
1793 S: BuildHasher + Clone + Send + Sync + 'static,
1794{
1795 type Item = (Arc<K>, V);
1796
1797 type IntoIter = Iter<'a, K, V>;
1798
1799 fn into_iter(self) -> Self::IntoIter {
1800 self.iter()
1801 }
1802}
1803
1804//
1805// Iterator support
1806//
1807impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
1808where
1809 K: Hash + Eq + Send + Sync + 'static,
1810 V: Clone + Send + Sync + 'static,
1811 S: BuildHasher + Clone + Send + Sync + 'static,
1812{
1813 fn num_cht_segments(&self) -> usize {
1814 self.base.num_cht_segments()
1815 }
1816
1817 fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
1818 self.base.scanning_get(key)
1819 }
1820
1821 fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
1822 self.base.keys(cht_segment)
1823 }
1824}
1825
1826//
1827// private methods
1828//
1829impl<K, V, S> Cache<K, V, S>
1830where
1831 K: Hash + Eq + Send + Sync + 'static,
1832 V: Clone + Send + Sync + 'static,
1833 S: BuildHasher + Clone + Send + Sync + 'static,
1834{
1835 // TODO: Like future::Cache, move this method to BaseCache.
1836 #[inline]
1837 fn schedule_write_op(
1838 inner: &impl InnerSync,
1839 ch: &Sender<WriteOp<K, V>>,
1840 op: WriteOp<K, V>,
1841 now: Instant,
1842 housekeeper: Option<&HouseKeeperArc>,
1843 ) -> Result<(), TrySendError<WriteOp<K, V>>> {
1844 let mut op = op;
1845
1846 // NOTES:
1847 // - This will block when the channel is full.
1848 // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`,
1849 //   but that caused a notable performance degradation.
1850 loop {
1851 BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
1852 match ch.try_send(op) {
1853 Ok(()) => break,
1854 Err(TrySendError::Full(op1)) => {
1855 op = op1;
1856 std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
1857 }
1858 Err(e @ TrySendError::Disconnected(_)) => return Err(e),
1859 }
1860 }
1861 Ok(())
1862 }
1863}
1864
1865// For unit tests.
1866#[cfg(test)]
1867impl<K, V, S> Cache<K, V, S> {
1868 pub(crate) fn is_table_empty(&self) -> bool {
1869 self.entry_count() == 0
1870 }
1871
1872 pub(crate) fn is_waiter_map_empty(&self) -> bool {
1873 self.value_initializer.waiter_count() == 0
1874 }
1875}
1876
1877#[cfg(test)]
1878impl<K, V, S> Cache<K, V, S>
1879where
1880 K: Hash + Eq + Send + Sync + 'static,
1881 V: Clone + Send + Sync + 'static,
1882 S: BuildHasher + Clone + Send + Sync + 'static,
1883{
1884 pub(crate) fn invalidation_predicate_count(&self) -> usize {
1885 self.base.invalidation_predicate_count()
1886 }
1887
1888 pub(crate) fn reconfigure_for_testing(&mut self) {
1889 self.base.reconfigure_for_testing();
1890 }
1891
1892 pub(crate) fn key_locks_map_is_empty(&self) -> bool {
1893 self.base.key_locks_map_is_empty()
1894 }
1895}
1896
1897// To see the debug prints, run test as `cargo test -- --nocapture`
1898#[cfg(test)]
1899mod tests {
1900 use super::Cache;
1901 use crate::{
1902 common::{time::Clock, HousekeeperConfig},
1903 notification::RemovalCause,
1904 policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
1905 Expiry,
1906 };
1907
1908 use parking_lot::Mutex;
1909 use std::{
1910 convert::Infallible,
1911 sync::{
1912 atomic::{AtomicU8, Ordering},
1913 Arc,
1914 },
1915 time::{Duration, Instant as StdInstant},
1916 };
1917
1918 #[test]
1919 fn max_capacity_zero() {
1920 let mut cache = Cache::new(0);
1921 cache.reconfigure_for_testing();
1922
1923 // Make the cache exterior immutable.
1924 let cache = cache;
1925
1926 cache.insert(0, ());
1927
1928 assert!(!cache.contains_key(&0));
1929 assert!(cache.get(&0).is_none());
1930 cache.run_pending_tasks();
1931 assert!(!cache.contains_key(&0));
1932 assert!(cache.get(&0).is_none());
1933 assert_eq!(cache.entry_count(), 0)
1934 }
1935
1936 #[test]
1937 fn basic_single_thread() {
1938 // The following `Vec`s will hold actual and expected notifications.
1939 let actual = Arc::new(Mutex::new(Vec::new()));
1940 let mut expected = Vec::new();
1941
1942 // Create an eviction listener.
1943 let a1 = Arc::clone(&actual);
1944 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
1945
1946 // Create a cache with the eviction listener.
1947 let mut cache = Cache::builder()
1948 .max_capacity(3)
1949 .eviction_listener(listener)
1950 .build();
1951 cache.reconfigure_for_testing();
1952
1953 // Make the cache exterior immutable.
1954 let cache = cache;
1955
1956 cache.insert("a", "alice");
1957 cache.insert("b", "bob");
1958 assert_eq!(cache.get(&"a"), Some("alice"));
1959 assert!(cache.contains_key(&"a"));
1960 assert!(cache.contains_key(&"b"));
1961 assert_eq!(cache.get(&"b"), Some("bob"));
1962 cache.run_pending_tasks();
1963 // counts: a -> 1, b -> 1
1964
1965 cache.insert("c", "cindy");
1966 assert_eq!(cache.get(&"c"), Some("cindy"));
1967 assert!(cache.contains_key(&"c"));
1968 // counts: a -> 1, b -> 1, c -> 1
1969 cache.run_pending_tasks();
1970
1971 assert!(cache.contains_key(&"a"));
1972 assert_eq!(cache.get(&"a"), Some("alice"));
1973 assert_eq!(cache.get(&"b"), Some("bob"));
1974 assert!(cache.contains_key(&"b"));
1975 cache.run_pending_tasks();
1976 // counts: a -> 2, b -> 2, c -> 1
1977
1978 // "d" should not be admitted because its frequency is too low.
1979 cache.insert("d", "david"); // count: d -> 0
1980 expected.push((Arc::new("d"), "david", RemovalCause::Size));
1981 cache.run_pending_tasks();
1982 assert_eq!(cache.get(&"d"), None); // d -> 1
1983 assert!(!cache.contains_key(&"d"));
1984
1985 cache.insert("d", "david");
1986 expected.push((Arc::new("d"), "david", RemovalCause::Size));
1987 cache.run_pending_tasks();
1988 assert!(!cache.contains_key(&"d"));
1989 assert_eq!(cache.get(&"d"), None); // d -> 2
1990
1991 // "d" should be admitted and "c" should be evicted
1992 // because d's frequency is higher than c's.
1993 cache.insert("d", "dennis");
1994 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
1995 cache.run_pending_tasks();
1996 assert_eq!(cache.get(&"a"), Some("alice"));
1997 assert_eq!(cache.get(&"b"), Some("bob"));
1998 assert_eq!(cache.get(&"c"), None);
1999 assert_eq!(cache.get(&"d"), Some("dennis"));
2000 assert!(cache.contains_key(&"a"));
2001 assert!(cache.contains_key(&"b"));
2002 assert!(!cache.contains_key(&"c"));
2003 assert!(cache.contains_key(&"d"));
2004
2005 cache.invalidate(&"b");
2006 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2007 cache.run_pending_tasks();
2008 assert_eq!(cache.get(&"b"), None);
2009 assert!(!cache.contains_key(&"b"));
2010
2011 assert!(cache.remove(&"b").is_none());
2012 assert_eq!(cache.remove(&"d"), Some("dennis"));
2013 expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
2014 cache.run_pending_tasks();
2015 assert_eq!(cache.get(&"d"), None);
2016 assert!(!cache.contains_key(&"d"));
2017
2018 verify_notification_vec(&cache, actual, &expected);
2019 assert!(cache.key_locks_map_is_empty());
2020 }
2021
2022 #[test]
2023 fn basic_lru_single_thread() {
2024 // The following `Vec`s will hold actual and expected notifications.
2025 let actual = Arc::new(Mutex::new(Vec::new()));
2026 let mut expected = Vec::new();
2027
2028 // Create an eviction listener.
2029 let a1 = Arc::clone(&actual);
2030 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2031
2032 // Create a cache with the eviction listener.
2033 let mut cache = Cache::builder()
2034 .max_capacity(3)
2035 .eviction_policy(EvictionPolicy::lru())
2036 .eviction_listener(listener)
2037 .build();
2038 cache.reconfigure_for_testing();
2039
2040 // Make the cache exterior immutable.
2041 let cache = cache;
2042
2043 cache.insert("a", "alice");
2044 cache.insert("b", "bob");
2045 assert_eq!(cache.get(&"a"), Some("alice"));
2046 assert!(cache.contains_key(&"a"));
2047 assert!(cache.contains_key(&"b"));
2048 assert_eq!(cache.get(&"b"), Some("bob"));
2049 cache.run_pending_tasks();
2050 // a -> b
2051
2052 cache.insert("c", "cindy");
2053 assert_eq!(cache.get(&"c"), Some("cindy"));
2054 assert!(cache.contains_key(&"c"));
2055 cache.run_pending_tasks();
2056 // a -> b -> c
2057
2058 assert!(cache.contains_key(&"a"));
2059 assert_eq!(cache.get(&"a"), Some("alice"));
2060 assert_eq!(cache.get(&"b"), Some("bob"));
2061 assert!(cache.contains_key(&"b"));
2062 cache.run_pending_tasks();
2063 // c -> a -> b
2064
2065 // "d" should be admitted because the cache uses the LRU strategy.
2066 cache.insert("d", "david");
2067 // "c" is the LRU and should have be evicted.
2068 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2069 cache.run_pending_tasks();
2070
2071 assert_eq!(cache.get(&"a"), Some("alice"));
2072 assert_eq!(cache.get(&"b"), Some("bob"));
2073 assert_eq!(cache.get(&"c"), None);
2074 assert_eq!(cache.get(&"d"), Some("david"));
2075 assert!(cache.contains_key(&"a"));
2076 assert!(cache.contains_key(&"b"));
2077 assert!(!cache.contains_key(&"c"));
2078 assert!(cache.contains_key(&"d"));
2079 cache.run_pending_tasks();
2080 // a -> b -> d
2081
2082 cache.invalidate(&"b");
2083 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2084 cache.run_pending_tasks();
2085 // a -> d
2086 assert_eq!(cache.get(&"b"), None);
2087 assert!(!cache.contains_key(&"b"));
2088
2089 assert!(cache.remove(&"b").is_none());
2090 assert_eq!(cache.remove(&"d"), Some("david"));
2091 expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
2092 cache.run_pending_tasks();
2093 // a
2094 assert_eq!(cache.get(&"d"), None);
2095 assert!(!cache.contains_key(&"d"));
2096
2097 cache.insert("e", "emily");
2098 cache.insert("f", "frank");
2099 // "a" should be evicted because it is the LRU.
2100 cache.insert("g", "gina");
2101 expected.push((Arc::new("a"), "alice", RemovalCause::Size));
2102 cache.run_pending_tasks();
2103 // e -> f -> g
2104 assert_eq!(cache.get(&"a"), None);
2105 assert_eq!(cache.get(&"e"), Some("emily"));
2106 assert_eq!(cache.get(&"f"), Some("frank"));
2107 assert_eq!(cache.get(&"g"), Some("gina"));
2108 assert!(!cache.contains_key(&"a"));
2109 assert!(cache.contains_key(&"e"));
2110 assert!(cache.contains_key(&"f"));
2111 assert!(cache.contains_key(&"g"));
2112
2113 verify_notification_vec(&cache, actual, &expected);
2114 assert!(cache.key_locks_map_is_empty());
2115 }
2116
2117 #[test]
2118 fn size_aware_eviction() {
2119 let weigher = |_k: &&str, v: &(&str, u32)| v.1;
2120
2121 let alice = ("alice", 10);
2122 let bob = ("bob", 15);
2123 let bill = ("bill", 20);
2124 let cindy = ("cindy", 5);
2125 let david = ("david", 15);
2126 let dennis = ("dennis", 15);
2127
2128 // The following `Vec`s will hold actual and expected notifications.
2129 let actual = Arc::new(Mutex::new(Vec::new()));
2130 let mut expected = Vec::new();
2131
2132 // Create an eviction listener.
2133 let a1 = Arc::clone(&actual);
2134 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2135
2136 // Create a cache with the eviction listener.
2137 let mut cache = Cache::builder()
2138 .max_capacity(31)
2139 .weigher(weigher)
2140 .eviction_listener(listener)
2141 .build();
2142 cache.reconfigure_for_testing();
2143
2144 // Make the cache exterior immutable.
2145 let cache = cache;
2146
2147 cache.insert("a", alice);
2148 cache.insert("b", bob);
2149 assert_eq!(cache.get(&"a"), Some(alice));
2150 assert!(cache.contains_key(&"a"));
2151 assert!(cache.contains_key(&"b"));
2152 assert_eq!(cache.get(&"b"), Some(bob));
2153 cache.run_pending_tasks();
2154 // order (LRU -> MRU) and counts: a -> 1, b -> 1
2155
2156 cache.insert("c", cindy);
2157 assert_eq!(cache.get(&"c"), Some(cindy));
2158 assert!(cache.contains_key(&"c"));
2159 // order and counts: a -> 1, b -> 1, c -> 1
2160 cache.run_pending_tasks();
2161
2162 assert!(cache.contains_key(&"a"));
2163 assert_eq!(cache.get(&"a"), Some(alice));
2164 assert_eq!(cache.get(&"b"), Some(bob));
2165 assert!(cache.contains_key(&"b"));
2166 cache.run_pending_tasks();
2167 // order and counts: c -> 1, a -> 2, b -> 2
2168
2169 // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
2170 // "d" must have higher count than 3, which is the aggregated count
2171 // of "a" and "c".
2172 cache.insert("d", david); // count: d -> 0
2173 expected.push((Arc::new("d"), david, RemovalCause::Size));
2174 cache.run_pending_tasks();
2175 assert_eq!(cache.get(&"d"), None); // d -> 1
2176 assert!(!cache.contains_key(&"d"));
2177
2178 cache.insert("d", david);
2179 expected.push((Arc::new("d"), david, RemovalCause::Size));
2180 cache.run_pending_tasks();
2181 assert!(!cache.contains_key(&"d"));
2182 assert_eq!(cache.get(&"d"), None); // d -> 2
2183
2184 cache.insert("d", david);
2185 expected.push((Arc::new("d"), david, RemovalCause::Size));
2186 cache.run_pending_tasks();
2187 assert_eq!(cache.get(&"d"), None); // d -> 3
2188 assert!(!cache.contains_key(&"d"));
2189
2190 cache.insert("d", david);
2191 expected.push((Arc::new("d"), david, RemovalCause::Size));
2192 cache.run_pending_tasks();
2193 assert!(!cache.contains_key(&"d"));
2194 assert_eq!(cache.get(&"d"), None); // d -> 4
2195
2196 // Finally "d" should be admitted by evicting "c" and "a".
2197 cache.insert("d", dennis);
2198 expected.push((Arc::new("c"), cindy, RemovalCause::Size));
2199 expected.push((Arc::new("a"), alice, RemovalCause::Size));
2200 cache.run_pending_tasks();
2201 assert_eq!(cache.get(&"a"), None);
2202 assert_eq!(cache.get(&"b"), Some(bob));
2203 assert_eq!(cache.get(&"c"), None);
2204 assert_eq!(cache.get(&"d"), Some(dennis));
2205 assert!(!cache.contains_key(&"a"));
2206 assert!(cache.contains_key(&"b"));
2207 assert!(!cache.contains_key(&"c"));
2208 assert!(cache.contains_key(&"d"));
2209
2210 // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
2211 cache.insert("b", bill);
2212 expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
2213 expected.push((Arc::new("d"), dennis, RemovalCause::Size));
2214 cache.run_pending_tasks();
2215 assert_eq!(cache.get(&"b"), Some(bill));
2216 assert_eq!(cache.get(&"d"), None);
2217 assert!(cache.contains_key(&"b"));
2218 assert!(!cache.contains_key(&"d"));
2219
2220 // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
2221 cache.insert("a", alice);
2222 cache.insert("b", bob);
2223 expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
2224 cache.run_pending_tasks();
2225 assert_eq!(cache.get(&"a"), Some(alice));
2226 assert_eq!(cache.get(&"b"), Some(bob));
2227 assert_eq!(cache.get(&"d"), None);
2228 assert!(cache.contains_key(&"a"));
2229 assert!(cache.contains_key(&"b"));
2230 assert!(!cache.contains_key(&"d"));
2231
2232 // Verify the sizes.
2233 assert_eq!(cache.entry_count(), 2);
2234 assert_eq!(cache.weighted_size(), 25);
2235
2236 verify_notification_vec(&cache, actual, &expected);
2237 assert!(cache.key_locks_map_is_empty());
2238 }
2239
2240 #[test]
2241 fn basic_multi_threads() {
2242 let num_threads = 4;
2243 let cache = Cache::new(100);
2244
2245 // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
2246 #[allow(clippy::needless_collect)]
2247 let handles = (0..num_threads)
2248 .map(|id| {
2249 let cache = cache.clone();
2250 std::thread::spawn(move || {
2251 cache.insert(10, format!("{id}-100"));
2252 cache.get(&10);
2253 cache.insert(20, format!("{id}-200"));
2254 cache.invalidate(&10);
2255 })
2256 })
2257 .collect::<Vec<_>>();
2258
2259 handles.into_iter().for_each(|h| h.join().expect("Failed"));
2260
2261 assert!(cache.get(&10).is_none());
2262 assert!(cache.get(&20).is_some());
2263 assert!(!cache.contains_key(&10));
2264 assert!(cache.contains_key(&20));
2265 }
2266
2267 #[test]
2268 fn invalidate_all() {
2269 // The following `Vec`s will hold actual and expected notifications.
2270 let actual = Arc::new(Mutex::new(Vec::new()));
2271 let mut expected = Vec::new();
2272
2273 // Create an eviction listener.
2274 let a1 = Arc::clone(&actual);
2275 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2276
2277 // Create a cache with the eviction listener.
2278 let mut cache = Cache::builder()
2279 .max_capacity(100)
2280 .eviction_listener(listener)
2281 .build();
2282 cache.reconfigure_for_testing();
2283
2284 // Make the cache exterior immutable.
2285 let cache = cache;
2286
2287 cache.insert("a", "alice");
2288 cache.insert("b", "bob");
2289 cache.insert("c", "cindy");
2290 assert_eq!(cache.get(&"a"), Some("alice"));
2291 assert_eq!(cache.get(&"b"), Some("bob"));
2292 assert_eq!(cache.get(&"c"), Some("cindy"));
2293 assert!(cache.contains_key(&"a"));
2294 assert!(cache.contains_key(&"b"));
2295 assert!(cache.contains_key(&"c"));
2296
2297 // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
2298 // modified timestamps of the entries were updated when they were inserted.
2299 // https://github.com/moka-rs/moka/issues/155
2300
2301 cache.invalidate_all();
2302 expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
2303 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2304 expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
2305 cache.run_pending_tasks();
2306
2307 cache.insert("d", "david");
2308 cache.run_pending_tasks();
2309
2310 assert!(cache.get(&"a").is_none());
2311 assert!(cache.get(&"b").is_none());
2312 assert!(cache.get(&"c").is_none());
2313 assert_eq!(cache.get(&"d"), Some("david"));
2314 assert!(!cache.contains_key(&"a"));
2315 assert!(!cache.contains_key(&"b"));
2316 assert!(!cache.contains_key(&"c"));
2317 assert!(cache.contains_key(&"d"));
2318
2319 verify_notification_vec(&cache, actual, &expected);
2320 }
2321
2322 #[test]
2323 fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2324 use std::collections::HashSet;
2325
2326 // The following `Vec`s will hold actual and expected notifications.
2327 let actual = Arc::new(Mutex::new(Vec::new()));
2328 let mut expected = Vec::new();
2329
2330 // Create an eviction listener.
2331 let a1 = Arc::clone(&actual);
2332 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2333
2334 let (clock, mock) = Clock::mock();
2335
2336 // Create a cache with the eviction listener.
2337 let mut cache = Cache::builder()
2338 .max_capacity(100)
2339 .support_invalidation_closures()
2340 .eviction_listener(listener)
2341 .clock(clock)
2342 .build();
2343 cache.reconfigure_for_testing();
2344
2345 // Make the cache exterior immutable.
2346 let cache = cache;
2347
2348 cache.insert(0, "alice");
2349 cache.insert(1, "bob");
2350 cache.insert(2, "alex");
2351 cache.run_pending_tasks();
2352
2353 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2354 cache.run_pending_tasks();
2355
2356 assert_eq!(cache.get(&0), Some("alice"));
2357 assert_eq!(cache.get(&1), Some("bob"));
2358 assert_eq!(cache.get(&2), Some("alex"));
2359 assert!(cache.contains_key(&0));
2360 assert!(cache.contains_key(&1));
2361 assert!(cache.contains_key(&2));
2362
2363 let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2364 cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2365 assert_eq!(cache.base.invalidation_predicate_count(), 1);
2366 expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2367 expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2368
2369 mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2370
2371 cache.insert(3, "alice");
2372
2373 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2374 cache.run_pending_tasks(); // To submit the invalidation task.
2375 std::thread::sleep(Duration::from_millis(200));
2376 cache.run_pending_tasks(); // To process the task result.
2377 std::thread::sleep(Duration::from_millis(200));
2378
2379 assert!(cache.get(&0).is_none());
2380 assert!(cache.get(&2).is_none());
2381 assert_eq!(cache.get(&1), Some("bob"));
2382 // This should survive as it was inserted after calling invalidate_entries_if.
2383 assert_eq!(cache.get(&3), Some("alice"));
2384
2385 assert!(!cache.contains_key(&0));
2386 assert!(cache.contains_key(&1));
2387 assert!(!cache.contains_key(&2));
2388 assert!(cache.contains_key(&3));
2389
2390 assert_eq!(cache.entry_count(), 2);
2391 assert_eq!(cache.invalidation_predicate_count(), 0);
2392
2393 mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2394
2395 cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2396 cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2397 assert_eq!(cache.invalidation_predicate_count(), 2);
2398 // key 1 was inserted before key 3.
2399 expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2400 expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2401
2402 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2403 cache.run_pending_tasks(); // To submit the invalidation task.
2404 std::thread::sleep(Duration::from_millis(200));
2405 cache.run_pending_tasks(); // To process the task result.
2406 std::thread::sleep(Duration::from_millis(200));
2407
2408 assert!(cache.get(&1).is_none());
2409 assert!(cache.get(&3).is_none());
2410
2411 assert!(!cache.contains_key(&1));
2412 assert!(!cache.contains_key(&3));
2413
2414 assert_eq!(cache.entry_count(), 0);
2415 assert_eq!(cache.invalidation_predicate_count(), 0);
2416
2417 verify_notification_vec(&cache, actual, &expected);
2418
2419 Ok(())
2420 }
2421
2422 #[test]
2423 fn time_to_live() {
2424 // The following `Vec`s will hold actual and expected notifications.
2425 let actual = Arc::new(Mutex::new(Vec::new()));
2426 let mut expected = Vec::new();
2427
2428 // Create an eviction listener.
2429 let a1 = Arc::clone(&actual);
2430 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2431
2432 let (clock, mock) = Clock::mock();
2433
2434 // Create a cache with the eviction listener.
2435 let mut cache = Cache::builder()
2436 .max_capacity(100)
2437 .time_to_live(Duration::from_secs(10))
2438 .eviction_listener(listener)
2439 .clock(clock)
2440 .build();
2441 cache.reconfigure_for_testing();
2442
2443 // Make the cache exterior immutable.
2444 let cache = cache;
2445
2446 cache.insert("a", "alice");
2447 cache.run_pending_tasks();
2448
2449 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2450 cache.run_pending_tasks();
2451
2452 assert_eq!(cache.get(&"a"), Some("alice"));
2453 assert!(cache.contains_key(&"a"));
2454
2455 mock.increment(Duration::from_secs(5)); // 10 secs.
2456 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2457 assert_eq!(cache.get(&"a"), None);
2458 assert!(!cache.contains_key(&"a"));
2459
2460 assert_eq!(cache.iter().count(), 0);
2461
2462 cache.run_pending_tasks();
2463 assert!(cache.is_table_empty());
2464
2465 cache.insert("b", "bob");
2466 cache.run_pending_tasks();
2467
2468 assert_eq!(cache.entry_count(), 1);
2469
2470 mock.increment(Duration::from_secs(5)); // 15 secs.
2471 cache.run_pending_tasks();
2472
2473 assert_eq!(cache.get(&"b"), Some("bob"));
2474 assert!(cache.contains_key(&"b"));
2475 assert_eq!(cache.entry_count(), 1);
2476
2477 cache.insert("b", "bill");
2478 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2479 cache.run_pending_tasks();
2480
2481 mock.increment(Duration::from_secs(5)); // 20 secs
2482 cache.run_pending_tasks();
2483
2484 assert_eq!(cache.get(&"b"), Some("bill"));
2485 assert!(cache.contains_key(&"b"));
2486 assert_eq!(cache.entry_count(), 1);
2487
2488 mock.increment(Duration::from_secs(5)); // 25 secs
2489 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2490
2491 assert_eq!(cache.get(&"a"), None);
2492 assert_eq!(cache.get(&"b"), None);
2493 assert!(!cache.contains_key(&"a"));
2494 assert!(!cache.contains_key(&"b"));
2495
2496 assert_eq!(cache.iter().count(), 0);
2497
2498 cache.run_pending_tasks();
2499 assert!(cache.is_table_empty());
2500
2501 verify_notification_vec(&cache, actual, &expected);
2502 }
2503
2504 #[test]
2505 fn time_to_idle() {
2506 // The following `Vec`s will hold actual and expected notifications.
2507 let actual = Arc::new(Mutex::new(Vec::new()));
2508 let mut expected = Vec::new();
2509
2510 // Create an eviction listener.
2511 let a1 = Arc::clone(&actual);
2512 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2513
2514 let (clock, mock) = Clock::mock();
2515
2516 // Create a cache with the eviction listener.
2517 let mut cache = Cache::builder()
2518 .max_capacity(100)
2519 .time_to_idle(Duration::from_secs(10))
2520 .eviction_listener(listener)
2521 .clock(clock)
2522 .build();
2523 cache.reconfigure_for_testing();
2524
2525 // Make the cache exterior immutable.
2526 let cache = cache;
2527
2528 cache.insert("a", "alice");
2529 cache.run_pending_tasks();
2530
2531 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2532 cache.run_pending_tasks();
2533
2534 assert_eq!(cache.get(&"a"), Some("alice"));
2535
2536 mock.increment(Duration::from_secs(5)); // 10 secs.
2537 cache.run_pending_tasks();
2538
2539 cache.insert("b", "bob");
2540 cache.run_pending_tasks();
2541
2542 assert_eq!(cache.entry_count(), 2);
2543
2544 mock.increment(Duration::from_secs(2)); // 12 secs.
2545 cache.run_pending_tasks();
2546
2547 // contains_key does not reset the idle timer for the key.
2548 assert!(cache.contains_key(&"a"));
2549 assert!(cache.contains_key(&"b"));
2550 cache.run_pending_tasks();
2551
2552 assert_eq!(cache.entry_count(), 2);
2553
2554 mock.increment(Duration::from_secs(3)); // 15 secs.
2555 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2556
2557 assert_eq!(cache.get(&"a"), None);
2558 assert_eq!(cache.get(&"b"), Some("bob"));
2559 assert!(!cache.contains_key(&"a"));
2560 assert!(cache.contains_key(&"b"));
2561
2562 assert_eq!(cache.iter().count(), 1);
2563
2564 cache.run_pending_tasks();
2565 assert_eq!(cache.entry_count(), 1);
2566
2567 mock.increment(Duration::from_secs(10)); // 25 secs
2568 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2569
2570 assert_eq!(cache.get(&"a"), None);
2571 assert_eq!(cache.get(&"b"), None);
2572 assert!(!cache.contains_key(&"a"));
2573 assert!(!cache.contains_key(&"b"));
2574
2575 assert_eq!(cache.iter().count(), 0);
2576
2577 cache.run_pending_tasks();
2578 assert!(cache.is_table_empty());
2579
2580 verify_notification_vec(&cache, actual, &expected);
2581 }
2582
2583 // https://github.com/moka-rs/moka/issues/359
2584 #[test]
2585 fn ensure_access_time_is_updated_immediately_after_read() {
2586 let (clock, mock) = Clock::mock();
2587 let mut cache = Cache::builder()
2588 .max_capacity(10)
2589 .time_to_idle(Duration::from_secs(5))
2590 .clock(clock)
2591 .build();
2592 cache.reconfigure_for_testing();
2593
2594 // Make the cache exterior immutable.
2595 let cache = cache;
2596
2597 cache.insert(1, 1);
2598
2599 mock.increment(Duration::from_secs(4));
2600 assert_eq!(cache.get(&1), Some(1));
2601
2602 mock.increment(Duration::from_secs(2));
2603 assert_eq!(cache.get(&1), Some(1));
2604 cache.run_pending_tasks();
2605 assert_eq!(cache.get(&1), Some(1));
2606 }
2607
2608 #[test]
2609 fn time_to_live_by_expiry_type() {
2610 // Define an expiry type.
2611 struct MyExpiry {
2612 counters: Arc<ExpiryCallCounters>,
2613 }
2614
2615 impl MyExpiry {
2616 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2617 Self { counters }
2618 }
2619 }
2620
2621 impl Expiry<&str, &str> for MyExpiry {
2622 fn expire_after_create(
2623 &self,
2624 _key: &&str,
2625 _value: &&str,
2626 _current_time: StdInstant,
2627 ) -> Option<Duration> {
2628 self.counters.incl_actual_creations();
2629 Some(Duration::from_secs(10))
2630 }
2631
2632 fn expire_after_update(
2633 &self,
2634 _key: &&str,
2635 _value: &&str,
2636 _current_time: StdInstant,
2637 _current_duration: Option<Duration>,
2638 ) -> Option<Duration> {
2639 self.counters.incl_actual_updates();
2640 Some(Duration::from_secs(10))
2641 }
2642 }
2643
2644 // The following `Vec`s will hold actual and expected notifications.
2645 let actual = Arc::new(Mutex::new(Vec::new()));
2646 let mut expected = Vec::new();
2647
2648 // Create expiry counters and the expiry.
2649 let expiry_counters = Arc::new(ExpiryCallCounters::default());
2650 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
2651
2652 // Create an eviction listener.
2653 let a1 = Arc::clone(&actual);
2654 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2655
2656 let (clock, mock) = Clock::mock();
2657
2658 // Create a cache with the eviction listener.
2659 let mut cache = Cache::builder()
2660 .max_capacity(100)
2661 .expire_after(expiry)
2662 .eviction_listener(listener)
2663 .clock(clock)
2664 .build();
2665 cache.reconfigure_for_testing();
2666
2667 // Make the cache exterior immutable.
2668 let cache = cache;
2669
2670 cache.insert("a", "alice");
2671 expiry_counters.incl_expected_creations();
2672 cache.run_pending_tasks();
2673
2674 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2675 cache.run_pending_tasks();
2676
2677 assert_eq!(cache.get(&"a"), Some("alice"));
2678 assert!(cache.contains_key(&"a"));
2679
2680 mock.increment(Duration::from_secs(5)); // 10 secs.
2681 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2682 assert_eq!(cache.get(&"a"), None);
2683 assert!(!cache.contains_key(&"a"));
2684
2685 assert_eq!(cache.iter().count(), 0);
2686
2687 cache.run_pending_tasks();
2688 assert!(cache.is_table_empty());
2689
2690 cache.insert("b", "bob");
2691 expiry_counters.incl_expected_creations();
2692 cache.run_pending_tasks();
2693
2694 assert_eq!(cache.entry_count(), 1);
2695
2696 mock.increment(Duration::from_secs(5)); // 15 secs.
2697 cache.run_pending_tasks();
2698
2699 assert_eq!(cache.get(&"b"), Some("bob"));
2700 assert!(cache.contains_key(&"b"));
2701 assert_eq!(cache.entry_count(), 1);
2702
2703 cache.insert("b", "bill");
2704 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2705 expiry_counters.incl_expected_updates();
2706 cache.run_pending_tasks();
2707
2708 mock.increment(Duration::from_secs(5)); // 20 secs
2709 cache.run_pending_tasks();
2710
2711 assert_eq!(cache.get(&"b"), Some("bill"));
2712 assert!(cache.contains_key(&"b"));
2713 assert_eq!(cache.entry_count(), 1);
2714
2715 mock.increment(Duration::from_secs(5)); // 25 secs
2716 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2717
2718 assert_eq!(cache.get(&"a"), None);
2719 assert_eq!(cache.get(&"b"), None);
2720 assert!(!cache.contains_key(&"a"));
2721 assert!(!cache.contains_key(&"b"));
2722
2723 assert_eq!(cache.iter().count(), 0);
2724
2725 cache.run_pending_tasks();
2726 assert!(cache.is_table_empty());
2727
2728 expiry_counters.verify();
2729 verify_notification_vec(&cache, actual, &expected);
2730 }
2731
2732 #[test]
2733 fn time_to_idle_by_expiry_type() {
2734 // Define an expiry type.
2735 struct MyExpiry {
2736 counters: Arc<ExpiryCallCounters>,
2737 }
2738
2739 impl MyExpiry {
2740 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2741 Self { counters }
2742 }
2743 }
2744
2745 impl Expiry<&str, &str> for MyExpiry {
2746 fn expire_after_read(
2747 &self,
2748 _key: &&str,
2749 _value: &&str,
2750 _current_time: StdInstant,
2751 _current_duration: Option<Duration>,
2752 _last_modified_at: StdInstant,
2753 ) -> Option<Duration> {
2754 self.counters.incl_actual_reads();
2755 Some(Duration::from_secs(10))
2756 }
2757 }
2758
2759 // The following `Vec`s will hold actual and expected notifications.
2760 let actual = Arc::new(Mutex::new(Vec::new()));
2761 let mut expected = Vec::new();
2762
2763 // Create expiry counters and the expiry.
2764 let expiry_counters = Arc::new(ExpiryCallCounters::default());
2765 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
2766
2767 // Create an eviction listener.
2768 let a1 = Arc::clone(&actual);
2769 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2770
2771 let (clock, mock) = Clock::mock();
2772
2773 // Create a cache with the eviction listener.
2774 let mut cache = Cache::builder()
2775 .max_capacity(100)
2776 .expire_after(expiry)
2777 .eviction_listener(listener)
2778 .clock(clock)
2779 .build();
2780 cache.reconfigure_for_testing();
2781
2782 // Make the cache exterior immutable.
2783 let cache = cache;
2784
2785 cache.insert("a", "alice");
2786 cache.run_pending_tasks();
2787
2788 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2789 cache.run_pending_tasks();
2790
2791 assert_eq!(cache.get(&"a"), Some("alice"));
2792 expiry_counters.incl_expected_reads();
2793
2794 mock.increment(Duration::from_secs(5)); // 10 secs.
2795 cache.run_pending_tasks();
2796
2797 cache.insert("b", "bob");
2798 cache.run_pending_tasks();
2799
2800 assert_eq!(cache.entry_count(), 2);
2801
2802 mock.increment(Duration::from_secs(2)); // 12 secs.
2803 cache.run_pending_tasks();
2804
2805 // contains_key does not reset the idle timer for the key.
2806 assert!(cache.contains_key(&"a"));
2807 assert!(cache.contains_key(&"b"));
2808 cache.run_pending_tasks();
2809
2810 assert_eq!(cache.entry_count(), 2);
2811
2812 mock.increment(Duration::from_secs(3)); // 15 secs.
2813 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2814
2815 assert_eq!(cache.get(&"a"), None);
2816 assert_eq!(cache.get(&"b"), Some("bob"));
2817 expiry_counters.incl_expected_reads();
2818 assert!(!cache.contains_key(&"a"));
2819 assert!(cache.contains_key(&"b"));
2820
2821 assert_eq!(cache.iter().count(), 1);
2822
2823 cache.run_pending_tasks();
2824 assert_eq!(cache.entry_count(), 1);
2825
2826 mock.increment(Duration::from_secs(10)); // 25 secs
2827 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2828
2829 assert_eq!(cache.get(&"a"), None);
2830 assert_eq!(cache.get(&"b"), None);
2831 assert!(!cache.contains_key(&"a"));
2832 assert!(!cache.contains_key(&"b"));
2833
2834 assert_eq!(cache.iter().count(), 0);
2835
2836 cache.run_pending_tasks();
2837 assert!(cache.is_table_empty());
2838
2839 expiry_counters.verify();
2840 verify_notification_vec(&cache, actual, &expected);
2841 }
2842
2843 /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
2844 /// only when the key was already present in the cache.
2845 #[test]
2846 fn test_expiry_using_get_with() {
2847 // Define an expiry type, which always returns `None`.
2848 struct NoExpiry {
2849 counters: Arc<ExpiryCallCounters>,
2850 }
2851
2852 impl NoExpiry {
2853 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2854 Self { counters }
2855 }
2856 }
2857
2858 impl Expiry<&str, &str> for NoExpiry {
2859 fn expire_after_create(
2860 &self,
2861 _key: &&str,
2862 _value: &&str,
2863 _current_time: StdInstant,
2864 ) -> Option<Duration> {
2865 self.counters.incl_actual_creations();
2866 None
2867 }
2868
2869 fn expire_after_read(
2870 &self,
2871 _key: &&str,
2872 _value: &&str,
2873 _current_time: StdInstant,
2874 _current_duration: Option<Duration>,
2875 _last_modified_at: StdInstant,
2876 ) -> Option<Duration> {
2877 self.counters.incl_actual_reads();
2878 None
2879 }
2880
2881 fn expire_after_update(
2882 &self,
2883 _key: &&str,
2884 _value: &&str,
2885 _current_time: StdInstant,
2886 _current_duration: Option<Duration>,
2887 ) -> Option<Duration> {
2888 unreachable!("The `expire_after_update()` method should not be called.");
2889 }
2890 }
2891
2892 // Create expiry counters and the expiry.
2893 let expiry_counters = Arc::new(ExpiryCallCounters::default());
2894 let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
2895
2896 // Create a cache with the expiry and eviction listener.
2897 let mut cache = Cache::builder()
2898 .max_capacity(100)
2899 .expire_after(expiry)
2900 .build();
2901 cache.reconfigure_for_testing();
2902
2903 // Make the cache exterior immutable.
2904 let cache = cache;
2905
2906 // The key is not present.
2907 cache.get_with("a", || "alice");
2908 expiry_counters.incl_expected_creations();
2909 cache.run_pending_tasks();
2910
2911 // The key is present.
2912 cache.get_with("a", || "alex");
2913 expiry_counters.incl_expected_reads();
2914 cache.run_pending_tasks();
2915
2916 // The key is not present.
2917 cache.invalidate("a");
2918 cache.get_with("a", || "amanda");
2919 expiry_counters.incl_expected_creations();
2920 cache.run_pending_tasks();
2921
2922 expiry_counters.verify();
2923 }
2924
2925 // https://github.com/moka-rs/moka/issues/345
2926 #[test]
2927 fn test_race_between_updating_entry_and_processing_its_write_ops() {
2928 let (clock, mock) = Clock::mock();
2929 let cache = Cache::builder()
2930 .max_capacity(2)
2931 .time_to_idle(Duration::from_secs(1))
2932 .clock(clock)
2933 .build();
2934
2935 cache.insert("a", "alice");
2936 cache.insert("b", "bob");
2937 cache.insert("c", "cathy"); // c1
2938 mock.increment(Duration::from_secs(2));
2939
2940 // The following `insert` will do the following:
2941 // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
2942 // 2. Runs the pending tasks implicitly.
2943 // (1) "a" will be admitted.
2944 // (2) "b" will be admitted.
2945 // (3) c1 will be evicted by size constraint.
2946 // (4) "a" will be evicted due to expiration.
2947 // (5) "b" will be evicted due to expiration.
2948 // 3. Sends its `WriteOp` log to the channel.
2949 cache.insert("c", "cindy"); // c2
2950
2951 // Remove "c" (c2) from the cht.
2952 assert_eq!(cache.remove(&"c"), Some("cindy")); // c-remove
2953
2954 mock.increment(Duration::from_secs(2));
2955
2956 // The following `run_pending_tasks` will do the following:
2957 // 1. Admits "c" (c2) to the cache. (Creates a node in the LRU deque.)
2958 // 2. Because of c-remove, removes c2's node from the LRU deque.
2959 cache.run_pending_tasks();
2960 assert_eq!(cache.entry_count(), 0);
2961 }
2962
2963 #[test]
2964 fn test_race_between_recreating_entry_and_processing_its_write_ops() {
2965 let cache = Cache::builder().max_capacity(2).build();
2966
2967 cache.insert('a', "a");
2968 cache.insert('b', "b");
2969 cache.run_pending_tasks();
2970
2971 cache.insert('c', "c1"); // (a) `EntryInfo` 1, gen: 1
2972 assert!(cache.remove(&'a').is_some()); // (b)
2973 assert!(cache.remove(&'b').is_some()); // (c)
2974 assert!(cache.remove(&'c').is_some()); // (d) `EntryInfo` 1, gen: 2
2975 cache.insert('c', "c2"); // (e) `EntryInfo` 2, gen: 1
2976
2977 // Now the `write_op_ch` channel contains the following `WriteOp`s:
2978 //
2979 // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
2980 // - 1: (b) remove "a"
2981 // - 2: (c) remove "b"
2982 // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
2983 // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
2984 //
2985 // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
2986 // that processing 0 does not remove "c2" from the concurrent hash table.
2987 // (Their gens are the same, but their `EntryInfo`s are different.)
2988 cache.run_pending_tasks();
2989 assert_eq!(cache.get(&'c'), Some("c2"));
2990 }
2991
2992 #[test]
2993 fn test_iter() {
2994 const NUM_KEYS: usize = 50;
2995
2996 fn make_value(key: usize) -> String {
2997 format!("val: {key}")
2998 }
2999
3000 let cache = Cache::builder()
3001 .max_capacity(100)
3002 .time_to_idle(Duration::from_secs(10))
3003 .build();
3004
3005 for key in 0..NUM_KEYS {
3006 cache.insert(key, make_value(key));
3007 }
3008
3009 let mut key_set = std::collections::HashSet::new();
3010
3011 for (key, value) in &cache {
3012 assert_eq!(value, make_value(*key));
3013
3014 key_set.insert(*key);
3015 }
3016
3017 // Ensure there are no missing or duplicate keys in the iteration.
3018 assert_eq!(key_set.len(), NUM_KEYS);
3019 }
3020
3021 /// Runs 16 threads at the same time and ensures no deadlock occurs.
3022 ///
3023 /// - Eight of the threads will update key-values in the cache.
3024 /// - Eight others will iterate the cache.
3025 ///
3026 #[test]
3027 fn test_iter_multi_threads() {
3028 use std::collections::HashSet;
3029
3030 const NUM_KEYS: usize = 1024;
3031 const NUM_THREADS: usize = 16;
3032
3033 fn make_value(key: usize) -> String {
3034 format!("val: {key}")
3035 }
3036
3037 let cache = Cache::builder()
3038 .max_capacity(2048)
3039 .time_to_idle(Duration::from_secs(10))
3040 .build();
3041
3042 // Initialize the cache.
3043 for key in 0..NUM_KEYS {
3044 cache.insert(key, make_value(key));
3045 }
3046
3047 let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
3048 let write_lock = rw_lock.write().unwrap();
3049
3050 // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
3051 #[allow(clippy::needless_collect)]
3052 let handles = (0..NUM_THREADS)
3053 .map(|n| {
3054 let cache = cache.clone();
3055 let rw_lock = Arc::clone(&rw_lock);
3056
3057 if n % 2 == 0 {
3058 // This thread will update the cache.
3059 std::thread::spawn(move || {
3060 let read_lock = rw_lock.read().unwrap();
3061 for key in 0..NUM_KEYS {
3062 // TODO: Update keys in a random order?
3063 cache.insert(key, make_value(key));
3064 }
3065 std::mem::drop(read_lock);
3066 })
3067 } else {
3068 // This thread will iterate the cache.
3069 std::thread::spawn(move || {
3070 let read_lock = rw_lock.read().unwrap();
3071 let mut key_set = HashSet::new();
3072 for (key, value) in &cache {
3073 assert_eq!(value, make_value(*key));
3074 key_set.insert(*key);
3075 }
3076 // Ensure there are no missing or duplicate keys in the iteration.
3077 assert_eq!(key_set.len(), NUM_KEYS);
3078 std::mem::drop(read_lock);
3079 })
3080 }
3081 })
3082 .collect::<Vec<_>>();
3083
3084 // Let these threads run by releasing the write lock.
3085 std::mem::drop(write_lock);
3086
3087 handles.into_iter().for_each(|h| h.join().expect("Failed"));
3088
3089 // Ensure there are no missing or duplicate keys in the iteration.
3090 let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3091 assert_eq!(key_set.len(), NUM_KEYS);
3092 }
3093
3094 #[test]
3095 fn get_with() {
3096 use std::thread::{sleep, spawn};
3097
3098 let cache = Cache::new(100);
3099 const KEY: u32 = 0;
3100
3101 // This test will run five threads:
3102 //
3103 // Thread1 will be the first thread to call `get_with` for a key, so its init
3104 // closure will be evaluated and then a &str value "thread1" will be inserted
3105 // to the cache.
3106 let thread1 = {
3107 let cache1 = cache.clone();
3108 spawn(move || {
3109 // Call `get_with` immediately.
3110 let v = cache1.get_with(KEY, || {
3111 // Wait for 300 ms and return a &str value.
3112 sleep(Duration::from_millis(300));
3113 "thread1"
3114 });
3115 assert_eq!(v, "thread1");
3116 })
3117 };
3118
3119 // Thread2 will be the second thread to call `get_with` for the same key, so
3120 // its init closure will not be evaluated. Once thread1's init closure
3121 // finishes, it will get the value inserted by thread1's init closure.
3122 let thread2 = {
3123 let cache2 = cache.clone();
3124 spawn(move || {
3125 // Wait for 100 ms before calling `get_with`.
3126 sleep(Duration::from_millis(100));
3127 let v = cache2.get_with(KEY, || unreachable!());
3128 assert_eq!(v, "thread1");
3129 })
3130 };
3131
3132 // Thread3 will be the third thread to call `get_with` for the same key. By
3133 // the time it calls, thread1's init closure should have finished already and
3134 // the value should be already inserted to the cache. So its init closure
3135 // will not be evaluated and will get the value inserted by thread1's init
3136 // closure immediately.
3137 let thread3 = {
3138 let cache3 = cache.clone();
3139 spawn(move || {
3140 // Wait for 400 ms before calling `get_with`.
3141 sleep(Duration::from_millis(400));
3142 let v = cache3.get_with(KEY, || unreachable!());
3143 assert_eq!(v, "thread1");
3144 })
3145 };
3146
3147 // Thread4 will call `get` for the same key. It will call when thread1's init
3148 // closure is still running, so it will get none for the key.
3149 let thread4 = {
3150 let cache4 = cache.clone();
3151 spawn(move || {
3152 // Wait for 200 ms before calling `get`.
3153 sleep(Duration::from_millis(200));
3154 let maybe_v = cache4.get(&KEY);
3155 assert!(maybe_v.is_none());
3156 })
3157 };
3158
3159 // Thread5 will call `get` for the same key. It will call after thread1's init
3160 // closure finished, so it will get the value inserted by thread1's init closure.
3161 let thread5 = {
3162 let cache5 = cache.clone();
3163 spawn(move || {
3164 // Wait for 400 ms before calling `get`.
3165 sleep(Duration::from_millis(400));
3166 let maybe_v = cache5.get(&KEY);
3167 assert_eq!(maybe_v, Some("thread1"));
3168 })
3169 };
3170
3171 for t in [thread1, thread2, thread3, thread4, thread5] {
3172 t.join().expect("Failed to join");
3173 }
3174
3175 assert!(cache.is_waiter_map_empty());
3176 }
3177
3178 #[test]
3179 fn get_with_by_ref() {
3180 use std::thread::{sleep, spawn};
3181
3182 let cache = Cache::new(100);
3183 const KEY: &u32 = &0;
3184
3185 // This test will run five threads:
3186 //
3187 // Thread1 will be the first thread to call `get_with_by_ref` for a key, so
3188 // its init closure will be evaluated and then a &str value "thread1" will be
3189 // inserted to the cache.
3190 let thread1 = {
3191 let cache1 = cache.clone();
3192 spawn(move || {
3193 // Call `get_with_by_ref` immediately.
3194 let v = cache1.get_with_by_ref(KEY, || {
3195 // Wait for 300 ms and return a &str value.
3196 sleep(Duration::from_millis(300));
3197 "thread1"
3198 });
3199 assert_eq!(v, "thread1");
3200 })
3201 };
3202
3203 // Thread2 will be the second thread to call `get_with_by_ref` for the same
3204 // key, so its init closure will not be evaluated. Once thread1's init
3205 // closure finishes, it will get the value inserted by thread1's init
3206 // closure.
3207 let thread2 = {
3208 let cache2 = cache.clone();
3209 spawn(move || {
3210 // Wait for 100 ms before calling `get_with_by_ref`.
3211 sleep(Duration::from_millis(100));
3212 let v = cache2.get_with_by_ref(KEY, || unreachable!());
3213 assert_eq!(v, "thread1");
3214 })
3215 };
3216
3217 // Thread3 will be the third thread to call `get_with_by_ref` for the same
3218 // key. By the time it calls, thread1's init closure should have finished
3219 // already and the value should be already inserted to the cache. So its init
3220 // closure will not be evaluated and will get the value inserted by thread1's
3221 // init closure immediately.
3222 let thread3 = {
3223 let cache3 = cache.clone();
3224 spawn(move || {
3225 // Wait for 400 ms before calling `get_with_by_ref`.
3226 sleep(Duration::from_millis(400));
3227 let v = cache3.get_with_by_ref(KEY, || unreachable!());
3228 assert_eq!(v, "thread1");
3229 })
3230 };
3231
3232 // Thread4 will call `get` for the same key. It will call when thread1's init
3233 // closure is still running, so it will get none for the key.
3234 let thread4 = {
3235 let cache4 = cache.clone();
3236 spawn(move || {
3237 // Wait for 200 ms before calling `get`.
3238 sleep(Duration::from_millis(200));
3239 let maybe_v = cache4.get(KEY);
3240 assert!(maybe_v.is_none());
3241 })
3242 };
3243
3244 // Thread5 will call `get` for the same key. It will call after thread1's init
3245 // closure finished, so it will get the value inserted by thread1's init closure.
3246 let thread5 = {
3247 let cache5 = cache.clone();
3248 spawn(move || {
3249 // Wait for 400 ms before calling `get`.
3250 sleep(Duration::from_millis(400));
3251 let maybe_v = cache5.get(KEY);
3252 assert_eq!(maybe_v, Some("thread1"));
3253 })
3254 };
3255
3256 for t in [thread1, thread2, thread3, thread4, thread5] {
3257 t.join().expect("Failed to join");
3258 }
3259
3260 assert!(cache.is_waiter_map_empty());
3261 }
3262
3263 #[test]
3264 fn entry_or_insert_with_if() {
3265 use std::thread::{sleep, spawn};
3266
3267 let cache = Cache::new(100);
3268 const KEY: u32 = 0;
3269
3270 // This test will run seven threads:
3271 //
3272 // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
3273 // its init closure will be evaluated and then a &str value "thread1" will be
3274 // inserted to the cache.
3275 let thread1 = {
3276 let cache1 = cache.clone();
3277 spawn(move || {
3278 // Call `get_with` immediately.
3279 let entry = cache1.entry(KEY).or_insert_with_if(
3280 || {
3281 // Wait for 300 ms and return a &str value.
3282 sleep(Duration::from_millis(300));
3283 "thread1"
3284 },
3285 |_v| unreachable!(),
3286 );
3287 // Entry should be fresh because our init closure should have been
3288 // evaluated.
3289 assert!(entry.is_fresh());
3290 assert_eq!(entry.into_value(), "thread1");
3291 })
3292 };
3293
3294 // Thread2 will be the second thread to call `or_insert_with_if` for the same
3295 // key, so its init closure will not be evaluated. Once thread1's init
3296 // closure finishes, it will get the value inserted by thread1's init
3297 // closure.
3298 let thread2 = {
3299 let cache2 = cache.clone();
3300 spawn(move || {
3301 // Wait for 100 ms before calling `get_with`.
3302 sleep(Duration::from_millis(100));
3303 let entry = cache2
3304 .entry(KEY)
3305 .or_insert_with_if(|| unreachable!(), |_v| unreachable!());
3306 // Entry should not be fresh because thread1's init closure should have
3307 // been evaluated instead of ours.
3308 assert!(!entry.is_fresh());
3309 assert_eq!(entry.into_value(), "thread1");
3310 })
3311 };
3312
3313 // Thread3 will be the third thread to call `or_insert_with_if` for the same
3314 // key. By the time it calls, thread1's init closure should have finished
3315 // already and the value should be already inserted to the cache. Also
3316 // thread3's `replace_if` closure returns `false`. So its init closure will
3317 // not be evaluated and will get the value inserted by thread1's init closure
3318 // immediately.
3319 let thread3 = {
3320 let cache3 = cache.clone();
3321 spawn(move || {
3322 // Wait for 350 ms before calling `or_insert_with_if`.
3323 sleep(Duration::from_millis(350));
3324 let entry = cache3.entry(KEY).or_insert_with_if(
3325 || unreachable!(),
3326 |v| {
3327 assert_eq!(v, &"thread1");
3328 false
3329 },
3330 );
3331 assert!(!entry.is_fresh());
3332 assert_eq!(entry.into_value(), "thread1");
3333 })
3334 };
3335
3336 // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
3337 // key. The value should have been already inserted to the cache by thread1.
3338 // However thread4's `replace_if` closure returns `true`. So its init closure
3339 // will be evaluated to replace the current value.
3340 let thread4 = {
3341 let cache4 = cache.clone();
3342 spawn(move || {
3343 // Wait for 400 ms before calling `or_insert_with_if`.
3344 sleep(Duration::from_millis(400));
3345 let entry = cache4.entry(KEY).or_insert_with_if(
3346 || "thread4",
3347 |v| {
3348 assert_eq!(v, &"thread1");
3349 true
3350 },
3351 );
3352 assert!(entry.is_fresh());
3353 assert_eq!(entry.into_value(), "thread4");
3354 })
3355 };
3356
3357 // Thread5 will call `get` for the same key. It will call when thread1's init
3358 // closure is still running, so it will get none for the key.
3359 let thread5 = {
3360 let cache5 = cache.clone();
3361 spawn(move || {
3362 // Wait for 200 ms before calling `get`.
3363 sleep(Duration::from_millis(200));
3364 let maybe_v = cache5.get(&KEY);
3365 assert!(maybe_v.is_none());
3366 })
3367 };
3368
3369        // Thread6 will call `get` for the same key. It will call just after thread1's
3370        // init closure has finished, so it will get the value inserted by thread1.
3371 let thread6 = {
3372 let cache6 = cache.clone();
3373 spawn(move || {
3374 // Wait for 350 ms before calling `get`.
3375 sleep(Duration::from_millis(350));
3376 let maybe_v = cache6.get(&KEY);
3377 assert_eq!(maybe_v, Some("thread1"));
3378 })
3379 };
3380
3381        // Thread7 will call `get` for the same key. It will call after thread4 has
3382        // replaced the value, so it will get the value inserted by thread4's init closure.
3383 let thread7 = {
3384 let cache7 = cache.clone();
3385 spawn(move || {
3386 // Wait for 450 ms before calling `get`.
3387 sleep(Duration::from_millis(450));
3388 let maybe_v = cache7.get(&KEY);
3389 assert_eq!(maybe_v, Some("thread4"));
3390 })
3391 };
3392
3393 for t in [
3394 thread1, thread2, thread3, thread4, thread5, thread6, thread7,
3395 ] {
3396 t.join().expect("Failed to join");
3397 }
3398
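        // All the per-key waiters used to coalesce the init closures above should
        // have been removed by now.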
3399 assert!(cache.is_waiter_map_empty());
3400 }
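
    // A minimal, single-threaded sketch of the `or_insert_with_if` contract the
    // timing-based test above relies on: the init closure runs only when there is
    // no cached entry, or when `replace_if` returns `true` for the current value.
    #[test]
    fn entry_or_insert_with_if_single_thread() {
        let cache: Cache<u32, &str> = Cache::new(16);

        // No entry yet: the init closure runs and `replace_if` is not consulted.
        let entry = cache
            .entry(0)
            .or_insert_with_if(|| "first", |_v| unreachable!());
        assert!(entry.is_fresh());
        assert_eq!(entry.into_value(), "first");

        // The entry exists and `replace_if` returns `false`: the init closure is
        // not evaluated.
        let entry = cache.entry(0).or_insert_with_if(
            || unreachable!(),
            |v| {
                assert_eq!(v, &"first");
                false
            },
        );
        assert!(!entry.is_fresh());
        assert_eq!(entry.into_value(), "first");

        // The entry exists and `replace_if` returns `true`: the init closure runs
        // again and its value replaces the current one.
        let entry = cache.entry(0).or_insert_with_if(|| "second", |_v| true);
        assert!(entry.is_fresh());
        assert_eq!(entry.into_value(), "second");
    }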
3401
3402 #[test]
3403 fn entry_by_ref_or_insert_with_if() {
3404 use std::thread::{sleep, spawn};
3405
3406 let cache: Cache<u32, &str> = Cache::new(100);
3407 const KEY: &u32 = &0;
3408
3409 // This test will run seven threads:
3410 //
3411 // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
3412 // its init closure will be evaluated and then a &str value "thread1" will be
3413 // inserted to the cache.
3414 let thread1 = {
3415 let cache1 = cache.clone();
3416 spawn(move || {
3417                // Call `or_insert_with_if` immediately.
3418 let v = cache1
3419 .entry_by_ref(KEY)
3420 .or_insert_with_if(
3421 || {
3422 // Wait for 300 ms and return a &str value.
3423 sleep(Duration::from_millis(300));
3424 "thread1"
3425 },
3426 |_v| unreachable!(),
3427 )
3428 .into_value();
3429 assert_eq!(v, "thread1");
3430 })
3431 };
3432
3433 // Thread2 will be the second thread to call `or_insert_with_if` for the same
3434 // key, so its init closure will not be evaluated. Once thread1's init
3435 // closure finishes, it will get the value inserted by thread1's init
3436 // closure.
3437 let thread2 = {
3438 let cache2 = cache.clone();
3439 spawn(move || {
3440                // Wait for 100 ms before calling `or_insert_with_if`.
3441 sleep(Duration::from_millis(100));
3442 let v = cache2
3443 .entry_by_ref(KEY)
3444 .or_insert_with_if(|| unreachable!(), |_v| unreachable!())
3445 .into_value();
3446 assert_eq!(v, "thread1");
3447 })
3448 };
3449
3450 // Thread3 will be the third thread to call `or_insert_with_if` for the same
3451 // key. By the time it calls, thread1's init closure should have finished
3452 // already and the value should be already inserted to the cache. Also
3453 // thread3's `replace_if` closure returns `false`. So its init closure will
3454 // not be evaluated and will get the value inserted by thread1's init closure
3455 // immediately.
3456 let thread3 = {
3457 let cache3 = cache.clone();
3458 spawn(move || {
3459 // Wait for 350 ms before calling `or_insert_with_if`.
3460 sleep(Duration::from_millis(350));
3461 let v = cache3
3462 .entry_by_ref(KEY)
3463 .or_insert_with_if(
3464 || unreachable!(),
3465 |v| {
3466 assert_eq!(v, &"thread1");
3467 false
3468 },
3469 )
3470 .into_value();
3471 assert_eq!(v, "thread1");
3472 })
3473 };
3474
3475 // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
3476 // key. The value should have been already inserted to the cache by
3477 // thread1. However thread4's `replace_if` closure returns `true`. So its
3478 // init closure will be evaluated to replace the current value.
3479 let thread4 = {
3480 let cache4 = cache.clone();
3481 spawn(move || {
3482 // Wait for 400 ms before calling `or_insert_with_if`.
3483 sleep(Duration::from_millis(400));
3484 let v = cache4
3485 .entry_by_ref(KEY)
3486 .or_insert_with_if(
3487 || "thread4",
3488 |v| {
3489 assert_eq!(v, &"thread1");
3490 true
3491 },
3492 )
3493 .into_value();
3494 assert_eq!(v, "thread4");
3495 })
3496 };
3497
3498 // Thread5 will call `get` for the same key. It will call when thread1's init
3499 // closure is still running, so it will get none for the key.
3500 let thread5 = {
3501 let cache5 = cache.clone();
3502 spawn(move || {
3503 // Wait for 200 ms before calling `get`.
3504 sleep(Duration::from_millis(200));
3505 let maybe_v = cache5.get(KEY);
3506 assert!(maybe_v.is_none());
3507 })
3508 };
3509
3510        // Thread6 will call `get` for the same key. It will call just after thread1's
3511        // init closure has finished, so it will get the value inserted by thread1.
3512 let thread6 = {
3513 let cache6 = cache.clone();
3514 spawn(move || {
3515 // Wait for 350 ms before calling `get`.
3516 sleep(Duration::from_millis(350));
3517 let maybe_v = cache6.get(KEY);
3518 assert_eq!(maybe_v, Some("thread1"));
3519 })
3520 };
3521
3522        // Thread7 will call `get` for the same key. It will call after thread4 has
3523        // replaced the value, so it will get the value inserted by thread4's init closure.
3524 let thread7 = {
3525 let cache7 = cache.clone();
3526 spawn(move || {
3527 // Wait for 450 ms before calling `get`.
3528 sleep(Duration::from_millis(450));
3529 let maybe_v = cache7.get(KEY);
3530 assert_eq!(maybe_v, Some("thread4"));
3531 })
3532 };
3533
3534 for t in [
3535 thread1, thread2, thread3, thread4, thread5, thread6, thread7,
3536 ] {
3537 t.join().expect("Failed to join");
3538 }
3539
3540 assert!(cache.is_waiter_map_empty());
3541 }
3542
3543 #[test]
3544 fn try_get_with() {
3545 use std::{
3546 sync::Arc,
3547 thread::{sleep, spawn},
3548 };
3549
3550        // Note that, like anyhow::Error, MyError does not implement the
3551        // std::error::Error trait.
3552 #[derive(Debug)]
3553 pub struct MyError(#[allow(dead_code)] String);
3554
3555 type MyResult<T> = Result<T, Arc<MyError>>;
3556
3557 let cache = Cache::new(100);
3558 const KEY: u32 = 0;
3559
3560 // This test will run eight threads:
3561 //
3562 // Thread1 will be the first thread to call `try_get_with` for a key, so its
3563 // init closure will be evaluated and then an error will be returned. Nothing
3564 // will be inserted to the cache.
3565 let thread1 = {
3566 let cache1 = cache.clone();
3567 spawn(move || {
3568 // Call `try_get_with` immediately.
3569 let v = cache1.try_get_with(KEY, || {
3570 // Wait for 300 ms and return an error.
3571 sleep(Duration::from_millis(300));
3572 Err(MyError("thread1 error".into()))
3573 });
3574 assert!(v.is_err());
3575 })
3576 };
3577
3578 // Thread2 will be the second thread to call `try_get_with` for the same key,
3579 // so its init closure will not be evaluated. Once thread1's init closure
3580 // finishes, it will get the same error value returned by thread1's init
3581 // closure.
3582 let thread2 = {
3583 let cache2 = cache.clone();
3584 spawn(move || {
3585 // Wait for 100 ms before calling `try_get_with`.
3586 sleep(Duration::from_millis(100));
3587 let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
3588 assert!(v.is_err());
3589 })
3590 };
3591
3592        // Thread3 will be the third thread to call `try_get_with` for the same key.
3593        // By the time it calls, thread1's init closure should have finished already,
3594        // but the key still does not exist in the cache. So its init closure will be
3595        // evaluated and then an Ok(&str) value will be returned. That value will be
3596        // inserted into the cache.
3597 let thread3 = {
3598 let cache3 = cache.clone();
3599 spawn(move || {
3600 // Wait for 400 ms before calling `try_get_with`.
3601 sleep(Duration::from_millis(400));
3602 let v: MyResult<_> = cache3.try_get_with(KEY, || {
3603 // Wait for 300 ms and return an Ok(&str) value.
3604 sleep(Duration::from_millis(300));
3605 Ok("thread3")
3606 });
3607 assert_eq!(v.unwrap(), "thread3");
3608 })
3609 };
3610
3611        // Thread4 will be the fourth thread to call `try_get_with` for the same
3612 // key. So its init closure will not be evaluated. Once thread3's init
3613 // closure finishes, it will get the same okay &str value.
3614 let thread4 = {
3615 let cache4 = cache.clone();
3616 spawn(move || {
3617 // Wait for 500 ms before calling `try_get_with`.
3618 sleep(Duration::from_millis(500));
3619 let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
3620 assert_eq!(v.unwrap(), "thread3");
3621 })
3622 };
3623
3624 // Thread5 will be the fifth thread to call `try_get_with` for the same
3625 // key. So its init closure will not be evaluated. By the time it calls,
3626 // thread3's init closure should have finished already, so its init closure
3627        // will not be evaluated, and it will get the value inserted by thread3's init
3628 // closure immediately.
3629 let thread5 = {
3630 let cache5 = cache.clone();
3631 spawn(move || {
3632 // Wait for 800 ms before calling `try_get_with`.
3633 sleep(Duration::from_millis(800));
3634 let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
3635 assert_eq!(v.unwrap(), "thread3");
3636 })
3637 };
3638
3639 // Thread6 will call `get` for the same key. It will call when thread1's init
3640 // closure is still running, so it will get none for the key.
3641 let thread6 = {
3642 let cache6 = cache.clone();
3643 spawn(move || {
3644 // Wait for 200 ms before calling `get`.
3645 sleep(Duration::from_millis(200));
3646 let maybe_v = cache6.get(&KEY);
3647 assert!(maybe_v.is_none());
3648 })
3649 };
3650
3651 // Thread7 will call `get` for the same key. It will call after thread1's init
3652 // closure finished with an error. So it will get none for the key.
3653 let thread7 = {
3654 let cache7 = cache.clone();
3655 spawn(move || {
3656 // Wait for 400 ms before calling `get`.
3657 sleep(Duration::from_millis(400));
3658 let maybe_v = cache7.get(&KEY);
3659 assert!(maybe_v.is_none());
3660 })
3661 };
3662
3663 // Thread8 will call `get` for the same key. It will call after thread3's init
3664        // closure has finished, so it will get the value inserted by thread3's init closure.
3665 let thread8 = {
3666 let cache8 = cache.clone();
3667 spawn(move || {
3668 // Wait for 800 ms before calling `get`.
3669 sleep(Duration::from_millis(800));
3670 let maybe_v = cache8.get(&KEY);
3671 assert_eq!(maybe_v, Some("thread3"));
3672 })
3673 };
3674
3675 for t in [
3676 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3677 ] {
3678 t.join().expect("Failed to join");
3679 }
3680
3681 assert!(cache.is_waiter_map_empty());
3682 }
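
    // A minimal, single-threaded sketch of the caching rule the timing-based test
    // above asserts: an `Err` from the init closure is returned to the caller but
    // not cached, while an `Ok` value is inserted and reused.
    #[test]
    fn try_get_with_single_thread() {
        use std::sync::Arc;

        #[derive(Debug)]
        pub struct MyError(#[allow(dead_code)] String);

        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache: Cache<u32, &str> = Cache::new(16);

        // The error is returned, but nothing is inserted into the cache.
        let v: MyResult<_> = cache.try_get_with(0, || Err(MyError("oops".into())));
        assert!(v.is_err());
        assert!(cache.get(&0).is_none());

        // The okay value is inserted, ...
        let v: MyResult<_> = cache.try_get_with(0, || Ok("ok"));
        assert_eq!(v.unwrap(), "ok");

        // ... so the init closure is no longer evaluated.
        let v: MyResult<_> = cache.try_get_with(0, || unreachable!());
        assert_eq!(v.unwrap(), "ok");
    }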
3683
3684 #[test]
3685 fn try_get_with_by_ref() {
3686 use std::{
3687 sync::Arc,
3688 thread::{sleep, spawn},
3689 };
3690
3691        // Note that, like anyhow::Error, MyError does not implement the
3692        // std::error::Error trait.
3693 #[derive(Debug)]
3694 pub struct MyError(#[allow(dead_code)] String);
3695
3696 type MyResult<T> = Result<T, Arc<MyError>>;
3697
3698 let cache = Cache::new(100);
3699 const KEY: &u32 = &0;
3700
3701 // This test will run eight threads:
3702 //
3703 // Thread1 will be the first thread to call `try_get_with_by_ref` for a key,
3704 // so its init closure will be evaluated and then an error will be returned.
3705 // Nothing will be inserted to the cache.
3706 let thread1 = {
3707 let cache1 = cache.clone();
3708 spawn(move || {
3709 // Call `try_get_with_by_ref` immediately.
3710 let v = cache1.try_get_with_by_ref(KEY, || {
3711 // Wait for 300 ms and return an error.
3712 sleep(Duration::from_millis(300));
3713 Err(MyError("thread1 error".into()))
3714 });
3715 assert!(v.is_err());
3716 })
3717 };
3718
3719 // Thread2 will be the second thread to call `try_get_with_by_ref` for the
3720 // same key, so its init closure will not be evaluated. Once thread1's init
3721 // closure finishes, it will get the same error value returned by thread1's
3722 // init closure.
3723 let thread2 = {
3724 let cache2 = cache.clone();
3725 spawn(move || {
3726 // Wait for 100 ms before calling `try_get_with_by_ref`.
3727 sleep(Duration::from_millis(100));
3728 let v: MyResult<_> = cache2.try_get_with_by_ref(KEY, || unreachable!());
3729 assert!(v.is_err());
3730 })
3731 };
3732
3733        // Thread3 will be the third thread to call `try_get_with_by_ref` for the
3734        // same key. By the time it calls, thread1's init closure should have
3735        // finished already, but the key still does not exist in the cache. So its
3736        // init closure will be evaluated and then an Ok(&str) value will be
3737        // returned. That value will be inserted into the cache.
3738 let thread3 = {
3739 let cache3 = cache.clone();
3740 spawn(move || {
3741 // Wait for 400 ms before calling `try_get_with_by_ref`.
3742 sleep(Duration::from_millis(400));
3743 let v: MyResult<_> = cache3.try_get_with_by_ref(KEY, || {
3744 // Wait for 300 ms and return an Ok(&str) value.
3745 sleep(Duration::from_millis(300));
3746 Ok("thread3")
3747 });
3748 assert_eq!(v.unwrap(), "thread3");
3749 })
3750 };
3751
3752        // Thread4 will be the fourth thread to call `try_get_with_by_ref` for the
3753 // same key. So its init closure will not be evaluated. Once thread3's init
3754 // closure finishes, it will get the same okay &str value.
3755 let thread4 = {
3756 let cache4 = cache.clone();
3757 spawn(move || {
3758 // Wait for 500 ms before calling `try_get_with_by_ref`.
3759 sleep(Duration::from_millis(500));
3760 let v: MyResult<_> = cache4.try_get_with_by_ref(KEY, || unreachable!());
3761 assert_eq!(v.unwrap(), "thread3");
3762 })
3763 };
3764
3765 // Thread5 will be the fifth thread to call `try_get_with_by_ref` for the
3766 // same key. So its init closure will not be evaluated. By the time it calls,
3767 // thread3's init closure should have finished already, so its init closure
3768        // will not be evaluated, and it will get the value inserted by thread3's init
3769 // closure immediately.
3770 let thread5 = {
3771 let cache5 = cache.clone();
3772 spawn(move || {
3773 // Wait for 800 ms before calling `try_get_with_by_ref`.
3774 sleep(Duration::from_millis(800));
3775 let v: MyResult<_> = cache5.try_get_with_by_ref(KEY, || unreachable!());
3776 assert_eq!(v.unwrap(), "thread3");
3777 })
3778 };
3779
3780 // Thread6 will call `get` for the same key. It will call when thread1's init
3781 // closure is still running, so it will get none for the key.
3782 let thread6 = {
3783 let cache6 = cache.clone();
3784 spawn(move || {
3785 // Wait for 200 ms before calling `get`.
3786 sleep(Duration::from_millis(200));
3787 let maybe_v = cache6.get(KEY);
3788 assert!(maybe_v.is_none());
3789 })
3790 };
3791
3792 // Thread7 will call `get` for the same key. It will call after thread1's init
3793 // closure finished with an error. So it will get none for the key.
3794 let thread7 = {
3795 let cache7 = cache.clone();
3796 spawn(move || {
3797 // Wait for 400 ms before calling `get`.
3798 sleep(Duration::from_millis(400));
3799 let maybe_v = cache7.get(KEY);
3800 assert!(maybe_v.is_none());
3801 })
3802 };
3803
3804 // Thread8 will call `get` for the same key. It will call after thread3's init
3805        // closure has finished, so it will get the value inserted by thread3's init closure.
3806 let thread8 = {
3807 let cache8 = cache.clone();
3808 spawn(move || {
3809 // Wait for 800 ms before calling `get`.
3810 sleep(Duration::from_millis(800));
3811 let maybe_v = cache8.get(KEY);
3812 assert_eq!(maybe_v, Some("thread3"));
3813 })
3814 };
3815
3816 for t in [
3817 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3818 ] {
3819 t.join().expect("Failed to join");
3820 }
3821
3822 assert!(cache.is_waiter_map_empty());
3823 }
3824
3825 #[test]
3826 fn optionally_get_with() {
3827 use std::thread::{sleep, spawn};
3828
3829 let cache = Cache::new(100);
3830 const KEY: u32 = 0;
3831
3832 // This test will run eight threads:
3833 //
3834 // Thread1 will be the first thread to call `optionally_get_with` for a key,
3835        // so its init closure will be evaluated and then `None` will be returned.
3836        // Nothing will be inserted into the cache.
3837 let thread1 = {
3838 let cache1 = cache.clone();
3839 spawn(move || {
3840 // Call `optionally_get_with` immediately.
3841 let v = cache1.optionally_get_with(KEY, || {
3842                    // Wait for 300 ms and return `None`.
3843 sleep(Duration::from_millis(300));
3844 None
3845 });
3846 assert!(v.is_none());
3847 })
3848 };
3849
3850 // Thread2 will be the second thread to call `optionally_get_with` for the
3851 // same key, so its init closure will not be evaluated. Once thread1's init
3852        // closure finishes, it will get the same `None` returned by thread1's
3853 // init closure.
3854 let thread2 = {
3855 let cache2 = cache.clone();
3856 spawn(move || {
3857 // Wait for 100 ms before calling `optionally_get_with`.
3858 sleep(Duration::from_millis(100));
3859 let v = cache2.optionally_get_with(KEY, || unreachable!());
3860 assert!(v.is_none());
3861 })
3862 };
3863
3864        // Thread3 will be the third thread to call `optionally_get_with` for the
3865        // same key. By the time it calls, thread1's init closure should have
3866        // finished already, but the key still does not exist in the cache. So its
3867        // init closure will be evaluated and then a Some(&str) value will be
3868        // returned. That value will be inserted into the cache.
3869 let thread3 = {
3870 let cache3 = cache.clone();
3871 spawn(move || {
3872 // Wait for 400 ms before calling `optionally_get_with`.
3873 sleep(Duration::from_millis(400));
3874 let v = cache3.optionally_get_with(KEY, || {
3875                    // Wait for 300 ms and return a Some(&str) value.
3876 sleep(Duration::from_millis(300));
3877 Some("thread3")
3878 });
3879 assert_eq!(v.unwrap(), "thread3");
3880 })
3881 };
3882
3883        // Thread4 will be the fourth thread to call `optionally_get_with` for the
3884 // same key. So its init closure will not be evaluated. Once thread3's init
3885 // closure finishes, it will get the same okay &str value.
3886 let thread4 = {
3887 let cache4 = cache.clone();
3888 spawn(move || {
3889 // Wait for 500 ms before calling `optionally_get_with`.
3890 sleep(Duration::from_millis(500));
3891 let v = cache4.optionally_get_with(KEY, || unreachable!());
3892 assert_eq!(v.unwrap(), "thread3");
3893 })
3894 };
3895
3896 // Thread5 will be the fifth thread to call `optionally_get_with` for the
3897 // same key. So its init closure will not be evaluated. By the time it calls,
3898 // thread3's init closure should have finished already, so its init closure
3899        // will not be evaluated, and it will get the value inserted by thread3's init
3900 // closure immediately.
3901 let thread5 = {
3902 let cache5 = cache.clone();
3903 spawn(move || {
3904 // Wait for 800 ms before calling `optionally_get_with`.
3905 sleep(Duration::from_millis(800));
3906 let v = cache5.optionally_get_with(KEY, || unreachable!());
3907 assert_eq!(v.unwrap(), "thread3");
3908 })
3909 };
3910
3911 // Thread6 will call `get` for the same key. It will call when thread1's init
3912 // closure is still running, so it will get none for the key.
3913 let thread6 = {
3914 let cache6 = cache.clone();
3915 spawn(move || {
3916 // Wait for 200 ms before calling `get`.
3917 sleep(Duration::from_millis(200));
3918 let maybe_v = cache6.get(&KEY);
3919 assert!(maybe_v.is_none());
3920 })
3921 };
3922
3923 // Thread7 will call `get` for the same key. It will call after thread1's init
3924        // closure finished with `None`. So it will get none for the key.
3925 let thread7 = {
3926 let cache7 = cache.clone();
3927 spawn(move || {
3928 // Wait for 400 ms before calling `get`.
3929 sleep(Duration::from_millis(400));
3930 let maybe_v = cache7.get(&KEY);
3931 assert!(maybe_v.is_none());
3932 })
3933 };
3934
3935 // Thread8 will call `get` for the same key. It will call after thread3's init
3936        // closure has finished, so it will get the value inserted by thread3's init closure.
3937 let thread8 = {
3938 let cache8 = cache.clone();
3939 spawn(move || {
3940 // Wait for 800 ms before calling `get`.
3941 sleep(Duration::from_millis(800));
3942 let maybe_v = cache8.get(&KEY);
3943 assert_eq!(maybe_v, Some("thread3"));
3944 })
3945 };
3946
3947 for t in [
3948 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3949 ] {
3950 t.join().expect("Failed to join");
3951 }
3952
3953 assert!(cache.is_waiter_map_empty());
3954 }
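
    // A minimal, single-threaded sketch of the same rule for the `Option`-based
    // variant: a `None` from the init closure is not cached, while a `Some` value
    // is inserted and reused.
    #[test]
    fn optionally_get_with_single_thread() {
        let cache: Cache<u32, &str> = Cache::new(16);

        // `None` is returned to the caller and nothing is inserted.
        assert!(cache.optionally_get_with(0, || None).is_none());
        assert!(cache.get(&0).is_none());

        // A `Some` value is inserted, ...
        assert_eq!(cache.optionally_get_with(0, || Some("ok")), Some("ok"));

        // ... so the init closure is no longer evaluated.
        assert_eq!(cache.optionally_get_with(0, || unreachable!()), Some("ok"));
    }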
3955
3956 #[test]
3957 fn optionally_get_with_by_ref() {
3958 use std::thread::{sleep, spawn};
3959
3960 let cache = Cache::new(100);
3961 const KEY: &u32 = &0;
3962
3963 // This test will run eight threads:
3964 //
3965 // Thread1 will be the first thread to call `optionally_get_with_by_ref` for
3966        // a key, so its init closure will be evaluated and then `None` will be
3967        // returned. Nothing will be inserted into the cache.
3968 let thread1 = {
3969 let cache1 = cache.clone();
3970 spawn(move || {
3971 // Call `optionally_get_with_by_ref` immediately.
3972 let v = cache1.optionally_get_with_by_ref(KEY, || {
3973                    // Wait for 300 ms and return `None`.
3974 sleep(Duration::from_millis(300));
3975 None
3976 });
3977 assert!(v.is_none());
3978 })
3979 };
3980
3981 // Thread2 will be the second thread to call `optionally_get_with_by_ref` for
3982 // the same key, so its init closure will not be evaluated. Once thread1's
3983 // init closure finishes, it will get the same error value returned by
3984        // init closure finishes, it will get the same `None` returned by
3985 let thread2 = {
3986 let cache2 = cache.clone();
3987 spawn(move || {
3988 // Wait for 100 ms before calling `optionally_get_with_by_ref`.
3989 sleep(Duration::from_millis(100));
3990 let v = cache2.optionally_get_with_by_ref(KEY, || unreachable!());
3991 assert!(v.is_none());
3992 })
3993 };
3994
3995        // Thread3 will be the third thread to call `optionally_get_with_by_ref` for
3996        // the same key. By the time it calls, thread1's init closure should have
3997        // finished already, but the key still does not exist in the cache. So its
3998        // init closure will be evaluated and then a Some(&str) value will be
3999        // returned. That value will be inserted into the cache.
4000 let thread3 = {
4001 let cache3 = cache.clone();
4002 spawn(move || {
4003 // Wait for 400 ms before calling `optionally_get_with_by_ref`.
4004 sleep(Duration::from_millis(400));
4005 let v = cache3.optionally_get_with_by_ref(KEY, || {
4006                    // Wait for 300 ms and return a Some(&str) value.
4007 sleep(Duration::from_millis(300));
4008 Some("thread3")
4009 });
4010 assert_eq!(v.unwrap(), "thread3");
4011 })
4012 };
4013
4014        // Thread4 will be the fourth thread to call `optionally_get_with_by_ref` for
4015 // the same key. So its init closure will not be evaluated. Once thread3's
4016 // init closure finishes, it will get the same okay &str value.
4017 let thread4 = {
4018 let cache4 = cache.clone();
4019 spawn(move || {
4020 // Wait for 500 ms before calling `optionally_get_with_by_ref`.
4021 sleep(Duration::from_millis(500));
4022 let v = cache4.optionally_get_with_by_ref(KEY, || unreachable!());
4023 assert_eq!(v.unwrap(), "thread3");
4024 })
4025 };
4026
4027 // Thread5 will be the fifth thread to call `optionally_get_with_by_ref` for
4028 // the same key. So its init closure will not be evaluated. By the time it
4029 // calls, thread3's init closure should have finished already, so its init
4030        // closure will not be evaluated, and it will get the value inserted by thread3's
4031 // init closure immediately.
4032 let thread5 = {
4033 let cache5 = cache.clone();
4034 spawn(move || {
4035 // Wait for 800 ms before calling `optionally_get_with_by_ref`.
4036 sleep(Duration::from_millis(800));
4037 let v = cache5.optionally_get_with_by_ref(KEY, || unreachable!());
4038 assert_eq!(v.unwrap(), "thread3");
4039 })
4040 };
4041
4042 // Thread6 will call `get` for the same key. It will call when thread1's init
4043 // closure is still running, so it will get none for the key.
4044 let thread6 = {
4045 let cache6 = cache.clone();
4046 spawn(move || {
4047 // Wait for 200 ms before calling `get`.
4048 sleep(Duration::from_millis(200));
4049 let maybe_v = cache6.get(KEY);
4050 assert!(maybe_v.is_none());
4051 })
4052 };
4053
4054 // Thread7 will call `get` for the same key. It will call after thread1's init
4055        // closure finished with `None`. So it will get none for the key.
4056 let thread7 = {
4057 let cache7 = cache.clone();
4058 spawn(move || {
4059 // Wait for 400 ms before calling `get`.
4060 sleep(Duration::from_millis(400));
4061 let maybe_v = cache7.get(KEY);
4062 assert!(maybe_v.is_none());
4063 })
4064 };
4065
4066 // Thread8 will call `get` for the same key. It will call after thread3's init
4067        // closure has finished, so it will get the value inserted by thread3's init closure.
4068 let thread8 = {
4069 let cache8 = cache.clone();
4070 spawn(move || {
4071 // Wait for 800 ms before calling `get`.
4072 sleep(Duration::from_millis(800));
4073 let maybe_v = cache8.get(KEY);
4074 assert_eq!(maybe_v, Some("thread3"));
4075 })
4076 };
4077
4078 for t in [
4079 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
4080 ] {
4081 t.join().expect("Failed to join");
4082 }
4083
4084 assert!(cache.is_waiter_map_empty());
4085 }
4086
4087 #[test]
4088 fn upsert_with() {
4089 use std::thread::{sleep, spawn};
4090
4091 let cache = Cache::new(100);
4092 const KEY: u32 = 0;
4093
4094 // Spawn three threads to call `and_upsert_with` for the same key and each
4095 // task increments the current value by 1. Ensure the key-level lock is
4096 // working by verifying the value is 3 after all threads finish.
4097 //
4098 // | | thread 1 | thread 2 | thread 3 |
4099 // |--------|----------|----------|----------|
4100 // | 0 ms | get none | | |
4101 // | 100 ms | | blocked | |
4102 // | 200 ms | insert 1 | | |
4103 // | | | get 1 | |
4104 // | 300 ms | | | blocked |
4105 // | 400 ms | | insert 2 | |
4106 // | | | | get 2 |
4107 // | 500 ms | | | insert 3 |
4108
4109 let thread1 = {
4110 let cache1 = cache.clone();
4111 spawn(move || {
4112 cache1.entry(KEY).and_upsert_with(|maybe_entry| {
4113 sleep(Duration::from_millis(200));
4114 assert!(maybe_entry.is_none());
4115 1
4116 })
4117 })
4118 };
4119
4120 let thread2 = {
4121 let cache2 = cache.clone();
4122 spawn(move || {
4123 sleep(Duration::from_millis(100));
4124 cache2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4125 sleep(Duration::from_millis(200));
4126 let entry = maybe_entry.expect("The entry should exist");
4127 entry.into_value() + 1
4128 })
4129 })
4130 };
4131
4132 let thread3 = {
4133 let cache3 = cache.clone();
4134 spawn(move || {
4135 sleep(Duration::from_millis(300));
4136 cache3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4137 sleep(Duration::from_millis(100));
4138 let entry = maybe_entry.expect("The entry should exist");
4139 entry.into_value() + 1
4140 })
4141 })
4142 };
4143
4144 let ent1 = thread1.join().expect("Thread 1 should finish");
4145 let ent2 = thread2.join().expect("Thread 2 should finish");
4146 let ent3 = thread3.join().expect("Thread 3 should finish");
4147 assert_eq!(ent1.into_value(), 1);
4148 assert_eq!(ent2.into_value(), 2);
4149 assert_eq!(ent3.into_value(), 3);
4150
4151 assert_eq!(cache.get(&KEY), Some(3));
4152
4153 assert!(cache.is_waiter_map_empty());
4154 }
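
    // A minimal, single-threaded sketch of the `and_upsert_with` call shape used
    // above: the closure receives `None` when no entry exists and `Some(entry)`
    // otherwise, and the value it returns is inserted.
    #[test]
    fn upsert_with_single_thread() {
        let cache: Cache<u32, u32> = Cache::new(16);

        let entry = cache.entry(0).and_upsert_with(|maybe_entry| {
            assert!(maybe_entry.is_none());
            1
        });
        assert_eq!(entry.into_value(), 1);

        let entry = cache.entry(0).and_upsert_with(|maybe_entry| {
            let entry = maybe_entry.expect("The entry should exist");
            entry.into_value() + 1
        });
        assert_eq!(entry.into_value(), 2);

        assert_eq!(cache.get(&0), Some(2));
    }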
4155
4156 #[test]
4157 fn compute_with() {
4158 use crate::ops::compute;
4159 use std::{
4160 sync::RwLock,
4161 thread::{sleep, spawn},
4162 };
4163
4164 let cache = Cache::new(100);
4165 const KEY: u32 = 0;
4166
4167 // Spawn six threads to call `and_compute_with` for the same key. Ensure the
4168 // key-level lock is working by verifying the value after all threads finish.
4169 //
4170 // | | thread 1 | thread 2 | thread 3 | thread 4 | thread 5 | thread 6 |
4171 // |---------|------------|---------------|------------|----------|------------|----------|
4172 // | 0 ms | get none | | | | | |
4173 // | 100 ms | | blocked | | | | |
4174 // | 200 ms | insert [1] | | | | | |
4175 // | | | get [1] | | | | |
4176 // | 300 ms | | | blocked | | | |
4177 // | 400 ms | | insert [1, 2] | | | | |
4178 // | | | | get [1, 2] | | | |
4179 // | 500 ms | | | | blocked | | |
4180 // | 600 ms | | | remove | | | |
4181 // | | | | | get none | | |
4182 // | 700 ms | | | | | blocked | |
4183 // | 800 ms | | | | nop | | |
4184 // | | | | | | get none | |
4185 // | 900 ms | | | | | | blocked |
4186 // | 1000 ms | | | | | insert [5] | |
4187 // | | | | | | | get [5] |
4188 // | 1100 ms | | | | | | nop |
4189
4190 let thread1 = {
4191 let cache1 = cache.clone();
4192 spawn(move || {
4193 cache1.entry(KEY).and_compute_with(|maybe_entry| {
4194 sleep(Duration::from_millis(200));
4195 assert!(maybe_entry.is_none());
4196 compute::Op::Put(Arc::new(RwLock::new(vec![1])))
4197 })
4198 })
4199 };
4200
4201 let thread2 = {
4202 let cache2 = cache.clone();
4203 spawn(move || {
4204 sleep(Duration::from_millis(100));
4205 cache2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4206 let entry = maybe_entry.expect("The entry should exist");
4207 let value = entry.into_value();
4208 assert_eq!(*value.read().unwrap(), vec![1]);
4209 sleep(Duration::from_millis(200));
4210 value.write().unwrap().push(2);
4211 compute::Op::Put(value)
4212 })
4213 })
4214 };
4215
4216 let thread3 = {
4217 let cache3 = cache.clone();
4218 spawn(move || {
4219 sleep(Duration::from_millis(300));
4220 cache3.entry(KEY).and_compute_with(|maybe_entry| {
4221 let entry = maybe_entry.expect("The entry should exist");
4222 let value = entry.into_value();
4223 assert_eq!(*value.read().unwrap(), vec![1, 2]);
4224 sleep(Duration::from_millis(200));
4225 compute::Op::Remove
4226 })
4227 })
4228 };
4229
4230 let thread4 = {
4231 let cache4 = cache.clone();
4232 spawn(move || {
4233 sleep(Duration::from_millis(500));
4234 cache4.entry(KEY).and_compute_with(|maybe_entry| {
4235 assert!(maybe_entry.is_none());
4236 sleep(Duration::from_millis(200));
4237 compute::Op::Nop
4238 })
4239 })
4240 };
4241
4242 let thread5 = {
4243 let cache5 = cache.clone();
4244 spawn(move || {
4245 sleep(Duration::from_millis(700));
4246 cache5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4247 assert!(maybe_entry.is_none());
4248 sleep(Duration::from_millis(200));
4249 compute::Op::Put(Arc::new(RwLock::new(vec![5])))
4250 })
4251 })
4252 };
4253
4254 let thread6 = {
4255 let cache6 = cache.clone();
4256 spawn(move || {
4257 sleep(Duration::from_millis(900));
4258 cache6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4259 let entry = maybe_entry.expect("The entry should exist");
4260 let value = entry.into_value();
4261 assert_eq!(*value.read().unwrap(), vec![5]);
4262 sleep(Duration::from_millis(100));
4263 compute::Op::Nop
4264 })
4265 })
4266 };
4267
4268 let res1 = thread1.join().expect("Thread 1 should finish");
4269 let res2 = thread2.join().expect("Thread 2 should finish");
4270 let res3 = thread3.join().expect("Thread 3 should finish");
4271 let res4 = thread4.join().expect("Thread 4 should finish");
4272 let res5 = thread5.join().expect("Thread 5 should finish");
4273 let res6 = thread6.join().expect("Thread 6 should finish");
4274
4275 let compute::CompResult::Inserted(entry) = res1 else {
4276 panic!("Expected `Inserted`. Got {res1:?}")
4277 };
4278 assert_eq!(
4279 *entry.into_value().read().unwrap(),
4280            vec![1, 2] // The same Vec was modified by thread2.
4281 );
4282
4283 let compute::CompResult::ReplacedWith(entry) = res2 else {
4284 panic!("Expected `ReplacedWith`. Got {res2:?}")
4285 };
4286 assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4287
4288 let compute::CompResult::Removed(entry) = res3 else {
4289 panic!("Expected `Removed`. Got {res3:?}")
4290 };
4291 assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4292
4293 let compute::CompResult::StillNone(key) = res4 else {
4294 panic!("Expected `StillNone`. Got {res4:?}")
4295 };
4296 assert_eq!(*key, KEY);
4297
4298 let compute::CompResult::Inserted(entry) = res5 else {
4299 panic!("Expected `Inserted`. Got {res5:?}")
4300 };
4301 assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
4302
4303 let compute::CompResult::Unchanged(entry) = res6 else {
4304 panic!("Expected `Unchanged`. Got {res6:?}")
4305 };
4306 assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
4307
4308 assert!(cache.is_waiter_map_empty());
4309 }
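
    // A minimal, single-threaded sketch mapping each `compute::Op` returned by the
    // closure to the `CompResult` variant checked in the test above.
    #[test]
    fn compute_with_single_thread() {
        use crate::ops::compute;

        let cache: Cache<u32, i32> = Cache::new(16);

        // No entry + `Op::Nop` -> `StillNone`.
        let res = cache.entry(0).and_compute_with(|_| compute::Op::Nop);
        assert!(matches!(res, compute::CompResult::StillNone(_)));

        // No entry + `Op::Put` -> `Inserted`.
        let res = cache.entry(0).and_compute_with(|_| compute::Op::Put(1));
        assert!(matches!(res, compute::CompResult::Inserted(_)));

        // Existing entry + `Op::Put` -> `ReplacedWith`.
        let res = cache.entry(0).and_compute_with(|_| compute::Op::Put(2));
        assert!(matches!(res, compute::CompResult::ReplacedWith(_)));

        // Existing entry + `Op::Nop` -> `Unchanged`.
        let res = cache.entry(0).and_compute_with(|_| compute::Op::Nop);
        assert!(matches!(res, compute::CompResult::Unchanged(_)));

        // Existing entry + `Op::Remove` -> `Removed`, and the entry is gone.
        let res = cache.entry(0).and_compute_with(|_| compute::Op::Remove);
        assert!(matches!(res, compute::CompResult::Removed(_)));
        assert!(cache.get(&0).is_none());
    }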
4310
4311 #[test]
4312 fn try_compute_with() {
4313 use crate::ops::compute;
4314 use std::{
4315 sync::RwLock,
4316 thread::{sleep, spawn},
4317 };
4318
4319 let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
4320 const KEY: u32 = 0;
4321
4322 // Spawn four threads to call `and_try_compute_with` for the same key. Ensure
4323 // the key-level lock is working by verifying the value after all threads
4324 // finish.
4325 //
4326 // | | thread 1 | thread 2 | thread 3 | thread 4 |
4327 // |---------|------------|---------------|------------|------------|
4328 // | 0 ms | get none | | | |
4329 // | 100 ms | | blocked | | |
4330 // | 200 ms | insert [1] | | | |
4331 // | | | get [1] | | |
4332 // | 300 ms | | | blocked | |
4333 // | 400 ms | | insert [1, 2] | | |
4334 // | | | | get [1, 2] | |
4335 // | 500 ms | | | | blocked |
4336 // | 600 ms | | | err | |
4337 // | | | | | get [1, 2] |
4338 // | 700 ms | | | | remove |
4339 //
4340        // This test is shorter than the `compute_with` test because this one omits `Nop`
4341 // cases.
4342
4343 let thread1 = {
4344 let cache1 = cache.clone();
4345 spawn(move || {
4346 cache1.entry(KEY).and_try_compute_with(|maybe_entry| {
4347 sleep(Duration::from_millis(200));
4348 assert!(maybe_entry.is_none());
4349 Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
4350 })
4351 })
4352 };
4353
4354 let thread2 = {
4355 let cache2 = cache.clone();
4356 spawn(move || {
4357 sleep(Duration::from_millis(100));
4358 cache2
4359 .entry_by_ref(&KEY)
4360 .and_try_compute_with(|maybe_entry| {
4361 let entry = maybe_entry.expect("The entry should exist");
4362 let value = entry.into_value();
4363 assert_eq!(*value.read().unwrap(), vec![1]);
4364 sleep(Duration::from_millis(200));
4365 value.write().unwrap().push(2);
4366 Ok(compute::Op::Put(value)) as Result<_, ()>
4367 })
4368 })
4369 };
4370
4371 let thread3 = {
4372 let cache3 = cache.clone();
4373 spawn(move || {
4374 sleep(Duration::from_millis(300));
4375 cache3.entry(KEY).and_try_compute_with(|maybe_entry| {
4376 let entry = maybe_entry.expect("The entry should exist");
4377 let value = entry.into_value();
4378 assert_eq!(*value.read().unwrap(), vec![1, 2]);
4379 sleep(Duration::from_millis(200));
4380 Err(())
4381 })
4382 })
4383 };
4384
4385 let thread4 = {
4386 let cache4 = cache.clone();
4387 spawn(move || {
4388 sleep(Duration::from_millis(500));
4389 cache4.entry(KEY).and_try_compute_with(|maybe_entry| {
4390 let entry = maybe_entry.expect("The entry should exist");
4391 let value = entry.into_value();
4392 assert_eq!(*value.read().unwrap(), vec![1, 2]);
4393 sleep(Duration::from_millis(100));
4394 Ok(compute::Op::Remove) as Result<_, ()>
4395 })
4396 })
4397 };
4398
4399 let res1 = thread1.join().expect("Thread 1 should finish");
4400 let res2 = thread2.join().expect("Thread 2 should finish");
4401 let res3 = thread3.join().expect("Thread 3 should finish");
4402 let res4 = thread4.join().expect("Thread 4 should finish");
4403
4404 let Ok(compute::CompResult::Inserted(entry)) = res1 else {
4405 panic!("Expected `Inserted`. Got {res1:?}")
4406 };
4407 assert_eq!(
4408 *entry.into_value().read().unwrap(),
4409            vec![1, 2] // The same Vec was modified by thread2.
4410 );
4411
4412 let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
4413 panic!("Expected `ReplacedWith`. Got {res2:?}")
4414 };
4415 assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4416
4417 assert!(res3.is_err());
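        // The `Err` from thread3's closure was returned to the caller and, as the
        // next check shows, left the cached entry untouched.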
4418
4419 let Ok(compute::CompResult::Removed(entry)) = res4 else {
4420 panic!("Expected `Removed`. Got {res4:?}")
4421 };
4422 assert_eq!(
4423 *entry.into_value().read().unwrap(),
4424 vec![1, 2] // Removed value.
4425 );
4426
4427 assert!(cache.is_waiter_map_empty());
4428 }
4429
4430 #[test]
4431 // https://github.com/moka-rs/moka/issues/43
4432 fn handle_panic_in_get_with() {
4433 use std::{sync::Barrier, thread};
4434
4435 let cache = Cache::new(16);
4436 let barrier = Arc::new(Barrier::new(2));
4437 {
4438 let cache_ref = cache.clone();
4439 let barrier_ref = barrier.clone();
4440 thread::spawn(move || {
4441 let _ = cache_ref.get_with(1, || {
4442 barrier_ref.wait();
4443 thread::sleep(Duration::from_millis(50));
4444 panic!("Panic during get_with");
4445 });
4446 });
4447 }
4448
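        // Wait until the panicking init closure above is running, then verify
        // that a later `get_with` call still evaluates its own init closure.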
4449 barrier.wait();
4450 assert_eq!(cache.get_with(1, || 5), 5);
4451
4452 assert!(cache.is_waiter_map_empty());
4453 }
4454
4455 #[test]
4456 // https://github.com/moka-rs/moka/issues/43
4457 fn handle_panic_in_try_get_with() {
4458 use std::{sync::Barrier, thread};
4459
4460 let cache = Cache::new(16);
4461 let barrier = Arc::new(Barrier::new(2));
4462 {
4463 let cache_ref = cache.clone();
4464 let barrier_ref = barrier.clone();
4465 thread::spawn(move || {
4466 let _ = cache_ref.try_get_with(1, || {
4467 barrier_ref.wait();
4468 thread::sleep(Duration::from_millis(50));
4469 panic!("Panic during try_get_with");
4470 }) as Result<_, Arc<Infallible>>;
4471 });
4472 }
4473
4474 barrier.wait();
4475 assert_eq!(
4476 cache.try_get_with(1, || Ok(5)) as Result<_, Arc<Infallible>>,
4477 Ok(5)
4478 );
4479
4480 assert!(cache.is_waiter_map_empty());
4481 }
4482
4483 #[test]
4484 fn test_removal_notifications() {
4485 // The following `Vec`s will hold actual and expected notifications.
4486 let actual = Arc::new(Mutex::new(Vec::new()));
4487 let mut expected = Vec::new();
4488
4489 // Create an eviction listener.
4490 let a1 = Arc::clone(&actual);
4491 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
4492
4493 // Create a cache with the eviction listener.
4494 let mut cache = Cache::builder()
4495 .max_capacity(3)
4496 .eviction_listener(listener)
4497 .build();
4498 cache.reconfigure_for_testing();
4499
4500 // Make the cache exterior immutable.
4501 let cache = cache;
4502
4503 cache.insert('a', "alice");
4504 cache.invalidate(&'a');
4505 expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4506
4507 cache.run_pending_tasks();
4508 assert_eq!(cache.entry_count(), 0);
4509
4510 cache.insert('b', "bob");
4511 cache.insert('c', "cathy");
4512 cache.insert('d', "david");
4513 cache.run_pending_tasks();
4514 assert_eq!(cache.entry_count(), 3);
4515
4516 // This will be rejected due to the size constraint.
4517 cache.insert('e', "emily");
4518 expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4519 cache.run_pending_tasks();
4520 assert_eq!(cache.entry_count(), 3);
4521
4522 // Raise the popularity of 'e' so it will be accepted next time.
4523 cache.get(&'e');
4524 cache.run_pending_tasks();
4525
4526 // Retry.
4527 cache.insert('e', "eliza");
4528        // This time 'e' will be admitted and the LRU entry ('b') will be evicted.
4529 expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4530 cache.run_pending_tasks();
4531 assert_eq!(cache.entry_count(), 3);
4532
4533 // Replace an existing entry.
4534 cache.insert('d', "dennis");
4535 expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
4536 cache.run_pending_tasks();
4537 assert_eq!(cache.entry_count(), 3);
4538
4539 verify_notification_vec(&cache, actual, &expected);
4540 }
4541
4542 #[test]
4543 fn test_immediate_removal_notifications_with_updates() {
4544 // The following `Vec` will hold actual notifications.
4545 let actual = Arc::new(Mutex::new(Vec::new()));
4546
4547 // Create an eviction listener.
4548 let a1 = Arc::clone(&actual);
4549 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
4550
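        // Use a mock clock so the test can advance time deterministically with
        // `mock.increment` instead of sleeping.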
4551 let (clock, mock) = Clock::mock();
4552
4553 // Create a cache with the eviction listener and also TTL and TTI.
4554 let mut cache = Cache::builder()
4555 .eviction_listener(listener)
4556 .time_to_live(Duration::from_secs(7))
4557 .time_to_idle(Duration::from_secs(5))
4558 .clock(clock)
4559 .build();
4560 cache.reconfigure_for_testing();
4561
4562 // Make the cache exterior immutable.
4563 let cache = cache;
4564
4565 cache.insert("alice", "a0");
4566 cache.run_pending_tasks();
4567
4568 // Now alice (a0) has been expired by the idle timeout (TTI).
4569 mock.increment(Duration::from_secs(6));
4570 assert_eq!(cache.get(&"alice"), None);
4571
4572        // We have not called `run_pending_tasks` after the expiration of alice (a0),
4573        // so it is still in the cache.
4574 assert_eq!(cache.entry_count(), 1);
4575
4576 // Re-insert alice with a different value. Since alice (a0) is still
4577 // in the cache, this is actually a replace operation rather than an
4578 // insert operation. We want to verify that the RemovalCause of a0 is
4579 // Expired, not Replaced.
4580 cache.insert("alice", "a1");
4581 {
4582 let mut a = actual.lock();
4583 assert_eq!(a.len(), 1);
4584 assert_eq!(a[0], (Arc::new("alice"), "a0", RemovalCause::Expired));
4585 a.clear();
4586 }
4587
4588 cache.run_pending_tasks();
4589
4590 mock.increment(Duration::from_secs(4));
4591 assert_eq!(cache.get(&"alice"), Some("a1"));
4592 cache.run_pending_tasks();
4593
4594 // Now alice has been expired by time-to-live (TTL).
4595 mock.increment(Duration::from_secs(4));
4596 assert_eq!(cache.get(&"alice"), None);
4597
4598 // But, again, it is still in the cache.
4599 assert_eq!(cache.entry_count(), 1);
4600
4601 // Re-insert alice with a different value and verify that the
4602 // RemovalCause of a1 is Expired (not Replaced).
4603 cache.insert("alice", "a2");
4604 {
4605 let mut a = actual.lock();
4606 assert_eq!(a.len(), 1);
4607 assert_eq!(a[0], (Arc::new("alice"), "a1", RemovalCause::Expired));
4608 a.clear();
4609 }
4610
4611 cache.run_pending_tasks();
4612
4613 assert_eq!(cache.entry_count(), 1);
4614
4615 // Now alice (a2) has been expired by the idle timeout.
4616 mock.increment(Duration::from_secs(6));
4617 assert_eq!(cache.get(&"alice"), None);
4618 assert_eq!(cache.entry_count(), 1);
4619
4620 // This invalidate will internally remove alice (a2).
4621 cache.invalidate(&"alice");
4622 cache.run_pending_tasks();
4623 assert_eq!(cache.entry_count(), 0);
4624
4625 {
4626 let mut a = actual.lock();
4627 assert_eq!(a.len(), 1);
4628 assert_eq!(a[0], (Arc::new("alice"), "a2", RemovalCause::Expired));
4629 a.clear();
4630 }
4631
4632 // Re-insert, and this time, make it expired by the TTL.
4633 cache.insert("alice", "a3");
4634 cache.run_pending_tasks();
4635 mock.increment(Duration::from_secs(4));
4636 assert_eq!(cache.get(&"alice"), Some("a3"));
4637 cache.run_pending_tasks();
4638 mock.increment(Duration::from_secs(4));
4639 assert_eq!(cache.get(&"alice"), None);
4640 assert_eq!(cache.entry_count(), 1);
4641
4642        // This invalidate will internally remove alice (a3).
4643 cache.invalidate(&"alice");
4644 cache.run_pending_tasks();
4645
4646 assert_eq!(cache.entry_count(), 0);
4647
4648 {
4649 let mut a = actual.lock();
4650 assert_eq!(a.len(), 1);
4651 assert_eq!(a[0], (Arc::new("alice"), "a3", RemovalCause::Expired));
4652 a.clear();
4653 }
4654
4655 assert!(cache.key_locks_map_is_empty());
4656 }
4657
4658 // This test ensures the key-level lock for the immediate notification
4659 // delivery mode is working so that the notifications for a given key
4660 // should always be ordered. This is true even if multiple client threads
4661 // try to modify the entries for the key at the same time. (This test will
4662 // run three client threads)
4663 //
4664 // This test is ignored by default. It becomes unstable when run in parallel
4665 // with other tests.
4666 #[test]
4667 #[ignore]
4668 fn test_key_lock_used_by_immediate_removal_notifications() {
4669 use std::thread::{sleep, spawn};
4670
4671 const KEY: &str = "alice";
4672
4673 type Val = &'static str;
4674
4675 #[derive(PartialEq, Eq, Debug)]
4676 enum Event {
4677 Insert(Val),
4678 Invalidate(Val),
4679 BeginNotify(Val, RemovalCause),
4680 EndNotify(Val, RemovalCause),
4681 }
4682
4683        // The following `Vec` will hold actual notifications.
4684 let actual = Arc::new(Mutex::new(Vec::new()));
4685
4686 // Create an eviction listener.
4687 // Note that this listener is slow and will take 300 ms to complete.
4688 let a0 = Arc::clone(&actual);
4689 let listener = move |_k, v, cause| {
4690 a0.lock().push(Event::BeginNotify(v, cause));
4691 sleep(Duration::from_millis(300));
4692 a0.lock().push(Event::EndNotify(v, cause));
4693 };
4694
4695 // Create a cache with the eviction listener and also TTL 500 ms.
4696 let mut cache = Cache::builder()
4697 .eviction_listener(listener)
4698 .time_to_live(Duration::from_millis(500))
4699 .build();
4700 cache.reconfigure_for_testing();
4701
4702 // Make the cache exterior immutable.
4703 let cache = cache;
4704
4705 // - Notifications for the same key must not overlap.
4706
4707 // Time Event
4708 // ----- -------------------------------------
4709 // 0000: Insert value a0
4710 // 0500: a0 expired
4711 // 0600: Insert value a1 -> expired a0 (N-A0)
4712 // 0800: Insert value a2 (waiting) (A-A2)
4713 // 0900: N-A0 processed
4714 // A-A2 finished waiting -> replace a1 (N-A1)
4715 // 1100: Invalidate (waiting) (R-A2)
4716 // 1200: N-A1 processed
4717 // R-A2 finished waiting -> explicit a2 (N-A2)
4718 // 1500: N-A2 processed
4719
4720 let expected = vec![
4721 Event::Insert("a0"),
4722 Event::Insert("a1"),
4723 Event::BeginNotify("a0", RemovalCause::Expired),
4724 Event::Insert("a2"),
4725 Event::EndNotify("a0", RemovalCause::Expired),
4726 Event::BeginNotify("a1", RemovalCause::Replaced),
4727 Event::Invalidate("a2"),
4728 Event::EndNotify("a1", RemovalCause::Replaced),
4729 Event::BeginNotify("a2", RemovalCause::Explicit),
4730 Event::EndNotify("a2", RemovalCause::Explicit),
4731 ];
4732
4733 // 0000: Insert value a0
4734 actual.lock().push(Event::Insert("a0"));
4735 cache.insert(KEY, "a0");
4736        // Call `run_pending_tasks` to set the last modified for the KEY immediately
4737        // so that this entry should expire in 500 ms from now.
4738 cache.run_pending_tasks();
4739
4740        // 0600: Insert value a1 -> expired a0 (N-A0)
4741 let thread1 = {
4742 let a1 = Arc::clone(&actual);
4743 let c1 = cache.clone();
4744 spawn(move || {
4745 sleep(Duration::from_millis(600));
4746 a1.lock().push(Event::Insert("a1"));
4747 c1.insert(KEY, "a1");
4748 })
4749 };
4750
4751 // 0800: Insert value a2 (waiting) (A-A2)
4752 let thread2 = {
4753 let a2 = Arc::clone(&actual);
4754 let c2 = cache.clone();
4755 spawn(move || {
4756 sleep(Duration::from_millis(800));
4757 a2.lock().push(Event::Insert("a2"));
4758 c2.insert(KEY, "a2");
4759 })
4760 };
4761
4762 // 1100: Invalidate (waiting) (R-A2)
4763 let thread3 = {
4764 let a3 = Arc::clone(&actual);
4765 let c3 = cache.clone();
4766 spawn(move || {
4767 sleep(Duration::from_millis(1100));
4768 a3.lock().push(Event::Invalidate("a2"));
4769 c3.invalidate(&KEY);
4770 })
4771 };
4772
4773 for t in [thread1, thread2, thread3] {
4774 t.join().expect("Failed to join");
4775 }
4776
4777 let actual = actual.lock();
4778 assert_eq!(actual.len(), expected.len());
4779
4780 for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
4781 assert_eq!(actual, expected, "expected[{i}]");
4782 }
4783
4784 assert!(cache.key_locks_map_is_empty());
4785 }
4786
4787 // When the eviction listener is not set, calling `run_pending_tasks` once should
4788 // evict all entries that can be removed.
4789 #[test]
4790 fn no_batch_size_limit_on_eviction() {
4791 const MAX_CAPACITY: u64 = 20;
4792
4793 const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
4794 const MAX_LOG_SYNC_REPEATS: u32 = 1;
4795 const EVICTION_BATCH_SIZE: u32 = 1;
4796
4797 let hk_conf = HousekeeperConfig::new(
4798 // Timeout should be ignored when the eviction listener is not provided.
4799 Some(EVICTION_TIMEOUT),
4800 Some(MAX_LOG_SYNC_REPEATS),
4801 Some(EVICTION_BATCH_SIZE),
4802 );
4803
4804 // Create a cache with the LRU policy.
4805 let mut cache = Cache::builder()
4806 .max_capacity(MAX_CAPACITY)
4807 .eviction_policy(EvictionPolicy::lru())
4808 .housekeeper_config(hk_conf)
4809 .build();
4810 cache.reconfigure_for_testing();
4811
4812 // Make the cache exterior immutable.
4813 let cache = cache;
4814
4815 // Fill the cache.
4816 for i in 0..MAX_CAPACITY {
4817 let v = format!("v{i}");
4818            cache.insert(i, v);
4819 }
4820        // The entry count should still be 0 because we have not called
4821 // `run_pending_tasks` yet.
4822 assert_eq!(cache.entry_count(), 0);
4823
4824 cache.run_pending_tasks();
4825 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4826
4827 // Insert more items to the cache.
4828 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
4829 let v = format!("v{i}");
4830            cache.insert(i, v);
4831 }
4832        // The entry count should not change because we have not called
4833 // `run_pending_tasks` yet.
4834 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4835 // Both old and new keys should exist.
4836 assert!(cache.contains_key(&0)); // old
4837 assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
4838 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
4839
4840 // Process the remaining write op logs (there should be MAX_CAPACITY logs),
4841 // and evict the LRU entries.
4842 cache.run_pending_tasks();
4843 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4844
4845 // Now all the old keys should be gone.
4846 assert!(!cache.contains_key(&0));
4847 assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
4848 // And the new keys should exist.
4849 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
4850 }
4851
4852 #[test]
4853 fn slow_eviction_listener() {
4854 const MAX_CAPACITY: u64 = 20;
4855
4856 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
4857 const LISTENER_DELAY: Duration = Duration::from_millis(11);
4858 const MAX_LOG_SYNC_REPEATS: u32 = 1;
4859 const EVICTION_BATCH_SIZE: u32 = 1;
4860
4861 let hk_conf = HousekeeperConfig::new(
4862 Some(EVICTION_TIMEOUT),
4863 Some(MAX_LOG_SYNC_REPEATS),
4864 Some(EVICTION_BATCH_SIZE),
4865 );
4866
4867 let (clock, mock) = Clock::mock();
4868 let listener_call_count = Arc::new(AtomicU8::new(0));
4869 let lcc = Arc::clone(&listener_call_count);
4870
4871        // A slow eviction listener that spends `LISTENER_DELAY` processing a removal
4872 // notification.
4873 let listener = move |_k, _v, _cause| {
4874 mock.increment(LISTENER_DELAY);
4875 lcc.fetch_add(1, Ordering::AcqRel);
4876 };
4877
4878 // Create a cache with the LRU policy.
4879 let mut cache = Cache::builder()
4880 .max_capacity(MAX_CAPACITY)
4881 .eviction_policy(EvictionPolicy::lru())
4882 .eviction_listener(listener)
4883 .housekeeper_config(hk_conf)
4884 .clock(clock)
4885 .build();
4886 cache.reconfigure_for_testing();
4887
4888 // Make the cache exterior immutable.
4889 let cache = cache;
4890
4891 // Fill the cache.
4892 for i in 0..MAX_CAPACITY {
4893 let v = format!("v{i}");
4894            cache.insert(i, v);
4895 }
4896        // The entry count should still be 0 because we have not called
4897 // `run_pending_tasks` yet.
4898 assert_eq!(cache.entry_count(), 0);
4899
4900 cache.run_pending_tasks();
4901 assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
4902 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4903
4904 // Insert more items to the cache.
4905 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
4906 let v = format!("v{i}");
4907 cache.insert(i, v);
4908 }
4909 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4910
4911 cache.run_pending_tasks();
4912        // Because of the slow listener, the cache will go over capacity.
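        // With the mock clock, each notification advances time by `LISTENER_DELAY`
        // (11 ms), so only about three notifications fit within the 30 ms
        // `EVICTION_TIMEOUT` per `run_pending_tasks` call.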
4913 let mut expected_call_count = 3;
4914 assert_eq!(
4915 listener_call_count.load(Ordering::Acquire) as u64,
4916 expected_call_count
4917 );
4918 assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
4919
4920 loop {
4921 cache.run_pending_tasks();
4922
4923 expected_call_count += 3;
4924 if expected_call_count > MAX_CAPACITY {
4925 expected_call_count = MAX_CAPACITY;
4926 }
4927
4928 let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
4929 assert_eq!(actual_count, expected_call_count);
4930 let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
4931 assert_eq!(cache.entry_count(), expected_entry_count);
4932
4933 if expected_call_count >= MAX_CAPACITY {
4934 break;
4935 }
4936 }
4937
4938 assert_eq!(cache.entry_count(), MAX_CAPACITY);
4939 }
4940
4941 // NOTE: To enable the panic logging, run the following command:
4942 //
4943 // RUST_LOG=moka=info cargo test --features 'logging' -- \
4944 // sync::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
4945 //
4946 #[test]
4947 fn recover_from_panicking_eviction_listener() {
4948 #[cfg(feature = "logging")]
4949 let _ = env_logger::builder().is_test(true).try_init();
4950
4951 // The following `Vec`s will hold actual and expected notifications.
4952 let actual = Arc::new(Mutex::new(Vec::new()));
4953 let mut expected = Vec::new();
4954
4955        // Create an eviction listener that panics when it sees
4956 // a value "panic now!".
4957 let a1 = Arc::clone(&actual);
4958 let listener = move |k, v, cause| {
4959 if v == "panic now!" {
4960 panic!("Panic now!");
4961 }
4962 a1.lock().push((k, v, cause))
4963 };
4964
4965 // Create a cache with the eviction listener.
4966 let mut cache = Cache::builder()
4967 .name("My Sync Cache")
4968 .eviction_listener(listener)
4969 .build();
4970 cache.reconfigure_for_testing();
4971
4972 // Make the cache exterior immutable.
4973 let cache = cache;
4974
4975 // Insert an okay value.
4976 cache.insert("alice", "a0");
4977 cache.run_pending_tasks();
4978
4979 // Insert a value that will cause the eviction listener to panic.
4980 cache.insert("alice", "panic now!");
4981 expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
4982 cache.run_pending_tasks();
4983
4984 // Insert an okay value. This will replace the previous
4985 // value "panic now!" so the eviction listener will panic.
4986 cache.insert("alice", "a2");
4987 cache.run_pending_tasks();
4988 // No more removal notification should be sent.
4989
4990 // Invalidate the okay value.
4991 cache.invalidate(&"alice");
4992 cache.run_pending_tasks();
4993
4994 verify_notification_vec(&cache, actual, &expected);
4995 }
4996
4997    // This test ensures that `contains_key`, `get` and `invalidate` can use the
4998    // borrowed form `&[u8]` for keys of type `Vec<u8>`.
4999 // https://github.com/moka-rs/moka/issues/166
5000 #[test]
5001 fn borrowed_forms_of_key() {
5002 let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5003
5004 let key = vec![1_u8];
5005 cache.insert(key.clone(), ());
5006
5007 // key as &Vec<u8>
5008 let key_v: &Vec<u8> = &key;
5009 assert!(cache.contains_key(key_v));
5010 assert_eq!(cache.get(key_v), Some(()));
5011 cache.invalidate(key_v);
5012
5013 cache.insert(key, ());
5014
5015 // key as &[u8]
5016 let key_s: &[u8] = &[1_u8];
5017 assert!(cache.contains_key(key_s));
5018 assert_eq!(cache.get(key_s), Some(()));
5019 cache.invalidate(key_s);
5020 }
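
    // A minimal sketch of the same `Borrow`-based lookups, this time with `String`
    // keys queried by `&str`.
    #[test]
    fn borrowed_forms_of_string_key() {
        let cache: Cache<String, ()> = Cache::new(1);
        cache.insert("key".to_string(), ());

        // key as &str
        assert!(cache.contains_key("key"));
        assert_eq!(cache.get("key"), Some(()));
        cache.invalidate("key");
    }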
5021
5022 // Ignored by default. This test becomes unstable when run in parallel with
5023 // other tests.
5024 #[test]
5025 #[ignore]
5026 fn drop_value_immediately_after_eviction() {
5027 use crate::common::test_utils::{Counters, Value};
5028
5029 const MAX_CAPACITY: u32 = 500;
5030 const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
5031
5032 let counters = Arc::new(Counters::default());
5033 let counters1 = Arc::clone(&counters);
5034
5035 let listener = move |_k, _v, cause| match cause {
5036 RemovalCause::Size => counters1.incl_evicted(),
5037 RemovalCause::Explicit => counters1.incl_invalidated(),
5038 _ => (),
5039 };
5040
5041 let mut cache = Cache::builder()
5042 .max_capacity(MAX_CAPACITY as u64)
5043 .eviction_listener(listener)
5044 .build();
5045 cache.reconfigure_for_testing();
5046
5047 // Make the cache exterior immutable.
5048 let cache = cache;
5049
5050 for key in 0..KEYS {
5051 let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5052 cache.insert(key, value);
5053 counters.incl_inserted();
5054 cache.run_pending_tasks();
5055 }
5056
5057 let eviction_count = KEYS - MAX_CAPACITY;
5058
5059 cache.run_pending_tasks();
5060 assert_eq!(counters.inserted(), KEYS, "inserted");
5061 assert_eq!(counters.value_created(), KEYS, "value_created");
5062 assert_eq!(counters.evicted(), eviction_count, "evicted");
5063 assert_eq!(counters.invalidated(), 0, "invalidated");
5064 assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5065
5066 for key in 0..KEYS {
5067 cache.invalidate(&key);
5068 cache.run_pending_tasks();
5069 }
5070
5071 cache.run_pending_tasks();
5072 assert_eq!(counters.inserted(), KEYS, "inserted");
5073 assert_eq!(counters.value_created(), KEYS, "value_created");
5074 assert_eq!(counters.evicted(), eviction_count, "evicted");
5075 assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5076 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5077
5078 std::mem::drop(cache);
5079 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5080 }
5081
5082 // For testing the issue reported by: https://github.com/moka-rs/moka/issues/383
5083 //
5084 // Ignored by default. This test becomes unstable when run in parallel with
5085 // other tests.
5086 #[test]
5087 #[ignore]
5088 fn ensure_gc_runs_when_dropping_cache() {
5089 let cache = Cache::builder().build();
5090 let val = Arc::new(0);
5091 {
5092 let val = Arc::clone(&val);
5093 cache.get_with(1, move || val);
5094 }
5095 drop(cache);
5096 assert_eq!(Arc::strong_count(&val), 1);
5097 }
5098
5099 #[test]
5100 fn test_debug_format() {
5101 let cache = Cache::new(10);
5102 cache.insert('a', "alice");
5103 cache.insert('b', "bob");
5104 cache.insert('c', "cindy");
5105
5106 let debug_str = format!("{cache:?}");
5107 assert!(debug_str.starts_with('{'));
5108 assert!(debug_str.contains(r#"'a': "alice""#));
5109 assert!(debug_str.contains(r#"'b': "bob""#));
5110 assert!(debug_str.contains(r#"'c': "cindy""#));
5111 assert!(debug_str.ends_with('}'));
5112 }
5113
5114 type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5115
5116 fn verify_notification_vec<K, V, S>(
5117 cache: &Cache<K, V, S>,
5118 actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5119 expected: &[NotificationTuple<K, V>],
5120 ) where
5121 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5122 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5123 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5124 {
5125 // Retries will be needed when testing in a QEMU VM.
5126 const MAX_RETRIES: usize = 5;
5127 let mut retries = 0;
5128 loop {
5129 // Ensure all scheduled notifications have been processed.
5130 cache.run_pending_tasks();
5131 std::thread::sleep(Duration::from_millis(500));
5132
5133 let actual = &*actual.lock();
5134 if actual.len() != expected.len() {
5135 if retries <= MAX_RETRIES {
5136 retries += 1;
5137 continue;
5138 } else {
5139 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5140 }
5141 }
5142
5143 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5144 assert_eq!(actual, expected, "expected[{i}]");
5145 }
5146
5147 break;
5148 }
5149 }
5150}