moka/sync/segment.rs

1use equivalent::Equivalent;
2
3use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
4use crate::common::concurrent::Weigher;
5use crate::common::time::Clock;
6use crate::{
7    common::{
8        iter::{Iter, ScanningGet},
9        HousekeeperConfig,
10    },
11    notification::EvictionListener,
12    policy::{EvictionPolicy, ExpirationPolicy},
13    Entry, Policy, PredicateError,
14};
15
16use std::{
17    collections::hash_map::RandomState,
18    fmt,
19    hash::{BuildHasher, Hash},
20    sync::Arc,
21};
22
23/// A thread-safe concurrent in-memory cache, with multiple internal segments.
24///
25/// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for
26/// increased concurrent update performance. However, it adds only a small
27/// overhead to retrievals and updates for managing these segments.
28///
29/// For usage examples, see the documentation of [`Cache`][cache-struct].
30///
31/// [cache-struct]: ./struct.Cache.html
32///
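/// A `SegmentedCache` can be created with [`new`](#method.new) or, when more
/// configuration knobs are needed, via [`builder`](#method.builder). A minimal
/// sketch:
///
/// ```rust
/// use moka::sync::SegmentedCache;
///
/// // Up to 10,000 entries, split across four internal segments.
/// let cache = SegmentedCache::builder(4)
///     .max_capacity(10_000)
///     .build();
///
/// cache.insert("key", "value");
/// assert_eq!(cache.get(&"key"), Some("value"));
/// ```
///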
33pub struct SegmentedCache<K, V, S = RandomState> {
34    inner: Arc<Inner<K, V, S>>,
35}
36
37unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
38where
39    K: Send + Sync,
40    V: Send + Sync,
41    S: Send,
42{
43}
44
45unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
46where
47    K: Send + Sync,
48    V: Send + Sync,
49    S: Sync,
50{
51}
52
53impl<K, V, S> Clone for SegmentedCache<K, V, S> {
54    /// Makes a clone of this shared cache.
55    ///
56    /// This operation is cheap as it only creates thread-safe reference counted
57    /// pointers to the shared internal data structures.
58    fn clone(&self) -> Self {
59        Self {
60            inner: Arc::clone(&self.inner),
61        }
62    }
63}
64
65impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
66where
67    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
68    V: fmt::Debug + Clone + Send + Sync + 'static,
69    // TODO: Remove these bounds from S.
70    S: BuildHasher + Clone + Send + Sync + 'static,
71{
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        let mut d_map = f.debug_map();
74
75        for (k, v) in self {
76            d_map.entry(&k, &v);
77        }
78
79        d_map.finish()
80    }
81}
82
83impl<K, V> SegmentedCache<K, V, RandomState>
84where
85    K: Hash + Eq + Send + Sync + 'static,
86    V: Clone + Send + Sync + 'static,
87{
88    /// Constructs a new `SegmentedCache<K, V>` that has multiple internal
89    /// segments and will store up to the `max_capacity`.
90    ///
91    /// To adjust various configuration knobs such as `initial_capacity` or
92    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
93    ///
94    /// [builder-struct]: ./struct.CacheBuilder.html
95    ///
96    /// # Panics
97    ///
98    /// Panics if `num_segments` is 0.
99    pub fn new(max_capacity: u64, num_segments: usize) -> Self {
100        let build_hasher = RandomState::default();
101        Self::with_everything(
102            None,
103            Some(max_capacity),
104            None,
105            num_segments,
106            build_hasher,
107            None,
108            EvictionPolicy::default(),
109            None,
110            ExpirationPolicy::default(),
111            HousekeeperConfig::default(),
112            false,
113            Clock::default(),
114        )
115    }
116
117    /// Returns a [`CacheBuilder`][builder-struct], which can build a
118    /// `SegmentedCache` with various configuration knobs.
119    ///
120    /// [builder-struct]: ./struct.CacheBuilder.html
121    pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
122        CacheBuilder::default().segments(num_segments)
123    }
124}
125
126impl<K, V, S> SegmentedCache<K, V, S> {
127    /// Returns the cache’s name.
128    pub fn name(&self) -> Option<&str> {
129        self.inner.segments[0].name()
130    }
131
132    /// Returns a read-only cache policy of this cache.
133    ///
134    /// At this time, the cache policy cannot be modified after cache creation.
135    /// A future version may support modifying it.
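    ///
    /// # Example
    ///
    /// A small usage sketch (assuming the `Policy` getters `max_capacity` and
    /// `num_segments`):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    /// let policy = cache.policy();
    ///
    /// assert_eq!(policy.max_capacity(), Some(100));
    /// assert_eq!(policy.num_segments(), 4);
    /// ```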
136    pub fn policy(&self) -> Policy {
137        let mut policy = self.inner.segments[0].policy();
138        policy.set_max_capacity(self.inner.desired_capacity);
139        policy.set_num_segments(self.inner.segments.len());
140        policy
141    }
142
143    /// Returns an approximate number of entries in this cache.
144    ///
145    /// The value returned is _an estimate_; the actual count may differ if there are
146    /// concurrent insertions or removals, or if some entries are pending removal due
147    /// to expiration. This inaccuracy can be mitigated by calling
148    /// `run_pending_tasks` first.
149    ///
150    /// # Example
151    ///
152    /// ```rust
153    /// use moka::sync::SegmentedCache;
154    ///
155    /// let cache = SegmentedCache::new(10, 4);
156    /// cache.insert('n', "Netherland Dwarf");
157    /// cache.insert('l', "Lop Eared");
158    /// cache.insert('d', "Dutch");
159    ///
160    /// // Ensure an entry exists.
161    /// assert!(cache.contains_key(&'n'));
162    ///
163    /// // However, the following may print stale numbers (zeros) instead of threes.
164    /// println!("{}", cache.entry_count());   // -> 0
165    /// println!("{}", cache.weighted_size()); // -> 0
166    ///
167    /// // To mitigate the inaccuracy, call `run_pending_tasks` method to run
168    /// // pending internal tasks.
169    /// cache.run_pending_tasks();
170    ///
171    /// // The following will print the actual numbers.
172    /// println!("{}", cache.entry_count());   // -> 3
173    /// println!("{}", cache.weighted_size()); // -> 3
174    /// ```
175    ///
176    pub fn entry_count(&self) -> u64 {
177        self.inner
178            .segments
179            .iter()
180            .map(|seg| seg.entry_count())
181            .sum()
182    }
183
184    /// Returns an approximate total weighted size of entries in this cache.
185    ///
186    /// The value returned is _an estimate_; the actual size may differ if there are
187    /// concurrent insertions or removals, or if some entries are pending removal due
188    /// to expiration. This inaccuracy can be mitigated by calling
189    /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for
190    /// sample code.
191    pub fn weighted_size(&self) -> u64 {
192        self.inner
193            .segments
194            .iter()
195            .map(|seg| seg.weighted_size())
196            .sum()
197    }
198}
199
200impl<K, V, S> SegmentedCache<K, V, S>
201where
202    K: Hash + Eq + Send + Sync + 'static,
203    V: Clone + Send + Sync + 'static,
204    S: BuildHasher + Clone + Send + Sync + 'static,
205{
206    /// # Panics
207    ///
208    /// Panics if `num_segments` is 0.
209    #[allow(clippy::too_many_arguments)]
210    pub(crate) fn with_everything(
211        name: Option<String>,
212        max_capacity: Option<u64>,
213        initial_capacity: Option<usize>,
214        num_segments: usize,
215        build_hasher: S,
216        weigher: Option<Weigher<K, V>>,
217        eviction_policy: EvictionPolicy,
218        eviction_listener: Option<EvictionListener<K, V>>,
219        expiration_policy: ExpirationPolicy<K, V>,
220        housekeeper_config: HousekeeperConfig,
221        invalidator_enabled: bool,
222        clock: Clock,
223    ) -> Self {
224        Self {
225            inner: Arc::new(Inner::new(
226                name,
227                max_capacity,
228                initial_capacity,
229                num_segments,
230                build_hasher,
231                weigher,
232                eviction_policy,
233                eviction_listener,
234                expiration_policy,
235                housekeeper_config,
236                invalidator_enabled,
237                clock,
238            )),
239        }
240    }
241
242    /// Returns `true` if the cache contains a value for the key.
243    ///
244    /// Unlike the `get` method, this method is not considered a cache read operation,
245    /// so it does not update the historic popularity estimator or reset the idle
246    /// timer for the key.
247    ///
248    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
249    /// on the borrowed form _must_ match those for the key type.
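    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert('a', "apple");
    ///
    /// assert!(cache.contains_key(&'a'));
    /// assert!(!cache.contains_key(&'b'));
    /// ```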
250    pub fn contains_key<Q>(&self, key: &Q) -> bool
251    where
252        Q: Equivalent<K> + Hash + ?Sized,
253    {
254        let hash = self.inner.hash(key);
255        self.inner.select(hash).contains_key_with_hash(key, hash)
256    }
257
258    /// Returns a _clone_ of the value corresponding to the key.
259    ///
260    /// If you want to store values that will be expensive to clone, wrap them by
261    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
262    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
263    ///
264    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
265    /// on the borrowed form _must_ match those for the key type.
266    ///
267    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
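    ///
    /// # Example
    ///
    /// A minimal sketch; a `&str` can be used to look up a `String` key because
    /// `str` implements `Equivalent<String>`:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert("key1".to_string(), 1u32);
    ///
    /// assert_eq!(cache.get("key1"), Some(1));
    /// assert_eq!(cache.get("key2"), None);
    /// ```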
268    pub fn get<Q>(&self, key: &Q) -> Option<V>
269    where
270        Q: Equivalent<K> + Hash + Eq + ?Sized,
271    {
272        let hash = self.inner.hash(key);
273        self.inner
274            .select(hash)
275            .get_with_hash(key, hash, false)
276            .map(Entry::into_value)
277    }
278
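    /// Returns an [`OwnedKeyEntrySelector`] for the given owned key, which can be
    /// used to get the entry for that key or insert one if it does not exist. See
    /// [`Cache::entry`][entry-method] for more details.
    ///
    /// A minimal sketch (assuming the selector offers the same `or_insert` method
    /// as the `Cache` entry API):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<String, u32> = SegmentedCache::new(100, 4);
    ///
    /// let entry = cache.entry("key1".to_string()).or_insert(1);
    /// assert_eq!(entry.into_value(), 1);
    /// ```
    ///
    /// [entry-method]: ./struct.Cache.html#method.entry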
279    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280    where
281        K: Hash + Eq,
282    {
283        let hash = self.inner.hash(&key);
284        let cache = self.inner.select(hash);
285        OwnedKeyEntrySelector::new(key, hash, cache)
286    }
287
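    /// Returns a [`RefKeyEntrySelector`] for the given borrowed key, which can be
    /// used to get the entry for that key or insert one if it does not exist. The
    /// key is cloned only when a new entry is actually inserted. See
    /// [`Cache::entry_by_ref`][entry-by-ref-method] for more details.
    ///
    /// [entry-by-ref-method]: ./struct.Cache.html#method.entry_by_ref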
288    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289    where
290        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
291    {
292        let hash = self.inner.hash(key);
293        let cache = self.inner.select(hash);
294        RefKeyEntrySelector::new(key, hash, cache)
295    }
296
297    /// TODO: Remove this in v0.13.0.
298    /// Deprecated, replaced with [`get_with`](#method.get_with)
299    #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
300    pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
301        self.get_with(key, init)
302    }
303
304    /// TODO: Remove this in v0.13.0.
305    /// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
306    #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
307    pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
308    where
309        F: FnOnce() -> Result<V, E>,
310        E: Send + Sync + 'static,
311    {
312        self.try_get_with(key, init)
313    }
314
315    /// Returns a _clone_ of the value corresponding to the key. If the value does
316    /// not exist, evaluates the `init` closure and inserts the output.
317    ///
318    /// # Concurrent calls on the same key
319    ///
320    /// This method guarantees that concurrent calls on the same non-existent key are
321    /// coalesced into one evaluation of the `init` closure. Only one of the calls
322    /// evaluates its closure, and the other calls wait for that closure to complete. See
323    /// [`Cache::get_with`][get-with-method] for more details.
324    ///
325    /// [get-with-method]: ./struct.Cache.html#method.get_with
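    ///
    /// # Example
    ///
    /// A minimal sketch of typical usage:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(100, 4);
    ///
    /// // The key does not exist, so `init` is evaluated and its output is inserted.
    /// let value = cache.get_with("key1", || "value1".to_string());
    /// assert_eq!(value, "value1");
    ///
    /// // The key now exists, so `init` is not evaluated.
    /// let value = cache.get_with("key1", || unreachable!());
    /// assert_eq!(value, "value1");
    /// ```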
326    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
327        let hash = self.inner.hash(&key);
328        let key = Arc::new(key);
329        let replace_if = None as Option<fn(&V) -> bool>;
330        self.inner
331            .select(hash)
332            .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
333            .into_value()
334    }
335
336    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
337    /// key, you can pass a reference to the key. If the key does not exist in the
338    /// cache, the key will be cloned to create a new entry in the cache.
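    ///
    /// # Example
    ///
    /// A minimal sketch; only a `&str` is passed in, and it is cloned into a
    /// `String` key only when the entry is actually created:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<String, String> = SegmentedCache::new(100, 4);
    ///
    /// // The key does not exist, so `init` is evaluated and the key is cloned.
    /// let value = cache.get_with_by_ref("key1", || "value1".to_string());
    /// assert_eq!(value, "value1");
    ///
    /// // The entry now exists, so `init` is not evaluated and no clone is made.
    /// let value = cache.get_with_by_ref("key1", || unreachable!());
    /// assert_eq!(value, "value1");
    /// ```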
339    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
340    where
341        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
342    {
343        let hash = self.inner.hash(key);
344        let replace_if = None as Option<fn(&V) -> bool>;
345        self.inner
346            .select(hash)
347            .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
348            .into_value()
349    }
350
351    /// Works like [`get_with`](#method.get_with), but takes an additional
352    /// `replace_if` closure.
353    ///
354    /// This method will evaluate the `init` closure and insert the output to the
355    /// cache when:
356    ///
357    /// - The key does not exist.
358    /// - Or, `replace_if` closure returns `true`.
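    ///
    /// # Example
    ///
    /// A minimal sketch showing `replace_if` forcing a refresh of an existing entry:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(100, 4);
    ///
    /// // The key does not exist, so `init` is evaluated.
    /// let v = cache.get_with_if(1, || "one", |_| false);
    /// assert_eq!(v, "one");
    ///
    /// // The key exists and `replace_if` returns `false`, so the cached value is kept.
    /// let v = cache.get_with_if(1, || unreachable!(), |cached| *cached != "one");
    /// assert_eq!(v, "one");
    ///
    /// // `replace_if` returns `true`, so `init` is evaluated again and the value is
    /// // replaced.
    /// let v = cache.get_with_if(1, || "uno", |cached| *cached == "one");
    /// assert_eq!(v, "uno");
    /// ```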
359    pub fn get_with_if(
360        &self,
361        key: K,
362        init: impl FnOnce() -> V,
363        replace_if: impl FnMut(&V) -> bool,
364    ) -> V {
365        let hash = self.inner.hash(&key);
366        let key = Arc::new(key);
367        self.inner
368            .select(hash)
369            .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
370            .into_value()
371    }
372
373    /// Returns a _clone_ of the value corresponding to the key. If the value does
374    /// not exist, evaluates the `init` closure, and inserts the value if
375    /// `Some(value)` was returned. If `None` was returned from the closure, this
376    /// method does not insert a value and returns `None`.
377    ///
378    /// # Concurrent calls on the same key
379    ///
380    /// This method guarantees that concurrent calls on the same non-existent key are
381    /// coalesced into one evaluation of the `init` closure. Only one of the calls
382    /// evaluates its closure, and the other calls wait for that closure to complete.
383    /// See [`Cache::optionally_get_with`][opt-get-with-method] for more details.
384    ///
385    /// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with
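    ///
    /// # Example
    ///
    /// A minimal sketch (the `init` closures here stand in for e.g. a fallible
    /// resource lookup):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // `init` returns `None`, so nothing is inserted.
    /// let value = cache.optionally_get_with(0, || None);
    /// assert!(value.is_none());
    /// assert!(!cache.contains_key(&0));
    ///
    /// // `init` returns `Some`, so the value is inserted and returned.
    /// let value = cache.optionally_get_with(0, || Some("zero".to_string()));
    /// assert_eq!(value, Some("zero".to_string()));
    /// assert!(cache.contains_key(&0));
    /// ```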
386    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
387    where
388        F: FnOnce() -> Option<V>,
389    {
390        let hash = self.inner.hash(&key);
391        let key = Arc::new(key);
392        self.inner
393            .select(hash)
394            .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
395            .map(Entry::into_value)
396    }
397
398    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
399    /// of passing an owned key, you can pass a reference to the key. If the key does
400    /// not exist in the cache, the key will be cloned to create a new entry in the
401    /// cache.
402    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
403    where
404        F: FnOnce() -> Option<V>,
405        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
406    {
407        let hash = self.inner.hash(key);
408        self.inner
409            .select(hash)
410            .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
411            .map(Entry::into_value)
412    }
413
414    /// Returns a _clone_ of the value corresponding to the key. If the value does
415    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
416    /// was returned. If `Err(_)` was returned from the closure, this method does not
417    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
418    ///
419    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
420    ///
421    /// # Concurrent calls on the same key
422    ///
423    /// This method guarantees that concurrent calls on the same non-existent key are
424    /// coalesced into one evaluation of the `init` closure (as long as these
425    /// closures return the same error type). Only one of the calls evaluates its
426    /// closure, and the other calls wait for that closure to complete. See
427    /// [`Cache::try_get_with`][try-get-with-method] for more details.
428    ///
429    /// [try-get-with-method]: ./struct.Cache.html#method.try_get_with
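    ///
    /// # Example
    ///
    /// A minimal sketch (`"boom"` stands in for any error type that is
    /// `Send + Sync + 'static`):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // `init` returns `Err`, so nothing is inserted and the error is returned
    /// // wrapped in an `Arc`.
    /// let result = cache.try_get_with(0, || Err("boom"));
    /// assert!(result.is_err());
    /// assert!(!cache.contains_key(&0));
    ///
    /// // `init` returns `Ok`, so the value is inserted and returned.
    /// let result = cache.try_get_with(0, || Ok::<_, &str>("zero".to_string()));
    /// assert_eq!(result.unwrap(), "zero");
    /// ```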
430    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
431    where
432        F: FnOnce() -> Result<V, E>,
433        E: Send + Sync + 'static,
434    {
435        let hash = self.inner.hash(&key);
436        let key = Arc::new(key);
437        self.inner
438            .select(hash)
439            .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
440            .map(Entry::into_value)
441    }
442
443    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
444    /// owned key, you can pass a reference to the key. If the key does not exist in
445    /// the cache, the key will be cloned to create a new entry in the cache.
446    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
447    where
448        F: FnOnce() -> Result<V, E>,
449        E: Send + Sync + 'static,
450        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
451    {
452        let hash = self.inner.hash(key);
453        self.inner
454            .select(hash)
455            .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
456            .map(Entry::into_value)
457    }
458
459    /// Inserts a key-value pair into the cache.
460    ///
461    /// If the cache has this key present, the value is updated.
462    pub fn insert(&self, key: K, value: V) {
463        let hash = self.inner.hash(&key);
464        let key = Arc::new(key);
465        self.inner.select(hash).insert_with_hash(key, hash, value);
466    }
467
468    /// Discards any cached value for the key.
469    ///
470    /// If you need to get the value that has been discarded, use the
471    /// [`remove`](#method.remove) method instead.
472    ///
473    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
474    /// on the borrowed form _must_ match those for the key type.
475    pub fn invalidate<Q>(&self, key: &Q)
476    where
477        Q: Equivalent<K> + Hash + ?Sized,
478    {
479        let hash = self.inner.hash(key);
480        self.inner
481            .select(hash)
482            .invalidate_with_hash(key, hash, false);
483    }
484
485    /// Discards any cached value for the key and returns a clone of the value.
486    ///
487    /// If you do not need to get the value that has been discarded, use the
488    /// [`invalidate`](#method.invalidate) method instead.
489    ///
490    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
491    /// on the borrowed form _must_ match those for the key type.
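    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert('a', "apple");
    ///
    /// // `remove` discards the entry and returns the stored value.
    /// assert_eq!(cache.remove(&'a'), Some("apple"));
    /// // A second call finds nothing to remove.
    /// assert_eq!(cache.remove(&'a'), None);
    /// ```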
492    pub fn remove<Q>(&self, key: &Q) -> Option<V>
493    where
494        Q: Equivalent<K> + Hash + ?Sized,
495    {
496        let hash = self.inner.hash(key);
497        self.inner
498            .select(hash)
499            .invalidate_with_hash(key, hash, true)
500    }
501
502    /// Discards all cached values.
503    ///
504    /// This method returns immediately; it simply records the current time as the
505    /// invalidation time. `get` and other retrieval methods are guaranteed not to
506    /// return the entries inserted before or at the invalidation time.
507    ///
508    /// The actual removal of the invalidated entries is done as a maintenance task
509    /// driven by a user thread. For more details, see
510    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
511    /// level documentation.
512    ///
513    /// Like the `invalidate` method, this method does not clear the historic
514    /// popularity estimator of keys so that it retains the client activities of
515    /// trying to retrieve an item.
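    ///
    /// # Example
    ///
    /// A minimal sketch; the retrieval methods stop returning the entries right
    /// away, even though their actual removal happens later as a maintenance task:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert('a', "apple");
    /// cache.insert('b', "banana");
    ///
    /// cache.invalidate_all();
    ///
    /// assert!(cache.get(&'a').is_none());
    /// assert!(cache.get(&'b').is_none());
    /// ```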
516    pub fn invalidate_all(&self) {
517        for segment in self.inner.segments.iter() {
518            segment.invalidate_all();
519        }
520    }
521
522    /// Discards cached values that satisfy a predicate.
523    ///
524    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
525    /// closure is called against each cached entry inserted before or at the time
526    /// when this method was called. If the closure returns `true` that entry will be
527    /// evicted from the cache.
528    ///
529    /// This method returns immediately; it does not actually remove the invalidated
530    /// entries. Instead, it just registers the predicate with the cache, along with the
531    /// time when this method was called. The actual removal of the invalidated entries is done
532    /// as a maintenance task driven by a user thread. For more details, see
533    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
534    /// level documentation.
535    ///
536    /// Also, the `get` and other retrieval methods will apply the closure to a cached
537    /// entry to determine if it should have been invalidated. Therefore, these
538    /// methods are guaranteed not to return invalidated values.
539    ///
540    /// Note that you must call
541    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
542    /// at the cache creation time as the cache needs to maintain additional internal
543    /// data structures to support this method. Otherwise, calling this method will
544    /// fail with a
545    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
546    ///
547    /// Like the `invalidate` method, this method does not clear the historic
548    /// popularity estimator of keys so that it retains the client activities of
549    /// trying to retrieve an item.
550    ///
551    /// [support-invalidation-closures]:
552    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
553    /// [invalidation-disabled-error]:
554    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
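    ///
    /// # Example
    ///
    /// A minimal sketch; note that the matching entries are removed later by a
    /// maintenance task, so their disappearance is not asserted here:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, &str> = SegmentedCache::builder(4)
    ///     .max_capacity(100)
    ///     .support_invalidation_closures()
    ///     .build();
    ///
    /// cache.insert(0, "alice");
    /// cache.insert(1, "bob");
    ///
    /// // Register a predicate; matching entries will be invalidated.
    /// let result = cache.invalidate_entries_if(|_key, value| *value == "alice");
    /// assert!(result.is_ok());
    /// ```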
555    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
556    where
557        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
558    {
559        let pred = Arc::new(predicate);
560        for segment in self.inner.segments.iter() {
561            segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
562        }
563        Ok(())
564    }
565
566    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
567    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
568    /// value.
569    ///
570    /// Iterators do not block concurrent reads and writes on the cache. An entry can
571    /// be inserted to, invalidated or evicted from a cache while iterators are alive
572    /// on the same cache.
573    ///
574    /// Unlike the `get` method, visiting entries via an iterator does not update the
575    /// historic popularity estimator or reset idle timers for keys.
576    ///
577    /// # Guarantees
578    ///
579    /// In order to allow concurrent access to the cache, the iterator's `next`
580    /// method does _not_ guarantee the following:
581    ///
582    /// - It does not guarantee to return a key-value pair (an entry) if its key has
583    ///   been inserted into the cache _after_ the iterator was created.
584    ///   - Such an entry may or may not be returned depending on the key's hash and
585    ///     timing.
586    ///
587    /// However, the `next` method does guarantee the following:
588    ///
589    /// - It will not return the same entry more than once.
590    /// - It will not return an entry if it has been removed from the cache
591    ///   after the iterator was created.
592    ///     - Note: An entry can be removed for the following reasons:
593    ///         - Manually invalidated.
594    ///         - Expired (e.g. exceeded its time-to-live).
595    ///         - Evicted because the cache capacity was exceeded.
596    ///
597    /// # Examples
598    ///
599    /// ```rust
600    /// use moka::sync::SegmentedCache;
601    ///
602    /// let cache = SegmentedCache::new(100, 4);
603    /// cache.insert("Julia", 14);
604    ///
605    /// let mut iter = cache.iter();
606    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
607    /// assert_eq!(*k, "Julia");
608    /// assert_eq!(v, 14);
609    ///
610    /// assert!(iter.next().is_none());
611    /// ```
612    ///
613    pub fn iter(&self) -> Iter<'_, K, V> {
614        let num_cht_segments = self.inner.segments[0].num_cht_segments();
615        let segments = self
616            .inner
617            .segments
618            .iter()
619            .map(|c| c as &dyn ScanningGet<_, _>)
620            .collect::<Vec<_>>()
621            .into_boxed_slice();
622        Iter::with_multiple_cache_segments(segments, num_cht_segments)
623    }
624
625    /// Performs any pending maintenance operations needed by the cache.
626    pub fn run_pending_tasks(&self) {
627        for segment in self.inner.segments.iter() {
628            segment.run_pending_tasks();
629        }
630    }
631}
632
633impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
634where
635    K: Hash + Eq + Send + Sync + 'static,
636    V: Clone + Send + Sync + 'static,
637    S: BuildHasher + Clone + Send + Sync + 'static,
638{
639    type Item = (Arc<K>, V);
640
641    type IntoIter = Iter<'a, K, V>;
642
643    fn into_iter(self) -> Self::IntoIter {
644        self.iter()
645    }
646}
647
648// For unit tests.
649#[cfg(test)]
650impl<K, V, S> SegmentedCache<K, V, S> {
651    fn is_waiter_map_empty(&self) -> bool {
652        self.inner.segments.iter().all(Cache::is_waiter_map_empty)
653    }
654}
655
656#[cfg(test)]
657impl<K, V, S> SegmentedCache<K, V, S>
658where
659    K: Hash + Eq + Send + Sync + 'static,
660    V: Clone + Send + Sync + 'static,
661    S: BuildHasher + Clone + Send + Sync + 'static,
662{
663    fn invalidation_predicate_count(&self) -> usize {
664        self.inner
665            .segments
666            .iter()
667            .map(|seg| seg.invalidation_predicate_count())
668            .sum()
669    }
670
671    fn reconfigure_for_testing(&mut self) {
672        let inner = Arc::get_mut(&mut self.inner)
673            .expect("There are other strong references to self.inner Arc");
674
675        for segment in inner.segments.iter_mut() {
676            segment.reconfigure_for_testing();
677        }
678    }
679
680    fn key_locks_map_is_empty(&self) -> bool {
681        self.inner
682            .segments
683            .iter()
684            .all(|seg| seg.key_locks_map_is_empty())
685    }
686}
687
688struct Inner<K, V, S> {
689    desired_capacity: Option<u64>,
690    segments: Box<[Cache<K, V, S>]>,
691    build_hasher: S,
692    segment_shift: u32,
693}
694
695impl<K, V, S> Inner<K, V, S>
696where
697    K: Hash + Eq + Send + Sync + 'static,
698    V: Clone + Send + Sync + 'static,
699    S: BuildHasher + Clone + Send + Sync + 'static,
700{
701    /// # Panics
702    ///
703    /// Panics if `num_segments` is 0.
704    #[allow(clippy::too_many_arguments)]
705    fn new(
706        name: Option<String>,
707        max_capacity: Option<u64>,
708        initial_capacity: Option<usize>,
709        num_segments: usize,
710        build_hasher: S,
711        weigher: Option<Weigher<K, V>>,
712        eviction_policy: EvictionPolicy,
713        eviction_listener: Option<EvictionListener<K, V>>,
714        expiration_policy: ExpirationPolicy<K, V>,
715        housekeeper_config: HousekeeperConfig,
716        invalidator_enabled: bool,
717        clock: Clock,
718    ) -> Self {
719        assert!(num_segments > 0);
720
721        let actual_num_segments = num_segments.next_power_of_two();
722        let segment_shift = 64 - actual_num_segments.trailing_zeros();
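        // `hash >> segment_shift` keeps the top `log2(actual_num_segments)` bits of
        // the 64-bit hash, and those bits are used as the segment index.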
723        let seg_max_capacity =
724            max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
725        let seg_init_capacity =
726            initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
727        // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]`
728        // because Cache::clone() does not clone its inner but shares the same inner.
729        let segments = (0..actual_num_segments)
730            .map(|_| {
731                Cache::with_everything(
732                    name.clone(),
733                    seg_max_capacity,
734                    seg_init_capacity,
735                    build_hasher.clone(),
736                    weigher.clone(),
737                    eviction_policy.clone(),
738                    eviction_listener.clone(),
739                    expiration_policy.clone(),
740                    housekeeper_config.clone(),
741                    invalidator_enabled,
742                    clock.clone(),
743                )
744            })
745            .collect::<Vec<_>>();
746
747        Self {
748            desired_capacity: max_capacity,
749            segments: segments.into_boxed_slice(),
750            build_hasher,
751            segment_shift,
752        }
753    }
754
755    #[inline]
756    fn hash<Q>(&self, key: &Q) -> u64
757    where
758        Q: Equivalent<K> + Hash + ?Sized,
759    {
760        self.build_hasher.hash_one(key)
761    }
762
763    #[inline]
764    fn select(&self, hash: u64) -> &Cache<K, V, S> {
765        let index = self.segment_index_from_hash(hash);
766        &self.segments[index]
767    }
768
769    #[inline]
770    fn segment_index_from_hash(&self, hash: u64) -> usize {
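        // When there is only one segment, `segment_shift` is 64, which is not a
        // valid shift amount for a `u64`, so handle that case separately.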
771        if self.segment_shift == 64 {
772            0
773        } else {
774            (hash >> self.segment_shift) as usize
775        }
776    }
777}
778
779#[cfg(test)]
780mod tests {
781    use super::SegmentedCache;
782    use crate::notification::RemovalCause;
783    use parking_lot::Mutex;
784    use std::{error::Error, fmt::Display, sync::Arc, time::Duration};
785
786    #[test]
787    fn max_capacity_zero() {
788        let mut cache = SegmentedCache::new(0, 1);
789        cache.reconfigure_for_testing();
790
791        // Make the cache exterior immutable.
792        let cache = cache;
793
794        cache.insert(0, ());
795
796        assert!(!cache.contains_key(&0));
797        assert!(cache.get(&0).is_none());
798        cache.run_pending_tasks();
799        assert!(!cache.contains_key(&0));
800        assert!(cache.get(&0).is_none());
801        assert_eq!(cache.entry_count(), 0)
802    }
803
804    #[test]
805    fn basic_single_thread() {
806        // The following `Vec`s will hold actual and expected notifications.
807        let actual = Arc::new(Mutex::new(Vec::new()));
808        let mut expected = Vec::new();
809
810        // Create an eviction listener.
811        let a1 = Arc::clone(&actual);
812        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
813
814        // Create a cache with the eviction listener.
815        let mut cache = SegmentedCache::builder(1)
816            .max_capacity(3)
817            .eviction_listener(listener)
818            .build();
819        cache.reconfigure_for_testing();
820
821        // Make the cache exterior immutable.
822        let cache = cache;
823
824        cache.insert("a", "alice");
825        cache.insert("b", "bob");
826        assert_eq!(cache.get(&"a"), Some("alice"));
827        assert!(cache.contains_key(&"a"));
828        assert!(cache.contains_key(&"b"));
829        assert_eq!(cache.get(&"b"), Some("bob"));
830        cache.run_pending_tasks();
831        // counts: a -> 1, b -> 1
832
833        cache.insert("c", "cindy");
834        assert_eq!(cache.get(&"c"), Some("cindy"));
835        assert!(cache.contains_key(&"c"));
836        // counts: a -> 1, b -> 1, c -> 1
837        cache.run_pending_tasks();
838
839        assert!(cache.contains_key(&"a"));
840        assert_eq!(cache.get(&"a"), Some("alice"));
841        assert_eq!(cache.get(&"b"), Some("bob"));
842        assert!(cache.contains_key(&"b"));
843        cache.run_pending_tasks();
844        // counts: a -> 2, b -> 2, c -> 1
845
846        // "d" should not be admitted because its frequency is too low.
847        cache.insert("d", "david"); //   count: d -> 0
848        expected.push((Arc::new("d"), "david", RemovalCause::Size));
849        cache.run_pending_tasks();
850        assert_eq!(cache.get(&"d"), None); //   d -> 1
851        assert!(!cache.contains_key(&"d"));
852
853        cache.insert("d", "david");
854        expected.push((Arc::new("d"), "david", RemovalCause::Size));
855        cache.run_pending_tasks();
856        assert!(!cache.contains_key(&"d"));
857        assert_eq!(cache.get(&"d"), None); //   d -> 2
858
859        // "d" should be admitted and "c" should be evicted
860        // because d's frequency is higher than c's.
861        cache.insert("d", "dennis");
862        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
863        cache.run_pending_tasks();
864        assert_eq!(cache.get(&"a"), Some("alice"));
865        assert_eq!(cache.get(&"b"), Some("bob"));
866        assert_eq!(cache.get(&"c"), None);
867        assert_eq!(cache.get(&"d"), Some("dennis"));
868        assert!(cache.contains_key(&"a"));
869        assert!(cache.contains_key(&"b"));
870        assert!(!cache.contains_key(&"c"));
871        assert!(cache.contains_key(&"d"));
872
873        cache.invalidate(&"b");
874        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
875        cache.run_pending_tasks();
876        assert_eq!(cache.get(&"b"), None);
877        assert!(!cache.contains_key(&"b"));
878
879        assert!(cache.remove(&"b").is_none());
880        assert_eq!(cache.remove(&"d"), Some("dennis"));
881        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
882        cache.run_pending_tasks();
883        assert_eq!(cache.get(&"d"), None);
884        assert!(!cache.contains_key(&"d"));
885
886        verify_notification_vec(&cache, actual, &expected);
887        assert!(cache.key_locks_map_is_empty());
888    }
889
890    #[test]
891    fn non_power_of_two_segments() {
892        let mut cache = SegmentedCache::new(100, 5);
893        cache.reconfigure_for_testing();
894
895        // Make the cache exterior immutable.
896        let cache = cache;
897
898        assert_eq!(cache.iter().count(), 0);
899
900        cache.insert("a", "alice");
901        cache.insert("b", "bob");
902        cache.insert("c", "cindy");
903
904        assert_eq!(cache.iter().count(), 3);
905        cache.run_pending_tasks();
906        assert_eq!(cache.iter().count(), 3);
907    }
908
909    #[test]
910    fn size_aware_eviction() {
911        let weigher = |_k: &&str, v: &(&str, u32)| v.1;
912
913        let alice = ("alice", 10);
914        let bob = ("bob", 15);
915        let bill = ("bill", 20);
916        let cindy = ("cindy", 5);
917        let david = ("david", 15);
918        let dennis = ("dennis", 15);
919
920        // The following `Vec`s will hold actual and expected notifications.
921        let actual = Arc::new(Mutex::new(Vec::new()));
922        let mut expected = Vec::new();
923
924        // Create an eviction listener.
925        let a1 = Arc::clone(&actual);
926        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
927
928        // Create a cache with the eviction listener.
929        let mut cache = SegmentedCache::builder(1)
930            .max_capacity(31)
931            .weigher(weigher)
932            .eviction_listener(listener)
933            .build();
934        cache.reconfigure_for_testing();
935
936        // Make the cache exterior immutable.
937        let cache = cache;
938
939        cache.insert("a", alice);
940        cache.insert("b", bob);
941        assert_eq!(cache.get(&"a"), Some(alice));
942        assert!(cache.contains_key(&"a"));
943        assert!(cache.contains_key(&"b"));
944        assert_eq!(cache.get(&"b"), Some(bob));
945        cache.run_pending_tasks();
946        // order (LRU -> MRU) and counts: a -> 1, b -> 1
947
948        cache.insert("c", cindy);
949        assert_eq!(cache.get(&"c"), Some(cindy));
950        assert!(cache.contains_key(&"c"));
951        // order and counts: a -> 1, b -> 1, c -> 1
952        cache.run_pending_tasks();
953
954        assert!(cache.contains_key(&"a"));
955        assert_eq!(cache.get(&"a"), Some(alice));
956        assert_eq!(cache.get(&"b"), Some(bob));
957        assert!(cache.contains_key(&"b"));
958        cache.run_pending_tasks();
959        // order and counts: c -> 1, a -> 2, b -> 2
960
961        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
962        // "d" must have higher count than 3, which is the aggregated count
963        // of "a" and "c".
964        cache.insert("d", david); //   count: d -> 0
965        expected.push((Arc::new("d"), david, RemovalCause::Size));
966        cache.run_pending_tasks();
967        assert_eq!(cache.get(&"d"), None); //   d -> 1
968        assert!(!cache.contains_key(&"d"));
969
970        cache.insert("d", david);
971        expected.push((Arc::new("d"), david, RemovalCause::Size));
972        cache.run_pending_tasks();
973        assert!(!cache.contains_key(&"d"));
974        assert_eq!(cache.get(&"d"), None); //   d -> 2
975
976        cache.insert("d", david);
977        expected.push((Arc::new("d"), david, RemovalCause::Size));
978        cache.run_pending_tasks();
979        assert_eq!(cache.get(&"d"), None); //   d -> 3
980        assert!(!cache.contains_key(&"d"));
981
982        cache.insert("d", david);
983        expected.push((Arc::new("d"), david, RemovalCause::Size));
984        cache.run_pending_tasks();
985        assert!(!cache.contains_key(&"d"));
986        assert_eq!(cache.get(&"d"), None); //   d -> 4
987
988        // Finally "d" should be admitted by evicting "c" and "a".
989        cache.insert("d", dennis);
990        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
991        expected.push((Arc::new("a"), alice, RemovalCause::Size));
992        cache.run_pending_tasks();
993        assert_eq!(cache.get(&"a"), None);
994        assert_eq!(cache.get(&"b"), Some(bob));
995        assert_eq!(cache.get(&"c"), None);
996        assert_eq!(cache.get(&"d"), Some(dennis));
997        assert!(!cache.contains_key(&"a"));
998        assert!(cache.contains_key(&"b"));
999        assert!(!cache.contains_key(&"c"));
1000        assert!(cache.contains_key(&"d"));
1001
1002        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
1003        cache.insert("b", bill);
1004        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
1005        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
1006        cache.run_pending_tasks();
1007        assert_eq!(cache.get(&"b"), Some(bill));
1008        assert_eq!(cache.get(&"d"), None);
1009        assert!(cache.contains_key(&"b"));
1010        assert!(!cache.contains_key(&"d"));
1011
1012        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
1013        cache.insert("a", alice);
1014        cache.insert("b", bob);
1015        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
1016        cache.run_pending_tasks();
1017        assert_eq!(cache.get(&"a"), Some(alice));
1018        assert_eq!(cache.get(&"b"), Some(bob));
1019        assert_eq!(cache.get(&"d"), None);
1020        assert!(cache.contains_key(&"a"));
1021        assert!(cache.contains_key(&"b"));
1022        assert!(!cache.contains_key(&"d"));
1023
1024        // Verify the sizes.
1025        assert_eq!(cache.entry_count(), 2);
1026        assert_eq!(cache.weighted_size(), 25);
1027
1028        verify_notification_vec(&cache, actual, &expected);
1029        assert!(cache.key_locks_map_is_empty());
1030    }
1031
1032    #[test]
1033    fn basic_multi_threads() {
1034        let num_threads = 4;
1035
1036        let mut cache = SegmentedCache::new(100, num_threads);
1037        cache.reconfigure_for_testing();
1038
1039        // Make the cache exterior immutable.
1040        let cache = cache;
1041
1042        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1043        #[allow(clippy::needless_collect)]
1044        let handles = (0..num_threads)
1045            .map(|id| {
1046                let cache = cache.clone();
1047                std::thread::spawn(move || {
1048                    cache.insert(10, format!("{id}-100"));
1049                    cache.get(&10);
1050                    cache.run_pending_tasks();
1051                    cache.insert(20, format!("{id}-200"));
1052                    cache.invalidate(&10);
1053                })
1054            })
1055            .collect::<Vec<_>>();
1056
1057        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1058
1059        cache.run_pending_tasks();
1060
1061        assert!(cache.get(&10).is_none());
1062        assert!(cache.get(&20).is_some());
1063        assert!(!cache.contains_key(&10));
1064        assert!(cache.contains_key(&20));
1065    }
1066
1067    #[test]
1068    fn invalidate_all() {
1069        use std::collections::HashMap;
1070
1071        // The following `HashMap`s will hold actual and expected notifications.
1072        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1073        let actual = Arc::new(Mutex::new(HashMap::new()));
1074        let mut expected = HashMap::new();
1075
1076        // Create an eviction listener.
1077        let a1 = Arc::clone(&actual);
1078        let listener = move |k, v, cause| {
1079            a1.lock().insert(k, (v, cause));
1080        };
1081
1082        // Create a cache with the eviction listener.
1083        let mut cache = SegmentedCache::builder(4)
1084            .max_capacity(100)
1085            .eviction_listener(listener)
1086            .build();
1087        cache.reconfigure_for_testing();
1088
1089        // Make the cache exterior immutable.
1090        let cache = cache;
1091
1092        cache.insert("a", "alice");
1093        cache.insert("b", "bob");
1094        cache.insert("c", "cindy");
1095        assert_eq!(cache.get(&"a"), Some("alice"));
1096        assert_eq!(cache.get(&"b"), Some("bob"));
1097        assert_eq!(cache.get(&"c"), Some("cindy"));
1098        assert!(cache.contains_key(&"a"));
1099        assert!(cache.contains_key(&"b"));
1100        assert!(cache.contains_key(&"c"));
1101
1102        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
1103        // modified timestamps of the entries were updated when they were inserted.
1104        // https://github.com/moka-rs/moka/issues/155
1105
1106        cache.invalidate_all();
1107        expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1108        expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1109        expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1110        cache.run_pending_tasks();
1111
1112        cache.insert("d", "david");
1113        cache.run_pending_tasks();
1114
1115        assert!(cache.get(&"a").is_none());
1116        assert!(cache.get(&"b").is_none());
1117        assert!(cache.get(&"c").is_none());
1118        assert_eq!(cache.get(&"d"), Some("david"));
1119        assert!(!cache.contains_key(&"a"));
1120        assert!(!cache.contains_key(&"b"));
1121        assert!(!cache.contains_key(&"c"));
1122        assert!(cache.contains_key(&"d"));
1123
1124        verify_notification_map(&cache, actual, &expected);
1125    }
1126
1127    #[test]
1128    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
1129        use std::collections::{HashMap, HashSet};
1130
1131        const SEGMENTS: usize = 4;
1132
1133        // The following `HashMap`s will hold actual and expected notifications.
1134        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1135        let actual = Arc::new(Mutex::new(HashMap::new()));
1136        let mut expected = HashMap::new();
1137
1138        // Create an eviction listener.
1139        let a1 = Arc::clone(&actual);
1140        let listener = move |k, v, cause| {
1141            a1.lock().insert(k, (v, cause));
1142        };
1143
1144        let (clock, mock) = crate::common::time::Clock::mock();
1145
1146        // Create a cache with the eviction listener.
1147        let mut cache = SegmentedCache::builder(SEGMENTS)
1148            .max_capacity(100)
1149            .support_invalidation_closures()
1150            .eviction_listener(listener)
1151            .clock(clock)
1152            .build();
1153        cache.reconfigure_for_testing();
1154
1155        // Make the cache exterior immutable.
1156        let cache = cache;
1157
1158        cache.insert(0, "alice");
1159        cache.insert(1, "bob");
1160        cache.insert(2, "alex");
1161        cache.run_pending_tasks();
1162        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
1163        cache.run_pending_tasks();
1164
1165        assert_eq!(cache.get(&0), Some("alice"));
1166        assert_eq!(cache.get(&1), Some("bob"));
1167        assert_eq!(cache.get(&2), Some("alex"));
1168        assert!(cache.contains_key(&0));
1169        assert!(cache.contains_key(&1));
1170        assert!(cache.contains_key(&2));
1171
1172        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
1173        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
1174        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
1175        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
1176        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));
1177
1178        mock.increment(Duration::from_secs(5)); // 10 secs from the start.
1179
1180        cache.insert(3, "alice");
1181
1182        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1183        cache.run_pending_tasks(); // To submit the invalidation task.
1184        std::thread::sleep(Duration::from_millis(200));
1185        cache.run_pending_tasks(); // To process the task result.
1186        std::thread::sleep(Duration::from_millis(200));
1187
1188        assert!(cache.get(&0).is_none());
1189        assert!(cache.get(&2).is_none());
1190        assert_eq!(cache.get(&1), Some("bob"));
1191        // This should survive as it was inserted after calling invalidate_entries_if.
1192        assert_eq!(cache.get(&3), Some("alice"));
1193
1194        assert!(!cache.contains_key(&0));
1195        assert!(cache.contains_key(&1));
1196        assert!(!cache.contains_key(&2));
1197        assert!(cache.contains_key(&3));
1198
1199        assert_eq!(cache.entry_count(), 2);
1200        assert_eq!(cache.invalidation_predicate_count(), 0);
1201
1202        mock.increment(Duration::from_secs(5)); // 15 secs from the start.
1203
1204        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
1205        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
1206        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
1207        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
1208        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));
1209
1210        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1211        cache.run_pending_tasks(); // To submit the invalidation task.
1212        std::thread::sleep(Duration::from_millis(200));
1213        cache.run_pending_tasks(); // To process the task result.
1214        std::thread::sleep(Duration::from_millis(200));
1215
1216        assert!(cache.get(&1).is_none());
1217        assert!(cache.get(&3).is_none());
1218
1219        assert!(!cache.contains_key(&1));
1220        assert!(!cache.contains_key(&3));
1221
1222        assert_eq!(cache.entry_count(), 0);
1223        assert_eq!(cache.invalidation_predicate_count(), 0);
1224
1225        verify_notification_map(&cache, actual, &expected);
1226
1227        Ok(())
1228    }
1229
1230    #[test]
1231    fn test_iter() {
1232        const NUM_KEYS: usize = 50;
1233
1234        fn make_value(key: usize) -> String {
1235            format!("val: {key}")
1236        }
1237
1238        // let cache = SegmentedCache::builder(5)
1239        let cache = SegmentedCache::builder(4)
1240            .max_capacity(100)
1241            .time_to_idle(Duration::from_secs(10))
1242            .build();
1243
1244        for key in 0..NUM_KEYS {
1245            cache.insert(key, make_value(key));
1246        }
1247
1248        let mut key_set = std::collections::HashSet::new();
1249
1250        for (key, value) in &cache {
1251            assert_eq!(value, make_value(*key));
1252
1253            key_set.insert(*key);
1254        }
1255
1256        // Ensure there are no missing or duplicate keys in the iteration.
1257        assert_eq!(key_set.len(), NUM_KEYS);
1258    }
1259
1260    /// Runs 16 threads at the same time and ensures no deadlock occurs.
1261    ///
1262    /// - Eight of the threads will update key-values in the cache.
1263    /// - Eight others will iterate the cache.
1264    ///
1265    #[test]
1266    fn test_iter_multi_threads() {
1267        use std::collections::HashSet;
1268
1269        const NUM_KEYS: usize = 1024;
1270        const NUM_THREADS: usize = 16;
1271
1272        fn make_value(key: usize) -> String {
1273            format!("val: {key}")
1274        }
1275
1276        let cache = SegmentedCache::builder(4)
1277            .max_capacity(2048)
1278            .time_to_idle(Duration::from_secs(10))
1279            .build();
1280
1281        // Initialize the cache.
1282        for key in 0..NUM_KEYS {
1283            cache.insert(key, make_value(key));
1284        }
1285
1286        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
1287        let write_lock = rw_lock.write().unwrap();
1288
1289        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1290        #[allow(clippy::needless_collect)]
1291        let handles = (0..NUM_THREADS)
1292            .map(|n| {
1293                let cache = cache.clone();
1294                let rw_lock = Arc::clone(&rw_lock);
1295
1296                if n % 2 == 0 {
1297                    // This thread will update the cache.
1298                    std::thread::spawn(move || {
1299                        let read_lock = rw_lock.read().unwrap();
1300                        for key in 0..NUM_KEYS {
1301                            // TODO: Update keys in a random order?
1302                            cache.insert(key, make_value(key));
1303                        }
1304                        std::mem::drop(read_lock);
1305                    })
1306                } else {
1307                    // This thread will iterate the cache.
1308                    std::thread::spawn(move || {
1309                        let read_lock = rw_lock.read().unwrap();
1310                        let mut key_set = HashSet::new();
1311                        for (key, value) in &cache {
1312                            assert_eq!(value, make_value(*key));
1313                            key_set.insert(*key);
1314                        }
1315                        // Ensure there are no missing or duplicate keys in the iteration.
1316                        assert_eq!(key_set.len(), NUM_KEYS);
1317                        std::mem::drop(read_lock);
1318                    })
1319                }
1320            })
1321            .collect::<Vec<_>>();
1322
1323        // Let these threads run by releasing the write lock.
1324        std::mem::drop(write_lock);
1325
1326        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1327
1328        // Ensure there are no missing or duplicate keys in the iteration.
1329        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
1330        assert_eq!(key_set.len(), NUM_KEYS);
1331    }
1332
1333    #[test]
1334    fn get_with() {
1335        use std::thread::{sleep, spawn};
1336
1337        let cache = SegmentedCache::new(100, 4);
1338        const KEY: u32 = 0;
1339
1340        // This test will run five threads:
1341        //
1342        // Thread1 will be the first thread to call `get_with` for a key, so its init
1343        // closure will be evaluated and then a &str value "thread1" will be inserted
1344        // to the cache.
1345        let thread1 = {
1346            let cache1 = cache.clone();
1347            spawn(move || {
1348                // Call `get_with` immediately.
1349                let v = cache1.get_with(KEY, || {
1350                    // Wait for 300 ms and return a &str value.
1351                    sleep(Duration::from_millis(300));
1352                    "thread1"
1353                });
1354                assert_eq!(v, "thread1");
1355            })
1356        };
1357
1358        // Thread2 will be the second thread to call `get_with` for the same key, so
1359        // its init closure will not be evaluated. Once thread1's init closure
1360        // finishes, it will get the value inserted by thread1's init closure.
1361        let thread2 = {
1362            let cache2 = cache.clone();
1363            spawn(move || {
1364                // Wait for 100 ms before calling `get_with`.
1365                sleep(Duration::from_millis(100));
1366                let v = cache2.get_with(KEY, || unreachable!());
1367                assert_eq!(v, "thread1");
1368            })
1369        };
1370
1371        // Thread3 will be the third thread to call `get_with` for the same key. By
1372        // the time it calls, thread1's init closure should have finished already and
1373        // the value should already be inserted into the cache. So its init closure
1374        // will not be evaluated, and it will get the value inserted by thread1's init
1375        // closure immediately.
1376        let thread3 = {
1377            let cache3 = cache.clone();
1378            spawn(move || {
1379                // Wait for 400 ms before calling `get_with`.
1380                sleep(Duration::from_millis(400));
1381                let v = cache3.get_with(KEY, || unreachable!());
1382                assert_eq!(v, "thread1");
1383            })
1384        };
1385
1386        // Thread4 will call `get` for the same key. It will do so while thread1's init
1387        // closure is still running, so it will get `None` for the key.
1388        let thread4 = {
1389            let cache4 = cache.clone();
1390            spawn(move || {
1391                // Wait for 200 ms before calling `get`.
1392                sleep(Duration::from_millis(200));
1393                let maybe_v = cache4.get(&KEY);
1394                assert!(maybe_v.is_none());
1395            })
1396        };
1397
1398        // Thread5 will call `get` for the same key. It will do so after thread1's init
1399        // closure has finished, so it will get the value inserted by thread1's init closure.
1400        let thread5 = {
1401            let cache5 = cache.clone();
1402            spawn(move || {
1403                // Wait for 400 ms before calling `get`.
1404                sleep(Duration::from_millis(400));
1405                let maybe_v = cache5.get(&KEY);
1406                assert_eq!(maybe_v, Some("thread1"));
1407            })
1408        };
1409
1410        for t in [thread1, thread2, thread3, thread4, thread5] {
1411            t.join().expect("Failed to join");
1412        }
1413
1414        assert!(cache.is_waiter_map_empty());
1415    }
1416
1417    #[test]
1418    fn get_with_if() {
1419        use std::thread::{sleep, spawn};
1420
1421        let cache = SegmentedCache::new(100, 4);
1422        const KEY: u32 = 0;
1423
1424        // This test will run seven threads:
1425        //
1426        // Thread1 will be the first thread to call `get_with_if` for a key, so its
1427        // init closure will be evaluated and then a &str value "thread1" will be
1428        // inserted to the cache.
1429        let thread1 = {
1430            let cache1 = cache.clone();
1431            spawn(move || {
1432                // Call `get_with` immediately.
1433                let v = cache1.get_with_if(
1434                    KEY,
1435                    || {
1436                        // Wait for 300 ms and return a &str value.
1437                        sleep(Duration::from_millis(300));
1438                        "thread1"
1439                    },
1440                    |_v| unreachable!(),
1441                );
1442                assert_eq!(v, "thread1");
1443            })
1444        };
1445
1446        // Thread2 will be the second thread to call `get_with_if` for the same key,
1447        // so its init closure will not be evaluated. Once thread1's init closure
1448        // finishes, it will get the value inserted by thread1's init closure.
1449        let thread2 = {
1450            let cache2 = cache.clone();
1451            spawn(move || {
1452                // Wait for 100 ms before calling `get_with_if`.
1453                sleep(Duration::from_millis(100));
1454                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
1455                assert_eq!(v, "thread1");
1456            })
1457        };
1458
1459        // Thread3 will be the third thread to call `get_with_if` for the same
1460        // key. By the time it calls, thread1's init closure should have finished
1461        // already and the value should already be inserted in the cache. Also,
1462        // thread3's `replace_if` closure returns `false`. So its init closure will
1463        // not be evaluated, and it will get the value inserted by thread1's init
1464        // closure immediately.
1465        let thread3 = {
1466            let cache3 = cache.clone();
1467            spawn(move || {
1468                // Wait for 350 ms before calling `get_with_if`.
1469                sleep(Duration::from_millis(350));
1470                let v = cache3.get_with_if(
1471                    KEY,
1472                    || unreachable!(),
1473                    |v| {
1474                        assert_eq!(v, &"thread1");
1475                        false
1476                    },
1477                );
1478                assert_eq!(v, "thread1");
1479            })
1480        };
1481
1482        // Thread4 will be the fourth thread to call `get_with_if` for the same
1483        // key. The value should have been already inserted to the cache by
1484        // thread1. However thread4's `replace_if` closure returns `true`. So its
1485        // init closure will be evaluated to replace the current value.
1486        let thread4 = {
1487            let cache4 = cache.clone();
1488            spawn(move || {
1489                // Wait for 400 ms before calling `get_with_if`.
1490                sleep(Duration::from_millis(400));
1491                let v = cache4.get_with_if(
1492                    KEY,
1493                    || "thread4",
1494                    |v| {
1495                        assert_eq!(v, &"thread1");
1496                        true
1497                    },
1498                );
1499                assert_eq!(v, "thread4");
1500            })
1501        };
1502
1503        // Thread5 will call `get` for the same key. It will call when thread1's init
1504        // closure is still running, so it will get none for the key.
1505        let thread5 = {
1506            let cache5 = cache.clone();
1507            spawn(move || {
1508                // Wait for 200 ms before calling `get`.
1509                sleep(Duration::from_millis(200));
1510                let maybe_v = cache5.get(&KEY);
1511                assert!(maybe_v.is_none());
1512            })
1513        };
1514
1515        // Thread6 will call `get` for the same key after thread1's init closure has
1516        // finished but before thread4 replaces the value, so it will get "thread1".
1517        let thread6 = {
1518            let cache6 = cache.clone();
1519            spawn(move || {
1520                // Wait for 350 ms before calling `get`.
1521                sleep(Duration::from_millis(350));
1522                let maybe_v = cache6.get(&KEY);
1523                assert_eq!(maybe_v, Some("thread1"));
1524            })
1525        };
1526
1527        // Thread7 will call `get` for the same key. It will call after thread4's init
1528        // closure has replaced the value, so it will get the value inserted by thread4.
1529        let thread7 = {
1530            let cache7 = cache.clone();
1531            spawn(move || {
1532                // Wait for 450 ms before calling `get`.
1533                sleep(Duration::from_millis(450));
1534                let maybe_v = cache7.get(&KEY);
1535                assert_eq!(maybe_v, Some("thread4"));
1536            })
1537        };
1538
1539        for t in [
1540            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
1541        ] {
1542            t.join().expect("Failed to join");
1543        }
1544
1545        assert!(cache.is_waiter_map_empty());
1546    }
1547
1548    #[test]
1549    fn try_get_with() {
1550        use std::{
1551            sync::Arc,
1552            thread::{sleep, spawn},
1553        };
1554
1555        #[derive(Debug)]
1556        pub struct MyError(String);
1557
1558        impl Display for MyError {
1559            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1560                write!(f, "{}", self.0)
1561            }
1562        }
1563
1564        impl Error for MyError {}
1565
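        // `try_get_with` wraps the init closure's error in an `Arc` so that the
        // same error value can be shared with every concurrent caller, hence the
        // `Arc<MyError>` in this alias.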
1566        type MyResult<T> = Result<T, Arc<MyError>>;
1567
1568        let cache = SegmentedCache::new(100, 4);
1569        const KEY: u32 = 0;
1570
1571        // This test will run eight threads:
1572        //
1573        // Thread1 will be the first thread to call `try_get_with` for a key, so its
1574        // init closure will be evaluated and then an error will be returned. Nothing
1575        // will be inserted to the cache.
1576        let thread1 = {
1577            let cache1 = cache.clone();
1578            spawn(move || {
1579                // Call `try_get_with` immediately.
1580                let v = cache1.try_get_with(KEY, || {
1581                    // Wait for 300 ms and return an error.
1582                    sleep(Duration::from_millis(300));
1583                    Err(MyError("thread1 error".into()))
1584                });
1585                assert!(v.is_err());
1586            })
1587        };
1588
1589        // Thread2 will be the second thread to call `try_get_with` for the same key,
1590        // so its init closure will not be evaluated. Once thread1's init closure
1591        // finishes, it will get the same error value returned by thread1's init
1592        // closure.
1593        let thread2 = {
1594            let cache2 = cache.clone();
1595            spawn(move || {
1596                // Wait for 100 ms before calling `try_get_with`.
1597                sleep(Duration::from_millis(100));
1598                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
1599                assert!(v.is_err());
1600            })
1601        };
1602
1603        // Thread3 will be the third thread to call `try_get_with` for the same key.
1604        // By the time it calls, thread1's init closure should have finished already,
1605        // but the key still does not exist in the cache. So its init closure will be
1606        // evaluated and then an `Ok(&str)` value will be returned. That value will be
1607        // inserted into the cache.
1608        let thread3 = {
1609            let cache3 = cache.clone();
1610            spawn(move || {
1611                // Wait for 400 ms before calling `try_get_with`.
1612                sleep(Duration::from_millis(400));
1613                let v: MyResult<_> = cache3.try_get_with(KEY, || {
1614                    // Wait for 300 ms and return an Ok(&str) value.
1615                    sleep(Duration::from_millis(300));
1616                    Ok("thread3")
1617                });
1618                assert_eq!(v.unwrap(), "thread3");
1619            })
1620        };
1621
1622        // Thread4 will be the fourth thread to call `try_get_with` for the same
1623        // key. So its init closure will not be evaluated. Once thread3's init
1624        // closure finishes, it will get the same `Ok(&str)` value.
1625        let thread4 = {
1626            let cache4 = cache.clone();
1627            spawn(move || {
1628                // Wait for 500 ms before calling `try_get_with`.
1629                sleep(Duration::from_millis(500));
1630                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
1631                assert_eq!(v.unwrap(), "thread3");
1632            })
1633        };
1634
1635        // Thread5 will be the fifth thread to call `try_get_with` for the same
1636        // key. By the time it calls, thread3's init closure should have finished
1637        // already and the value should already be in the cache. So thread5's init
1638        // closure will not be evaluated, and it will get the value inserted by
1639        // thread3's init closure immediately.
1640        let thread5 = {
1641            let cache5 = cache.clone();
1642            spawn(move || {
1643                // Wait for 800 ms before calling `try_get_with`.
1644                sleep(Duration::from_millis(800));
1645                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
1646                assert_eq!(v.unwrap(), "thread3");
1647            })
1648        };
1649
1650        // Thread6 will call `get` for the same key. It will call when thread1's init
1651        // closure is still running, so it will get none for the key.
1652        let thread6 = {
1653            let cache6 = cache.clone();
1654            spawn(move || {
1655                // Wait for 200 ms before calling `get`.
1656                sleep(Duration::from_millis(200));
1657                let maybe_v = cache6.get(&KEY);
1658                assert!(maybe_v.is_none());
1659            })
1660        };
1661
1662        // Thread7 will call `get` for the same key. It will call after thread1's init
1663        // closure finished with an error. So it will get none for the key.
1664        let thread7 = {
1665            let cache7 = cache.clone();
1666            spawn(move || {
1667                // Wait for 400 ms before calling `get`.
1668                sleep(Duration::from_millis(400));
1669                let maybe_v = cache7.get(&KEY);
1670                assert!(maybe_v.is_none());
1671            })
1672        };
1673
1674        // Thread8 will call `get` for the same key. It will call after thread3's init
1675        // closure has finished, so it will get the value inserted by thread3.
1676        let thread8 = {
1677            let cache8 = cache.clone();
1678            spawn(move || {
1679                // Wait for 800 ms before calling `get`.
1680                sleep(Duration::from_millis(800));
1681                let maybe_v = cache8.get(&KEY);
1682                assert_eq!(maybe_v, Some("thread3"));
1683            })
1684        };
1685
1686        for t in [
1687            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1688        ] {
1689            t.join().expect("Failed to join");
1690        }
1691
1692        assert!(cache.is_waiter_map_empty());
1693    }
1694
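    // A sequential sketch of the property exercised above: an `Err` returned by the
    // init closure is handed to the caller but never cached, so a later call runs
    // its own init closure. (The test name and the `SketchError` type here are
    // illustrative, not part of the threaded scenario above.)
    #[test]
    fn try_get_with_does_not_cache_errors_sketch() {
        #[derive(Debug)]
        struct SketchError;

        impl Display for SketchError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "sketch error")
            }
        }

        impl Error for SketchError {}

        let cache: SegmentedCache<u32, &str> = SegmentedCache::new(16, 2);

        // The first call fails; nothing is inserted for the key.
        let first = cache.try_get_with(1, || Err(SketchError));
        assert!(first.is_err());
        assert!(cache.get(&1).is_none());

        // The second call runs its own init closure and caches the `Ok` value.
        let second: Result<_, Arc<SketchError>> = cache.try_get_with(1, || Ok("ok"));
        assert_eq!(second.unwrap(), "ok");
        assert_eq!(cache.get(&1), Some("ok"));
    }
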
1695    #[test]
1696    fn optionally_get_with() {
1697        use std::thread::{sleep, spawn};
1698
1699        let cache = SegmentedCache::new(100, 4);
1700        const KEY: u32 = 0;
1701
1702        // This test will run eight threads:
1703        //
1704        // Thread1 will be the first thread to call `optionally_get_with` for a key,
1705        // so its init closure will be evaluated and then `None` will be returned.
1706        // Nothing will be inserted into the cache.
1707        let thread1 = {
1708            let cache1 = cache.clone();
1709            spawn(move || {
1710                // Call `optionally_get_with` immediately.
1711                let v = cache1.optionally_get_with(KEY, || {
1712                    // Wait for 300 ms and return `None`.
1713                    sleep(Duration::from_millis(300));
1714                    None
1715                });
1716                assert!(v.is_none());
1717            })
1718        };
1719
1720        // Thread2 will be the second thread to call `optionally_get_with` for the
1721        // same key, so its init closure will not be evaluated. Once thread1's init
1722        // closure finishes, it will get the same `None` returned by thread1's init
1723        // closure.
1724        let thread2 = {
1725            let cache2 = cache.clone();
1726            spawn(move || {
1727                // Wait for 100 ms before calling `optionally_get_with`.
1728                sleep(Duration::from_millis(100));
1729                let v = cache2.optionally_get_with(KEY, || unreachable!());
1730                assert!(v.is_none());
1731            })
1732        };
1733
1734        // Thread3 will be the third thread to call `optionally_get_with` for the
1735        // same key. By the time it calls, thread1's init closure should have finished
1736        // already, but the key still does not exist in the cache. So its init closure
1737        // will be evaluated and then a `Some(&str)` value will be returned. That
1738        // value will be inserted into the cache.
1739        let thread3 = {
1740            let cache3 = cache.clone();
1741            spawn(move || {
1742                // Wait for 400 ms before calling `optionally_get_with`.
1743                sleep(Duration::from_millis(400));
1744                let v = cache3.optionally_get_with(KEY, || {
1745                    // Wait for 300 ms and return a `Some(&str)` value.
1746                    sleep(Duration::from_millis(300));
1747                    Some("thread3")
1748                });
1749                assert_eq!(v.unwrap(), "thread3");
1750            })
1751        };
1752
1753        // Thread4 will be the fourth thread to call `optionally_get_with` for the
1754        // same key. So its init closure will not be evaluated. Once thread3's init
1755        // closure finishes, it will get the same `Some(&str)` value.
1756        let thread4 = {
1757            let cache4 = cache.clone();
1758            spawn(move || {
1759                // Wait for 500 ms before calling `optionally_get_with`.
1760                sleep(Duration::from_millis(500));
1761                let v = cache4.optionally_get_with(KEY, || unreachable!());
1762                assert_eq!(v.unwrap(), "thread3");
1763            })
1764        };
1765
1766        // Thread5 will be the fifth thread to call `optionally_get_with` for the same
1767        // key. By the time it calls, thread3's init closure should have finished
1768        // already and the value should already be in the cache. So thread5's init
1769        // closure will not be evaluated, and it will get the value inserted by
1770        // thread3's init closure immediately.
1771        let thread5 = {
1772            let cache5 = cache.clone();
1773            spawn(move || {
1774                // Wait for 800 ms before calling `optionally_get_with`.
1775                sleep(Duration::from_millis(800));
1776                let v = cache5.optionally_get_with(KEY, || unreachable!());
1777                assert_eq!(v.unwrap(), "thread3");
1778            })
1779        };
1780
1781        // Thread6 will call `get` for the same key. It will call when thread1's init
1782        // closure is still running, so it will get none for the key.
1783        let thread6 = {
1784            let cache6 = cache.clone();
1785            spawn(move || {
1786                // Wait for 200 ms before calling `get`.
1787                sleep(Duration::from_millis(200));
1788                let maybe_v = cache6.get(&KEY);
1789                assert!(maybe_v.is_none());
1790            })
1791        };
1792
1793        // Thread7 will call `get` for the same key. It will call after thread1's init
1794        // closure finished with `None`, so it will still get `None` for the key.
1795        let thread7 = {
1796            let cache7 = cache.clone();
1797            spawn(move || {
1798                // Wait for 400 ms before calling `get`.
1799                sleep(Duration::from_millis(400));
1800                let maybe_v = cache7.get(&KEY);
1801                assert!(maybe_v.is_none());
1802            })
1803        };
1804
1805        // Thread8 will call `get` for the same key. It will call after thread3's init
1806        // closure has finished, so it will get the value inserted by thread3.
1807        let thread8 = {
1808            let cache8 = cache.clone();
1809            spawn(move || {
1810                // Wait for 800 ms before calling `get`.
1811                sleep(Duration::from_millis(800));
1812                let maybe_v = cache8.get(&KEY);
1813                assert_eq!(maybe_v, Some("thread3"));
1814            })
1815        };
1816
1817        for t in [
1818            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1819        ] {
1820            t.join().expect("Failed to join");
1821        }
1822
1823        assert!(cache.is_waiter_map_empty());
1824    }
1825
1826    // This test ensures that `contains_key`, `get` and `invalidate` can use the
1827    // borrowed form `&[u8]` for a key of type `Vec<u8>`.
1828    // https://github.com/moka-rs/moka/issues/166
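    //
    // These lookup methods accept any borrowed form `Q` that is `Hash +
    // Equivalent<K>` (see the `equivalent` crate import at the top of this file),
    // which is why a `&[u8]` can stand in for a `Vec<u8>` key here.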
1829    #[test]
1830    fn borrowed_forms_of_key() {
1831        let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
1832
1833        let key = vec![1_u8];
1834        cache.insert(key.clone(), ());
1835
1836        // key as &Vec<u8>
1837        let key_v: &Vec<u8> = &key;
1838        assert!(cache.contains_key(key_v));
1839        assert_eq!(cache.get(key_v), Some(()));
1840        cache.invalidate(key_v);
1841
1842        cache.insert(key, ());
1843
1844        // key as &[u8]
1845        let key_s: &[u8] = &[1_u8];
1846        assert!(cache.contains_key(key_s));
1847        assert_eq!(cache.get(key_s), Some(()));
1848        cache.invalidate(key_s);
1849    }
1850
1851    // Ignored by default. This test becomes unstable when run in parallel with
1852    // other tests.
1853    #[test]
1854    #[ignore]
1855    fn drop_value_immediately_after_eviction() {
1856        use crate::common::test_utils::{Counters, Value};
1857
1858        const NUM_SEGMENTS: usize = 1;
1859        const MAX_CAPACITY: u32 = 500;
1860        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
1861
1862        let counters = Arc::new(Counters::default());
1863        let counters1 = Arc::clone(&counters);
1864
1865        let listener = move |_k, _v, cause| match cause {
1866            RemovalCause::Size => counters1.incl_evicted(),
1867            RemovalCause::Explicit => counters1.incl_invalidated(),
1868            _ => (),
1869        };
1870
1871        let mut cache = SegmentedCache::builder(NUM_SEGMENTS)
1872            .max_capacity(MAX_CAPACITY as u64)
1873            .eviction_listener(listener)
1874            .build();
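        // Switch the cache's housekeeping into its testing mode; this test then
        // drives all maintenance explicitly via `run_pending_tasks` below.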
1875        cache.reconfigure_for_testing();
1876
1877        // Make the cache exterior immutable.
1878        let cache = cache;
1879
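        // Insert 20% more entries than the max capacity so that size-based eviction
        // must happen; running the pending tasks after each insert lets the eviction
        // listener fire (and the evicted value be dropped) right away.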
1880        for key in 0..KEYS {
1881            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
1882            cache.insert(key, value);
1883            counters.incl_inserted();
1884            cache.run_pending_tasks();
1885        }
1886
1887        let eviction_count = KEYS - MAX_CAPACITY;
1888
1889        cache.run_pending_tasks();
1890        assert_eq!(counters.inserted(), KEYS, "inserted");
1891        assert_eq!(counters.value_created(), KEYS, "value_created");
1892        assert_eq!(counters.evicted(), eviction_count, "evicted");
1893        assert_eq!(counters.invalidated(), 0, "invalidated");
1894        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
1895
1896        for key in 0..KEYS {
1897            cache.invalidate(&key);
1898            cache.run_pending_tasks();
1899        }
1900
1901        cache.run_pending_tasks();
1902        assert_eq!(counters.inserted(), KEYS, "inserted");
1903        assert_eq!(counters.value_created(), KEYS, "value_created");
1904        assert_eq!(counters.evicted(), eviction_count, "evicted");
1905        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
1906        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1907
1908        std::mem::drop(cache);
1909        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1910    }
1911
1912    #[test]
1913    fn test_debug_format() {
1914        let cache = SegmentedCache::new(10, 4);
1915        cache.insert('a', "alice");
1916        cache.insert('b', "bob");
1917        cache.insert('c', "cindy");
1918
1919        let debug_str = format!("{cache:?}");
1920        assert!(debug_str.starts_with('{'));
1921        assert!(debug_str.contains(r#"'a': "alice""#));
1922        assert!(debug_str.contains(r#"'b': "bob""#));
1923        assert!(debug_str.contains(r#"'c': "cindy""#));
1924        assert!(debug_str.ends_with('}'));
1925    }
1926
1927    type NotificationPair<V> = (V, RemovalCause);
1928    type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1929
1930    fn verify_notification_vec<K, V, S>(
1931        cache: &SegmentedCache<K, V, S>,
1932        actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1933        expected: &[NotificationTriple<K, V>],
1934    ) where
1935        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1936        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1937        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1938    {
1939        // Retries will be needed when testing in a QEMU VM.
1940        const MAX_RETRIES: usize = 5;
1941        let mut retries = 0;
1942        loop {
1943            // Ensure all scheduled notifications have been processed.
1944            std::thread::sleep(Duration::from_millis(500));
1945
1946            let actual = &*actual.lock();
1947            if actual.len() != expected.len() {
1948                if retries <= MAX_RETRIES {
1949                    retries += 1;
1950                    cache.run_pending_tasks();
1951                    continue;
1952                } else {
1953                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1954                }
1955            }
1956
1957            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1958                assert_eq!(actual, expected, "expected[{i}]");
1959            }
1960
1961            break;
1962        }
1963    }
1964
1965    fn verify_notification_map<K, V, S>(
1966        cache: &SegmentedCache<K, V, S>,
1967        actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1968        expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1969    ) where
1970        K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1971        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1972        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1973    {
1974        // Retries will be needed when testing in a QEMU VM.
1975        const MAX_RETRIES: usize = 5;
1976        let mut retries = 0;
1977        loop {
1978            // Ensure all scheduled notifications have been processed.
1979            std::thread::sleep(Duration::from_millis(500));
1980
1981            let actual = &*actual.lock();
1982            if actual.len() != expected.len() {
1983                if retries <= MAX_RETRIES {
1984                    retries += 1;
1985                    cache.run_pending_tasks();
1986                    continue;
1987                } else {
1988                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1989                }
1990            }
1991
1992            for actual_key in actual.keys() {
1993                assert_eq!(
1994                    actual.get(actual_key),
1995                    expected.get(actual_key),
1996                    "expected[{actual_key}]",
1997                );
1998            }
1999
2000            break;
2001        }
2002    }
2003}