moka/sync/segment.rs

1use equivalent::Equivalent;
2
3use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
4use crate::common::concurrent::Weigher;
5use crate::common::time::Clock;
6use crate::{
7    common::{
8        iter::{Iter, ScanningGet},
9        HousekeeperConfig,
10    },
11    notification::EvictionListener,
12    policy::{EvictionPolicy, ExpirationPolicy},
13    Entry, Policy, PredicateError,
14};
15
16use std::{
17    collections::hash_map::RandomState,
18    fmt,
19    hash::{BuildHasher, Hash, Hasher},
20    sync::Arc,
21};
22
23/// A thread-safe concurrent in-memory cache, with multiple internal segments.
24///
25/// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for
/// increased concurrent update performance. It adds only a small overhead to
/// retrievals and updates for managing these segments.
///
/// For usage examples, see the documentation of [`Cache`][cache-struct].
30///
31/// [cache-struct]: ./struct.Cache.html
32///
33pub struct SegmentedCache<K, V, S = RandomState> {
34    inner: Arc<Inner<K, V, S>>,
35}
36
37unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
38where
39    K: Send + Sync,
40    V: Send + Sync,
41    S: Send,
42{
43}
44
45unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
46where
47    K: Send + Sync,
48    V: Send + Sync,
49    S: Sync,
50{
51}
52
53impl<K, V, S> Clone for SegmentedCache<K, V, S> {
54    /// Makes a clone of this shared cache.
55    ///
56    /// This operation is cheap as it only creates thread-safe reference counted
57    /// pointers to the shared internal data structures.
58    fn clone(&self) -> Self {
59        Self {
60            inner: Arc::clone(&self.inner),
61        }
62    }
63}
64
65impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
66where
67    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
68    V: fmt::Debug + Clone + Send + Sync + 'static,
69    // TODO: Remove these bounds from S.
70    S: BuildHasher + Clone + Send + Sync + 'static,
71{
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        let mut d_map = f.debug_map();
74
75        for (k, v) in self {
76            d_map.entry(&k, &v);
77        }
78
79        d_map.finish()
80    }
81}
82
83impl<K, V> SegmentedCache<K, V, RandomState>
84where
85    K: Hash + Eq + Send + Sync + 'static,
86    V: Clone + Send + Sync + 'static,
87{
88    /// Constructs a new `SegmentedCache<K, V>` that has multiple internal
89    /// segments and will store up to the `max_capacity`.
90    ///
91    /// To adjust various configuration knobs such as `initial_capacity` or
92    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
93    ///
94    /// [builder-struct]: ./struct.CacheBuilder.html
95    ///
96    /// # Panics
97    ///
98    /// Panics if `num_segments` is 0.
99    pub fn new(max_capacity: u64, num_segments: usize) -> Self {
100        let build_hasher = RandomState::default();
101        Self::with_everything(
102            None,
103            Some(max_capacity),
104            None,
105            num_segments,
106            build_hasher,
107            None,
108            EvictionPolicy::default(),
109            None,
110            ExpirationPolicy::default(),
111            HousekeeperConfig::default(),
112            false,
113            Clock::default(),
114        )
115    }
116
    /// Returns a [`CacheBuilder`][builder-struct], which can build a
    /// `SegmentedCache` with various configuration knobs.
119    ///
120    /// [builder-struct]: ./struct.CacheBuilder.html
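    ///
    /// # Example
    ///
    /// A small usage sketch (the capacity and expiration values below are
    /// arbitrary):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    /// use std::time::Duration;
    ///
    /// let cache = SegmentedCache::builder(4)
    ///     .max_capacity(10_000)
    ///     .time_to_live(Duration::from_secs(30 * 60))
    ///     .build();
    ///
    /// cache.insert("key", "value");
    /// assert_eq!(cache.get(&"key"), Some("value"));
    /// ```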
121    pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
122        CacheBuilder::default().segments(num_segments)
123    }
124}
125
126impl<K, V, S> SegmentedCache<K, V, S> {
    /// Returns this cache's name.
128    pub fn name(&self) -> Option<&str> {
129        self.inner.segments[0].name()
130    }
131
132    /// Returns a read-only cache policy of this cache.
133    ///
    /// At this time, the cache policy cannot be modified after the cache is
    /// created. A future version may support modifying it.
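    ///
    /// # Example
    ///
    /// A small sketch of reading the policy back (this assumes the `max_capacity`
    /// and `num_segments` accessors on [`Policy`]):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// let policy = cache.policy();
    /// assert_eq!(policy.max_capacity(), Some(100));
    /// assert_eq!(policy.num_segments(), 4);
    /// ```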
136    pub fn policy(&self) -> Policy {
137        let mut policy = self.inner.segments[0].policy();
138        policy.set_max_capacity(self.inner.desired_capacity);
139        policy.set_num_segments(self.inner.segments.len());
140        policy
141    }
142
143    /// Returns an approximate number of entries in this cache.
144    ///
145    /// The value returned is _an estimate_; the actual count may differ if there are
146    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling
    /// `run_pending_tasks` first.
149    ///
150    /// # Example
151    ///
152    /// ```rust
153    /// use moka::sync::SegmentedCache;
154    ///
155    /// let cache = SegmentedCache::new(10, 4);
156    /// cache.insert('n', "Netherland Dwarf");
157    /// cache.insert('l', "Lop Eared");
158    /// cache.insert('d', "Dutch");
159    ///
160    /// // Ensure an entry exists.
161    /// assert!(cache.contains_key(&'n'));
162    ///
    /// // However, the following may print stale zeros instead of threes.
164    /// println!("{}", cache.entry_count());   // -> 0
165    /// println!("{}", cache.weighted_size()); // -> 0
166    ///
167    /// // To mitigate the inaccuracy, call `run_pending_tasks` method to run
168    /// // pending internal tasks.
169    /// cache.run_pending_tasks();
170    ///
    /// // The following will print the actual numbers.
172    /// println!("{}", cache.entry_count());   // -> 3
173    /// println!("{}", cache.weighted_size()); // -> 3
174    /// ```
175    ///
176    pub fn entry_count(&self) -> u64 {
177        self.inner
178            .segments
179            .iter()
180            .map(|seg| seg.entry_count())
181            .sum()
182    }
183
184    /// Returns an approximate total weighted size of entries in this cache.
185    ///
186    /// The value returned is _an estimate_; the actual size may differ if there are
187    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling
    /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for
    /// sample code.
191    pub fn weighted_size(&self) -> u64 {
192        self.inner
193            .segments
194            .iter()
195            .map(|seg| seg.weighted_size())
196            .sum()
197    }
198}
199
200impl<K, V, S> SegmentedCache<K, V, S>
201where
202    K: Hash + Eq + Send + Sync + 'static,
203    V: Clone + Send + Sync + 'static,
204    S: BuildHasher + Clone + Send + Sync + 'static,
205{
206    /// # Panics
207    ///
208    /// Panics if `num_segments` is 0.
209    #[allow(clippy::too_many_arguments)]
210    pub(crate) fn with_everything(
211        name: Option<String>,
212        max_capacity: Option<u64>,
213        initial_capacity: Option<usize>,
214        num_segments: usize,
215        build_hasher: S,
216        weigher: Option<Weigher<K, V>>,
217        eviction_policy: EvictionPolicy,
218        eviction_listener: Option<EvictionListener<K, V>>,
219        expiration_policy: ExpirationPolicy<K, V>,
220        housekeeper_config: HousekeeperConfig,
221        invalidator_enabled: bool,
222        clock: Clock,
223    ) -> Self {
224        Self {
225            inner: Arc::new(Inner::new(
226                name,
227                max_capacity,
228                initial_capacity,
229                num_segments,
230                build_hasher,
231                weigher,
232                eviction_policy,
233                eviction_listener,
234                expiration_policy,
235                housekeeper_config,
236                invalidator_enabled,
237                clock,
238            )),
239        }
240    }
241
242    /// Returns `true` if the cache contains a value for the key.
243    ///
244    /// Unlike the `get` method, this method is not considered a cache read operation,
245    /// so it does not update the historic popularity estimator or reset the idle
246    /// timer for the key.
247    ///
248    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
249    /// on the borrowed form _must_ match those for the key type.
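    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert('a', "alice");
    ///
    /// // Unlike `get`, these checks do not affect the entries' popularity or idle timers.
    /// assert!(cache.contains_key(&'a'));
    /// assert!(!cache.contains_key(&'b'));
    /// ```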
250    pub fn contains_key<Q>(&self, key: &Q) -> bool
251    where
252        Q: Equivalent<K> + Hash + ?Sized,
253    {
254        let hash = self.inner.hash(key);
255        self.inner.select(hash).contains_key_with_hash(key, hash)
256    }
257
258    /// Returns a _clone_ of the value corresponding to the key.
259    ///
260    /// If you want to store values that will be expensive to clone, wrap them by
261    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
262    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
263    ///
264    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
265    /// on the borrowed form _must_ match those for the key type.
266    ///
267    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
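    ///
    /// # Example
    ///
    /// A small sketch of storing `Arc`-wrapped values so that `get` only clones the
    /// pointer:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    /// use std::sync::Arc;
    ///
    /// let cache: SegmentedCache<&str, Arc<Vec<u8>>> = SegmentedCache::new(10, 4);
    /// cache.insert("blob", Arc::new(vec![0u8; 1024]));
    ///
    /// // `get` returns a clone of the `Arc`, not of the underlying buffer.
    /// let blob = cache.get(&"blob").unwrap();
    /// assert_eq!(blob.len(), 1024);
    /// ```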
268    pub fn get<Q>(&self, key: &Q) -> Option<V>
269    where
270        Q: Equivalent<K> + Hash + Eq + ?Sized,
271    {
272        let hash = self.inner.hash(key);
273        self.inner
274            .select(hash)
275            .get_with_hash(key, hash, false)
276            .map(Entry::into_value)
277    }
278
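    /// Returns an [`OwnedKeyEntrySelector`] for the owned `key`, mirroring
    /// [`Cache::entry`][entry-method].
    ///
    /// A small sketch of how the selector is typically used (the `or_insert_with`,
    /// `is_fresh` and `into_value` calls below come from the selector and entry
    /// types and are shown for illustration only):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<String, u32> = SegmentedCache::new(100, 4);
    ///
    /// let entry = cache.entry("k".to_string()).or_insert_with(|| 1);
    /// assert!(entry.is_fresh()); // The value was just inserted.
    /// assert_eq!(entry.into_value(), 1);
    ///
    /// // The key now exists, so the closure is not evaluated again.
    /// let entry = cache.entry("k".to_string()).or_insert_with(|| 2);
    /// assert!(!entry.is_fresh());
    /// assert_eq!(entry.into_value(), 1);
    /// ```
    ///
    /// [entry-method]: ./struct.Cache.html#method.entry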
279    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280    where
281        K: Hash + Eq,
282    {
283        let hash = self.inner.hash(&key);
284        let cache = self.inner.select(hash);
285        OwnedKeyEntrySelector::new(key, hash, cache)
286    }
287
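    /// Returns a [`RefKeyEntrySelector`] for a borrowed key, mirroring
    /// [`Cache::entry_by_ref`][entry-by-ref-method]. If the key does not exist, it
    /// is cloned (via `ToOwned`) to create the entry.
    ///
    /// A small sketch (the `or_insert` and `into_value` calls below come from the
    /// selector and entry types and are shown for illustration only):
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<String, u32> = SegmentedCache::new(100, 4);
    ///
    /// // "k" is a `&str`; it is only turned into a `String` if an insert is needed.
    /// let entry = cache.entry_by_ref("k").or_insert(1);
    /// assert_eq!(entry.into_value(), 1);
    /// ```
    ///
    /// [entry-by-ref-method]: ./struct.Cache.html#method.entry_by_ref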
288    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289    where
290        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
291    {
292        let hash = self.inner.hash(key);
293        let cache = self.inner.select(hash);
294        RefKeyEntrySelector::new(key, hash, cache)
295    }
296
    // TODO: Remove this in v0.13.0.
298    /// Deprecated, replaced with [`get_with`](#method.get_with)
299    #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
300    pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
301        self.get_with(key, init)
302    }
303
    // TODO: Remove this in v0.13.0.
305    /// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
306    #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
307    pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
308    where
309        F: FnOnce() -> Result<V, E>,
310        E: Send + Sync + 'static,
311    {
312        self.try_get_with(key, init)
313    }
314
315    /// Returns a _clone_ of the value corresponding to the key. If the value does
316    /// not exist, evaluates the `init` closure and inserts the output.
317    ///
318    /// # Concurrent calls on the same key
319    ///
    /// This method guarantees that concurrent calls on the same nonexistent key are
321    /// coalesced into one evaluation of the `init` closure. Only one of the calls
322    /// evaluates its closure, and other calls wait for that closure to complete. See
323    /// [`Cache::get_with`][get-with-method] for more details.
324    ///
325    /// [get-with-method]: ./struct.Cache.html#method.get_with
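    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // The key does not exist, so `init` is evaluated and its output is inserted.
    /// let value = cache.get_with(1, || "value".to_string());
    /// assert_eq!(value, "value");
    ///
    /// // The key now exists, so `init` is not evaluated.
    /// let value = cache.get_with(1, || unreachable!());
    /// assert_eq!(value, "value");
    /// ```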
326    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
327        let hash = self.inner.hash(&key);
328        let key = Arc::new(key);
329        let replace_if = None as Option<fn(&V) -> bool>;
330        self.inner
331            .select(hash)
332            .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
333            .into_value()
334    }
335
336    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
337    /// key, you can pass a reference to the key. If the key does not exist in the
    /// cache, the key will be cloned to create a new entry in the cache.
339    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
340    where
341        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
342    {
343        let hash = self.inner.hash(key);
344        let replace_if = None as Option<fn(&V) -> bool>;
345        self.inner
346            .select(hash)
347            .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
348            .into_value()
349    }
350
351    /// Works like [`get_with`](#method.get_with), but takes an additional
352    /// `replace_if` closure.
353    ///
354    /// This method will evaluate the `init` closure and insert the output to the
355    /// cache when:
356    ///
357    /// - The key does not exist.
358    /// - Or, `replace_if` closure returns `true`.
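    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // The key does not exist, so `init` is evaluated.
    /// let v = cache.get_with_if(1, || "v1".to_string(), |_| false);
    /// assert_eq!(v, "v1");
    ///
    /// // `replace_if` returns `false`, so the cached value is kept.
    /// let v = cache.get_with_if(1, || "v2".to_string(), |_| false);
    /// assert_eq!(v, "v1");
    ///
    /// // `replace_if` returns `true`, so `init` is evaluated again and replaces the value.
    /// let v = cache.get_with_if(1, || "v3".to_string(), |v| v == "v1");
    /// assert_eq!(v, "v3");
    /// ```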
359    pub fn get_with_if(
360        &self,
361        key: K,
362        init: impl FnOnce() -> V,
363        replace_if: impl FnMut(&V) -> bool,
364    ) -> V {
365        let hash = self.inner.hash(&key);
366        let key = Arc::new(key);
367        self.inner
368            .select(hash)
369            .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
370            .into_value()
371    }
372
373    /// Returns a _clone_ of the value corresponding to the key. If the value does
374    /// not exist, evaluates the `init` closure, and inserts the value if
375    /// `Some(value)` was returned. If `None` was returned from the closure, this
376    /// method does not insert a value and returns `None`.
377    ///
378    /// # Concurrent calls on the same key
379    ///
    /// This method guarantees that concurrent calls on the same nonexistent key are
381    /// coalesced into one evaluation of the `init` closure. Only one of the calls
382    /// evaluates its closure, and other calls wait for that closure to complete.
383    /// See [`Cache::optionally_get_with`][opt-get-with-method] for more details.
384    ///
385    /// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with
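    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // The closure returns `None`, so nothing is inserted.
    /// assert_eq!(cache.optionally_get_with(1, || None), None);
    /// assert!(!cache.contains_key(&1));
    ///
    /// // The closure returns `Some`, so the value is inserted and returned.
    /// let v = cache.optionally_get_with(1, || Some("value".to_string()));
    /// assert_eq!(v, Some("value".to_string()));
    /// ```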
386    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
387    where
388        F: FnOnce() -> Option<V>,
389    {
390        let hash = self.inner.hash(&key);
391        let key = Arc::new(key);
392        self.inner
393            .select(hash)
394            .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
395            .map(Entry::into_value)
396    }
397
398    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
399    /// of passing an owned key, you can pass a reference to the key. If the key does
400    /// not exist in the cache, the key will be cloned to create new entry in the
401    /// cache.
402    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
403    where
404        F: FnOnce() -> Option<V>,
405        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
406    {
407        let hash = self.inner.hash(key);
408        self.inner
409            .select(hash)
410            .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
411            .map(Entry::into_value)
412    }
413
414    /// Returns a _clone_ of the value corresponding to the key. If the value does
415    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
416    /// was returned. If `Err(_)` was returned from the closure, this method does not
417    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
418    ///
419    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
420    ///
421    /// # Concurrent calls on the same key
422    ///
    /// This method guarantees that concurrent calls on the same nonexistent key are
424    /// coalesced into one evaluation of the `init` closure (as long as these
425    /// closures return the same error type). Only one of the calls evaluates its
426    /// closure, and other calls wait for that closure to complete. See
427    /// [`Cache::try_get_with`][try-get-with-method] for more details.
428    ///
429    /// [try-get-with-method]: ./struct.Cache.html#method.try_get_with
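    ///
    /// # Example
    ///
    /// A small usage sketch using `std::io::Error` as the error type:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    /// use std::io::{Error, ErrorKind};
    ///
    /// let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    ///
    /// // The closure fails, so nothing is inserted and the error is returned
    /// // wrapped in an `Arc`.
    /// let result = cache.try_get_with(1, || {
    ///     Err::<String, _>(Error::new(ErrorKind::Other, "init failed"))
    /// });
    /// assert!(result.is_err());
    /// assert!(!cache.contains_key(&1));
    ///
    /// // A later successful closure can still populate the entry.
    /// let result = cache.try_get_with(1, || Ok::<_, Error>("value".to_string()));
    /// assert_eq!(result.unwrap(), "value");
    /// ```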
430    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
431    where
432        F: FnOnce() -> Result<V, E>,
433        E: Send + Sync + 'static,
434    {
435        let hash = self.inner.hash(&key);
436        let key = Arc::new(key);
437        self.inner
438            .select(hash)
439            .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
440            .map(Entry::into_value)
441    }
442
443    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
444    /// owned key, you can pass a reference to the key. If the key does not exist in
    /// the cache, the key will be cloned to create a new entry in the cache.
446    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
447    where
448        F: FnOnce() -> Result<V, E>,
449        E: Send + Sync + 'static,
450        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
451    {
452        let hash = self.inner.hash(key);
453        self.inner
454            .select(hash)
455            .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
456            .map(Entry::into_value)
457    }
458
459    /// Inserts a key-value pair into the cache.
460    ///
461    /// If the cache has this key present, the value is updated.
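    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    ///
    /// cache.insert("key", 1);
    /// assert_eq!(cache.get(&"key"), Some(1));
    ///
    /// // Inserting with the same key updates the value.
    /// cache.insert("key", 2);
    /// assert_eq!(cache.get(&"key"), Some(2));
    /// ```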
462    pub fn insert(&self, key: K, value: V) {
463        let hash = self.inner.hash(&key);
464        let key = Arc::new(key);
465        self.inner.select(hash).insert_with_hash(key, hash, value);
466    }
467
468    /// Discards any cached value for the key.
469    ///
    /// If you need to get the value that has been discarded, use the
471    /// [`remove`](#method.remove) method instead.
472    ///
473    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
474    /// on the borrowed form _must_ match those for the key type.
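    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert("key", 1);
    ///
    /// cache.invalidate(&"key");
    /// assert!(!cache.contains_key(&"key"));
    /// ```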
475    pub fn invalidate<Q>(&self, key: &Q)
476    where
477        Q: Equivalent<K> + Hash + ?Sized,
478    {
479        let hash = self.inner.hash(key);
480        self.inner
481            .select(hash)
482            .invalidate_with_hash(key, hash, false);
483    }
484
485    /// Discards any cached value for the key and returns a clone of the value.
486    ///
487    /// If you do not need to get the value that has been discarded, use the
488    /// [`invalidate`](#method.invalidate) method instead.
489    ///
490    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
491    /// on the borrowed form _must_ match those for the key type.
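    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert("key", 1);
    ///
    /// // The first call removes the entry and returns a clone of its value.
    /// assert_eq!(cache.remove(&"key"), Some(1));
    /// // The entry is gone, so the second call returns `None`.
    /// assert_eq!(cache.remove(&"key"), None);
    /// ```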
492    pub fn remove<Q>(&self, key: &Q) -> Option<V>
493    where
494        Q: Equivalent<K> + Hash + ?Sized,
495    {
496        let hash = self.inner.hash(key);
497        self.inner
498            .select(hash)
499            .invalidate_with_hash(key, hash, true)
500    }
501
502    /// Discards all cached values.
503    ///
504    /// This method returns immediately by just setting the current time as the
505    /// invalidation time. `get` and other retrieval methods are guaranteed not to
506    /// return the entries inserted before or at the invalidation time.
507    ///
508    /// The actual removal of the invalidated entries is done as a maintenance task
509    /// driven by a user thread. For more details, see
510    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
511    /// level documentation.
512    ///
513    /// Like the `invalidate` method, this method does not clear the historic
514    /// popularity estimator of keys so that it retains the client activities of
515    /// trying to retrieve an item.
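    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::new(10, 4);
    /// cache.insert("a", 1);
    /// cache.insert("b", 2);
    ///
    /// cache.invalidate_all();
    ///
    /// // Entries inserted before or at the invalidation time are no longer returned.
    /// assert!(cache.get(&"a").is_none());
    /// assert!(cache.get(&"b").is_none());
    /// ```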
516    pub fn invalidate_all(&self) {
517        for segment in self.inner.segments.iter() {
518            segment.invalidate_all();
519        }
520    }
521
522    /// Discards cached values that satisfy a predicate.
523    ///
524    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
525    /// closure is called against each cached entry inserted before or at the time
    /// when this method was called. If the closure returns `true`, that entry will be
527    /// evicted from the cache.
528    ///
    /// This method returns immediately; it does not actually remove the invalidated
    /// entries. Instead, it registers the predicate with the cache along with the
    /// time when this method was called. The actual removal of the invalidated
    /// entries is done as a maintenance task driven by a user thread. For more
    /// details, see
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
    /// level documentation.
    ///
    /// In addition, the `get` and other retrieval methods apply the closure to a
    /// cached entry to determine whether it should have been invalidated. Therefore,
    /// these methods are guaranteed not to return invalidated values.
539    ///
540    /// Note that you must call
541    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
542    /// at the cache creation time as the cache needs to maintain additional internal
543    /// data structures to support this method. Otherwise, calling this method will
544    /// fail with a
545    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
546    ///
547    /// Like the `invalidate` method, this method does not clear the historic
548    /// popularity estimator of keys so that it retains the client activities of
549    /// trying to retrieve an item.
550    ///
551    /// [support-invalidation-closures]:
552    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
553    /// [invalidation-disabled-error]:
554    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
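    ///
    /// # Example
    ///
    /// A small usage sketch:
    ///
    /// ```rust
    /// use moka::sync::SegmentedCache;
    ///
    /// let cache = SegmentedCache::builder(4)
    ///     .max_capacity(100)
    ///     // Required; otherwise `invalidate_entries_if` returns an error.
    ///     .support_invalidation_closures()
    ///     .build();
    ///
    /// cache.insert(0, "alice");
    /// cache.insert(1, "bob");
    ///
    /// // Register a predicate. Matching entries are removed lazily by maintenance
    /// // tasks, but retrieval methods stop returning them right away.
    /// cache
    ///     .invalidate_entries_if(|_key, value| *value == "alice")
    ///     .expect("invalidation closures should be enabled");
    ///
    /// assert!(cache.get(&0).is_none());
    /// assert_eq!(cache.get(&1), Some("bob"));
    /// ```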
555    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
556    where
557        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
558    {
559        let pred = Arc::new(predicate);
560        for segment in self.inner.segments.iter() {
561            segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
562        }
563        Ok(())
564    }
565
566    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
567    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
568    /// value.
569    ///
570    /// Iterators do not block concurrent reads and writes on the cache. An entry can
571    /// be inserted to, invalidated or evicted from a cache while iterators are alive
572    /// on the same cache.
573    ///
    /// Unlike the `get` method, visiting entries via an iterator does not update the
    /// historic popularity estimator or reset idle timers for keys.
    ///
    /// # Guarantees
    ///
    /// In order to allow concurrent access to the cache, the iterator's `next`
    /// method does _not_ guarantee the following:
    ///
    /// - To return a key-value pair (an entry) if its key was inserted into the
    ///   cache _after_ the iterator was created.
    ///   - Such an entry may or may not be returned depending on the key's hash and
    ///     timing.
    ///
    /// On the other hand, the `next` method does guarantee the following:
    ///
    /// - It will not return the same entry more than once.
    /// - It will not return an entry if it has been removed from the cache after the
    ///   iterator was created.
    ///     - Note: An entry can be removed for the following reasons:
    ///         - It was manually invalidated.
    ///         - It expired (e.g. exceeded its time-to-live).
    ///         - It was evicted because the cache capacity was exceeded.
596    ///
597    /// # Examples
598    ///
599    /// ```rust
600    /// use moka::sync::SegmentedCache;
601    ///
602    /// let cache = SegmentedCache::new(100, 4);
603    /// cache.insert("Julia", 14);
604    ///
605    /// let mut iter = cache.iter();
606    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
607    /// assert_eq!(*k, "Julia");
608    /// assert_eq!(v, 14);
609    ///
610    /// assert!(iter.next().is_none());
611    /// ```
612    ///
613    pub fn iter(&self) -> Iter<'_, K, V> {
614        let num_cht_segments = self.inner.segments[0].num_cht_segments();
615        let segments = self
616            .inner
617            .segments
618            .iter()
619            .map(|c| c as &dyn ScanningGet<_, _>)
620            .collect::<Vec<_>>()
621            .into_boxed_slice();
622        Iter::with_multiple_cache_segments(segments, num_cht_segments)
623    }
624
625    /// Performs any pending maintenance operations needed by the cache.
626    pub fn run_pending_tasks(&self) {
627        for segment in self.inner.segments.iter() {
628            segment.run_pending_tasks();
629        }
630    }
631}
632
633impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
634where
635    K: Hash + Eq + Send + Sync + 'static,
636    V: Clone + Send + Sync + 'static,
637    S: BuildHasher + Clone + Send + Sync + 'static,
638{
639    type Item = (Arc<K>, V);
640
641    type IntoIter = Iter<'a, K, V>;
642
643    fn into_iter(self) -> Self::IntoIter {
644        self.iter()
645    }
646}
647
648// For unit tests.
649#[cfg(test)]
650impl<K, V, S> SegmentedCache<K, V, S> {
651    fn is_waiter_map_empty(&self) -> bool {
652        self.inner.segments.iter().all(Cache::is_waiter_map_empty)
653    }
654}
655
656#[cfg(test)]
657impl<K, V, S> SegmentedCache<K, V, S>
658where
659    K: Hash + Eq + Send + Sync + 'static,
660    V: Clone + Send + Sync + 'static,
661    S: BuildHasher + Clone + Send + Sync + 'static,
662{
663    fn invalidation_predicate_count(&self) -> usize {
664        self.inner
665            .segments
666            .iter()
667            .map(|seg| seg.invalidation_predicate_count())
668            .sum()
669    }
670
671    fn reconfigure_for_testing(&mut self) {
672        let inner = Arc::get_mut(&mut self.inner)
            .expect("There are other strong references to self.inner Arc");
674
675        for segment in inner.segments.iter_mut() {
676            segment.reconfigure_for_testing();
677        }
678    }
679
680    fn key_locks_map_is_empty(&self) -> bool {
681        self.inner
682            .segments
683            .iter()
684            .all(|seg| seg.key_locks_map_is_empty())
685    }
686}
687
688struct Inner<K, V, S> {
689    desired_capacity: Option<u64>,
690    segments: Box<[Cache<K, V, S>]>,
691    build_hasher: S,
692    segment_shift: u32,
693}
694
695impl<K, V, S> Inner<K, V, S>
696where
697    K: Hash + Eq + Send + Sync + 'static,
698    V: Clone + Send + Sync + 'static,
699    S: BuildHasher + Clone + Send + Sync + 'static,
700{
701    /// # Panics
702    ///
703    /// Panics if `num_segments` is 0.
704    #[allow(clippy::too_many_arguments)]
705    fn new(
706        name: Option<String>,
707        max_capacity: Option<u64>,
708        initial_capacity: Option<usize>,
709        num_segments: usize,
710        build_hasher: S,
711        weigher: Option<Weigher<K, V>>,
712        eviction_policy: EvictionPolicy,
713        eviction_listener: Option<EvictionListener<K, V>>,
714        expiration_policy: ExpirationPolicy<K, V>,
715        housekeeper_config: HousekeeperConfig,
716        invalidator_enabled: bool,
717        clock: Clock,
718    ) -> Self {
719        assert!(num_segments > 0);
720
721        let actual_num_segments = num_segments.next_power_of_two();
722        let segment_shift = 64 - actual_num_segments.trailing_zeros();
723        let seg_max_capacity =
724            max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
725        let seg_init_capacity =
726            initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
727        // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]`
        // because `Cache::clone()` does not clone the inner cache; it shares the same inner.
729        let segments = (0..actual_num_segments)
730            .map(|_| {
731                Cache::with_everything(
732                    name.clone(),
733                    seg_max_capacity,
734                    seg_init_capacity,
735                    build_hasher.clone(),
736                    weigher.clone(),
737                    eviction_policy.clone(),
738                    eviction_listener.clone(),
739                    expiration_policy.clone(),
740                    housekeeper_config.clone(),
741                    invalidator_enabled,
742                    clock.clone(),
743                )
744            })
745            .collect::<Vec<_>>();
746
747        Self {
748            desired_capacity: max_capacity,
749            segments: segments.into_boxed_slice(),
750            build_hasher,
751            segment_shift,
752        }
753    }
754
755    #[inline]
756    fn hash<Q>(&self, key: &Q) -> u64
757    where
758        Q: Equivalent<K> + Hash + ?Sized,
759    {
760        let mut hasher = self.build_hasher.build_hasher();
761        key.hash(&mut hasher);
762        hasher.finish()
763    }
764
765    #[inline]
766    fn select(&self, hash: u64) -> &Cache<K, V, S> {
767        let index = self.segment_index_from_hash(hash);
768        &self.segments[index]
769    }
770
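    // Maps a hash to a segment index using the upper bits of the hash. Because the
    // number of segments is rounded up to a power of two, `segment_shift` equals
    // `64 - log2(num_segments)`, so `hash >> segment_shift` always falls within
    // `0..num_segments`. With a single segment the shift would be 64, which would
    // overflow a `u64` shift, so index 0 is returned directly in that case.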
771    #[inline]
772    fn segment_index_from_hash(&self, hash: u64) -> usize {
773        if self.segment_shift == 64 {
774            0
775        } else {
776            (hash >> self.segment_shift) as usize
777        }
778    }
779}
780
781#[cfg(test)]
782mod tests {
783    use super::SegmentedCache;
784    use crate::notification::RemovalCause;
785    use parking_lot::Mutex;
786    use std::{error::Error, fmt::Display, sync::Arc, time::Duration};
787
788    #[test]
789    fn max_capacity_zero() {
790        let mut cache = SegmentedCache::new(0, 1);
791        cache.reconfigure_for_testing();
792
793        // Make the cache exterior immutable.
794        let cache = cache;
795
796        cache.insert(0, ());
797
798        assert!(!cache.contains_key(&0));
799        assert!(cache.get(&0).is_none());
800        cache.run_pending_tasks();
801        assert!(!cache.contains_key(&0));
802        assert!(cache.get(&0).is_none());
803        assert_eq!(cache.entry_count(), 0)
804    }
805
806    #[test]
807    fn basic_single_thread() {
808        // The following `Vec`s will hold actual and expected notifications.
809        let actual = Arc::new(Mutex::new(Vec::new()));
810        let mut expected = Vec::new();
811
812        // Create an eviction listener.
813        let a1 = Arc::clone(&actual);
814        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
815
816        // Create a cache with the eviction listener.
817        let mut cache = SegmentedCache::builder(1)
818            .max_capacity(3)
819            .eviction_listener(listener)
820            .build();
821        cache.reconfigure_for_testing();
822
823        // Make the cache exterior immutable.
824        let cache = cache;
825
826        cache.insert("a", "alice");
827        cache.insert("b", "bob");
828        assert_eq!(cache.get(&"a"), Some("alice"));
829        assert!(cache.contains_key(&"a"));
830        assert!(cache.contains_key(&"b"));
831        assert_eq!(cache.get(&"b"), Some("bob"));
832        cache.run_pending_tasks();
833        // counts: a -> 1, b -> 1
834
835        cache.insert("c", "cindy");
836        assert_eq!(cache.get(&"c"), Some("cindy"));
837        assert!(cache.contains_key(&"c"));
838        // counts: a -> 1, b -> 1, c -> 1
839        cache.run_pending_tasks();
840
841        assert!(cache.contains_key(&"a"));
842        assert_eq!(cache.get(&"a"), Some("alice"));
843        assert_eq!(cache.get(&"b"), Some("bob"));
844        assert!(cache.contains_key(&"b"));
845        cache.run_pending_tasks();
846        // counts: a -> 2, b -> 2, c -> 1
847
848        // "d" should not be admitted because its frequency is too low.
849        cache.insert("d", "david"); //   count: d -> 0
850        expected.push((Arc::new("d"), "david", RemovalCause::Size));
851        cache.run_pending_tasks();
852        assert_eq!(cache.get(&"d"), None); //   d -> 1
853        assert!(!cache.contains_key(&"d"));
854
855        cache.insert("d", "david");
856        expected.push((Arc::new("d"), "david", RemovalCause::Size));
857        cache.run_pending_tasks();
858        assert!(!cache.contains_key(&"d"));
859        assert_eq!(cache.get(&"d"), None); //   d -> 2
860
861        // "d" should be admitted and "c" should be evicted
862        // because d's frequency is higher than c's.
863        cache.insert("d", "dennis");
864        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
865        cache.run_pending_tasks();
866        assert_eq!(cache.get(&"a"), Some("alice"));
867        assert_eq!(cache.get(&"b"), Some("bob"));
868        assert_eq!(cache.get(&"c"), None);
869        assert_eq!(cache.get(&"d"), Some("dennis"));
870        assert!(cache.contains_key(&"a"));
871        assert!(cache.contains_key(&"b"));
872        assert!(!cache.contains_key(&"c"));
873        assert!(cache.contains_key(&"d"));
874
875        cache.invalidate(&"b");
876        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
877        cache.run_pending_tasks();
878        assert_eq!(cache.get(&"b"), None);
879        assert!(!cache.contains_key(&"b"));
880
881        assert!(cache.remove(&"b").is_none());
882        assert_eq!(cache.remove(&"d"), Some("dennis"));
883        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
884        cache.run_pending_tasks();
885        assert_eq!(cache.get(&"d"), None);
886        assert!(!cache.contains_key(&"d"));
887
888        verify_notification_vec(&cache, actual, &expected);
889        assert!(cache.key_locks_map_is_empty());
890    }
891
892    #[test]
893    fn non_power_of_two_segments() {
894        let mut cache = SegmentedCache::new(100, 5);
895        cache.reconfigure_for_testing();
896
897        // Make the cache exterior immutable.
898        let cache = cache;
899
900        assert_eq!(cache.iter().count(), 0);
901
902        cache.insert("a", "alice");
903        cache.insert("b", "bob");
904        cache.insert("c", "cindy");
905
906        assert_eq!(cache.iter().count(), 3);
907        cache.run_pending_tasks();
908        assert_eq!(cache.iter().count(), 3);
909    }
910
911    #[test]
912    fn size_aware_eviction() {
913        let weigher = |_k: &&str, v: &(&str, u32)| v.1;
914
915        let alice = ("alice", 10);
916        let bob = ("bob", 15);
917        let bill = ("bill", 20);
918        let cindy = ("cindy", 5);
919        let david = ("david", 15);
920        let dennis = ("dennis", 15);
921
922        // The following `Vec`s will hold actual and expected notifications.
923        let actual = Arc::new(Mutex::new(Vec::new()));
924        let mut expected = Vec::new();
925
926        // Create an eviction listener.
927        let a1 = Arc::clone(&actual);
928        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
929
930        // Create a cache with the eviction listener.
931        let mut cache = SegmentedCache::builder(1)
932            .max_capacity(31)
933            .weigher(weigher)
934            .eviction_listener(listener)
935            .build();
936        cache.reconfigure_for_testing();
937
938        // Make the cache exterior immutable.
939        let cache = cache;
940
941        cache.insert("a", alice);
942        cache.insert("b", bob);
943        assert_eq!(cache.get(&"a"), Some(alice));
944        assert!(cache.contains_key(&"a"));
945        assert!(cache.contains_key(&"b"));
946        assert_eq!(cache.get(&"b"), Some(bob));
947        cache.run_pending_tasks();
948        // order (LRU -> MRU) and counts: a -> 1, b -> 1
949
950        cache.insert("c", cindy);
951        assert_eq!(cache.get(&"c"), Some(cindy));
952        assert!(cache.contains_key(&"c"));
953        // order and counts: a -> 1, b -> 1, c -> 1
954        cache.run_pending_tasks();
955
956        assert!(cache.contains_key(&"a"));
957        assert_eq!(cache.get(&"a"), Some(alice));
958        assert_eq!(cache.get(&"b"), Some(bob));
959        assert!(cache.contains_key(&"b"));
960        cache.run_pending_tasks();
961        // order and counts: c -> 1, a -> 2, b -> 2
962
963        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
964        // "d" must have higher count than 3, which is the aggregated count
965        // of "a" and "c".
966        cache.insert("d", david); //   count: d -> 0
967        expected.push((Arc::new("d"), david, RemovalCause::Size));
968        cache.run_pending_tasks();
969        assert_eq!(cache.get(&"d"), None); //   d -> 1
970        assert!(!cache.contains_key(&"d"));
971
972        cache.insert("d", david);
973        expected.push((Arc::new("d"), david, RemovalCause::Size));
974        cache.run_pending_tasks();
975        assert!(!cache.contains_key(&"d"));
976        assert_eq!(cache.get(&"d"), None); //   d -> 2
977
978        cache.insert("d", david);
979        expected.push((Arc::new("d"), david, RemovalCause::Size));
980        cache.run_pending_tasks();
981        assert_eq!(cache.get(&"d"), None); //   d -> 3
982        assert!(!cache.contains_key(&"d"));
983
984        cache.insert("d", david);
985        expected.push((Arc::new("d"), david, RemovalCause::Size));
986        cache.run_pending_tasks();
987        assert!(!cache.contains_key(&"d"));
988        assert_eq!(cache.get(&"d"), None); //   d -> 4
989
990        // Finally "d" should be admitted by evicting "c" and "a".
991        cache.insert("d", dennis);
992        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
993        expected.push((Arc::new("a"), alice, RemovalCause::Size));
994        cache.run_pending_tasks();
995        assert_eq!(cache.get(&"a"), None);
996        assert_eq!(cache.get(&"b"), Some(bob));
997        assert_eq!(cache.get(&"c"), None);
998        assert_eq!(cache.get(&"d"), Some(dennis));
999        assert!(!cache.contains_key(&"a"));
1000        assert!(cache.contains_key(&"b"));
1001        assert!(!cache.contains_key(&"c"));
1002        assert!(cache.contains_key(&"d"));
1003
1004        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
1005        cache.insert("b", bill);
1006        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
1007        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
1008        cache.run_pending_tasks();
1009        assert_eq!(cache.get(&"b"), Some(bill));
1010        assert_eq!(cache.get(&"d"), None);
1011        assert!(cache.contains_key(&"b"));
1012        assert!(!cache.contains_key(&"d"));
1013
1014        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
1015        cache.insert("a", alice);
1016        cache.insert("b", bob);
1017        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
1018        cache.run_pending_tasks();
1019        assert_eq!(cache.get(&"a"), Some(alice));
1020        assert_eq!(cache.get(&"b"), Some(bob));
1021        assert_eq!(cache.get(&"d"), None);
1022        assert!(cache.contains_key(&"a"));
1023        assert!(cache.contains_key(&"b"));
1024        assert!(!cache.contains_key(&"d"));
1025
1026        // Verify the sizes.
1027        assert_eq!(cache.entry_count(), 2);
1028        assert_eq!(cache.weighted_size(), 25);
1029
1030        verify_notification_vec(&cache, actual, &expected);
1031        assert!(cache.key_locks_map_is_empty());
1032    }
1033
1034    #[test]
1035    fn basic_multi_threads() {
1036        let num_threads = 4;
1037
1038        let mut cache = SegmentedCache::new(100, num_threads);
1039        cache.reconfigure_for_testing();
1040
1041        // Make the cache exterior immutable.
1042        let cache = cache;
1043
1044        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1045        #[allow(clippy::needless_collect)]
1046        let handles = (0..num_threads)
1047            .map(|id| {
1048                let cache = cache.clone();
1049                std::thread::spawn(move || {
1050                    cache.insert(10, format!("{id}-100"));
1051                    cache.get(&10);
1052                    cache.run_pending_tasks();
1053                    cache.insert(20, format!("{id}-200"));
1054                    cache.invalidate(&10);
1055                })
1056            })
1057            .collect::<Vec<_>>();
1058
1059        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1060
1061        cache.run_pending_tasks();
1062
1063        assert!(cache.get(&10).is_none());
1064        assert!(cache.get(&20).is_some());
1065        assert!(!cache.contains_key(&10));
1066        assert!(cache.contains_key(&20));
1067    }
1068
1069    #[test]
1070    fn invalidate_all() {
1071        use std::collections::HashMap;
1072
1073        // The following `HashMap`s will hold actual and expected notifications.
1074        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1075        let actual = Arc::new(Mutex::new(HashMap::new()));
1076        let mut expected = HashMap::new();
1077
1078        // Create an eviction listener.
1079        let a1 = Arc::clone(&actual);
1080        let listener = move |k, v, cause| {
1081            a1.lock().insert(k, (v, cause));
1082        };
1083
1084        // Create a cache with the eviction listener.
1085        let mut cache = SegmentedCache::builder(4)
1086            .max_capacity(100)
1087            .eviction_listener(listener)
1088            .build();
1089        cache.reconfigure_for_testing();
1090
1091        // Make the cache exterior immutable.
1092        let cache = cache;
1093
1094        cache.insert("a", "alice");
1095        cache.insert("b", "bob");
1096        cache.insert("c", "cindy");
1097        assert_eq!(cache.get(&"a"), Some("alice"));
1098        assert_eq!(cache.get(&"b"), Some("bob"));
1099        assert_eq!(cache.get(&"c"), Some("cindy"));
1100        assert!(cache.contains_key(&"a"));
1101        assert!(cache.contains_key(&"b"));
1102        assert!(cache.contains_key(&"c"));
1103
1104        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
        // modified timestamps of the entries were updated when they were inserted.
1106        // https://github.com/moka-rs/moka/issues/155
1107
1108        cache.invalidate_all();
1109        expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1110        expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1111        expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1112        cache.run_pending_tasks();
1113
1114        cache.insert("d", "david");
1115        cache.run_pending_tasks();
1116
1117        assert!(cache.get(&"a").is_none());
1118        assert!(cache.get(&"b").is_none());
1119        assert!(cache.get(&"c").is_none());
1120        assert_eq!(cache.get(&"d"), Some("david"));
1121        assert!(!cache.contains_key(&"a"));
1122        assert!(!cache.contains_key(&"b"));
1123        assert!(!cache.contains_key(&"c"));
1124        assert!(cache.contains_key(&"d"));
1125
1126        verify_notification_map(&cache, actual, &expected);
1127    }
1128
1129    #[test]
1130    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
1131        use std::collections::{HashMap, HashSet};
1132
1133        const SEGMENTS: usize = 4;
1134
1135        // The following `HashMap`s will hold actual and expected notifications.
1136        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1137        let actual = Arc::new(Mutex::new(HashMap::new()));
1138        let mut expected = HashMap::new();
1139
1140        // Create an eviction listener.
1141        let a1 = Arc::clone(&actual);
1142        let listener = move |k, v, cause| {
1143            a1.lock().insert(k, (v, cause));
1144        };
1145
1146        let (clock, mock) = crate::common::time::Clock::mock();
1147
1148        // Create a cache with the eviction listener.
1149        let mut cache = SegmentedCache::builder(SEGMENTS)
1150            .max_capacity(100)
1151            .support_invalidation_closures()
1152            .eviction_listener(listener)
1153            .clock(clock)
1154            .build();
1155        cache.reconfigure_for_testing();
1156
1157        // Make the cache exterior immutable.
1158        let cache = cache;
1159
1160        cache.insert(0, "alice");
1161        cache.insert(1, "bob");
1162        cache.insert(2, "alex");
1163        cache.run_pending_tasks();
1164        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
1165        cache.run_pending_tasks();
1166
1167        assert_eq!(cache.get(&0), Some("alice"));
1168        assert_eq!(cache.get(&1), Some("bob"));
1169        assert_eq!(cache.get(&2), Some("alex"));
1170        assert!(cache.contains_key(&0));
1171        assert!(cache.contains_key(&1));
1172        assert!(cache.contains_key(&2));
1173
1174        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
1175        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
1176        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
1177        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
1178        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));
1179
1180        mock.increment(Duration::from_secs(5)); // 10 secs from the start.
1181
1182        cache.insert(3, "alice");
1183
1184        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1185        cache.run_pending_tasks(); // To submit the invalidation task.
1186        std::thread::sleep(Duration::from_millis(200));
1187        cache.run_pending_tasks(); // To process the task result.
1188        std::thread::sleep(Duration::from_millis(200));
1189
1190        assert!(cache.get(&0).is_none());
1191        assert!(cache.get(&2).is_none());
1192        assert_eq!(cache.get(&1), Some("bob"));
1193        // This should survive as it was inserted after calling invalidate_entries_if.
1194        assert_eq!(cache.get(&3), Some("alice"));
1195
1196        assert!(!cache.contains_key(&0));
1197        assert!(cache.contains_key(&1));
1198        assert!(!cache.contains_key(&2));
1199        assert!(cache.contains_key(&3));
1200
1201        assert_eq!(cache.entry_count(), 2);
1202        assert_eq!(cache.invalidation_predicate_count(), 0);
1203
1204        mock.increment(Duration::from_secs(5)); // 15 secs from the start.
1205
1206        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
1207        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
1208        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
1209        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
1210        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));
1211
1212        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1213        cache.run_pending_tasks(); // To submit the invalidation task.
1214        std::thread::sleep(Duration::from_millis(200));
1215        cache.run_pending_tasks(); // To process the task result.
1216        std::thread::sleep(Duration::from_millis(200));
1217
1218        assert!(cache.get(&1).is_none());
1219        assert!(cache.get(&3).is_none());
1220
1221        assert!(!cache.contains_key(&1));
1222        assert!(!cache.contains_key(&3));
1223
1224        assert_eq!(cache.entry_count(), 0);
1225        assert_eq!(cache.invalidation_predicate_count(), 0);
1226
1227        verify_notification_map(&cache, actual, &expected);
1228
1229        Ok(())
1230    }
1231
1232    #[test]
1233    fn test_iter() {
1234        const NUM_KEYS: usize = 50;
1235
1236        fn make_value(key: usize) -> String {
1237            format!("val: {key}")
1238        }
1239
1240        // let cache = SegmentedCache::builder(5)
1241        let cache = SegmentedCache::builder(4)
1242            .max_capacity(100)
1243            .time_to_idle(Duration::from_secs(10))
1244            .build();
1245
1246        for key in 0..NUM_KEYS {
1247            cache.insert(key, make_value(key));
1248        }
1249
1250        let mut key_set = std::collections::HashSet::new();
1251
1252        for (key, value) in &cache {
1253            assert_eq!(value, make_value(*key));
1254
1255            key_set.insert(*key);
1256        }
1257
1258        // Ensure there are no missing or duplicate keys in the iteration.
1259        assert_eq!(key_set.len(), NUM_KEYS);
1260    }
1261
1262    /// Runs 16 threads at the same time and ensures no deadlock occurs.
1263    ///
1264    /// - Eight of the threads will update key-values in the cache.
1265    /// - Eight others will iterate the cache.
1266    ///
1267    #[test]
1268    fn test_iter_multi_threads() {
1269        use std::collections::HashSet;
1270
1271        const NUM_KEYS: usize = 1024;
1272        const NUM_THREADS: usize = 16;
1273
1274        fn make_value(key: usize) -> String {
1275            format!("val: {key}")
1276        }
1277
1278        let cache = SegmentedCache::builder(4)
1279            .max_capacity(2048)
1280            .time_to_idle(Duration::from_secs(10))
1281            .build();
1282
1283        // Initialize the cache.
1284        for key in 0..NUM_KEYS {
1285            cache.insert(key, make_value(key));
1286        }
1287
1288        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
1289        let write_lock = rw_lock.write().unwrap();
1290
1291        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1292        #[allow(clippy::needless_collect)]
1293        let handles = (0..NUM_THREADS)
1294            .map(|n| {
1295                let cache = cache.clone();
1296                let rw_lock = Arc::clone(&rw_lock);
1297
1298                if n % 2 == 0 {
1299                    // This thread will update the cache.
1300                    std::thread::spawn(move || {
1301                        let read_lock = rw_lock.read().unwrap();
1302                        for key in 0..NUM_KEYS {
1303                            // TODO: Update keys in a random order?
1304                            cache.insert(key, make_value(key));
1305                        }
1306                        std::mem::drop(read_lock);
1307                    })
1308                } else {
1309                    // This thread will iterate the cache.
1310                    std::thread::spawn(move || {
1311                        let read_lock = rw_lock.read().unwrap();
1312                        let mut key_set = HashSet::new();
1313                        for (key, value) in &cache {
1314                            assert_eq!(value, make_value(*key));
1315                            key_set.insert(*key);
1316                        }
1317                        // Ensure there are no missing or duplicate keys in the iteration.
1318                        assert_eq!(key_set.len(), NUM_KEYS);
1319                        std::mem::drop(read_lock);
1320                    })
1321                }
1322            })
1323            .collect::<Vec<_>>();
1324
        // Let these threads run by releasing the write lock.
1326        std::mem::drop(write_lock);
1327
1328        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1329
1330        // Ensure there are no missing or duplicate keys in the iteration.
1331        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
1332        assert_eq!(key_set.len(), NUM_KEYS);
1333    }
1334
1335    #[test]
1336    fn get_with() {
1337        use std::thread::{sleep, spawn};
1338
1339        let cache = SegmentedCache::new(100, 4);
1340        const KEY: u32 = 0;
1341
1342        // This test will run five threads:
1343        //
1344        // Thread1 will be the first thread to call `get_with` for a key, so its init
1345        // closure will be evaluated and then a &str value "thread1" will be inserted
1346        // to the cache.
1347        let thread1 = {
1348            let cache1 = cache.clone();
1349            spawn(move || {
1350                // Call `get_with` immediately.
1351                let v = cache1.get_with(KEY, || {
1352                    // Wait for 300 ms and return a &str value.
1353                    sleep(Duration::from_millis(300));
1354                    "thread1"
1355                });
1356                assert_eq!(v, "thread1");
1357            })
1358        };
1359
1360        // Thread2 will be the second thread to call `get_with` for the same key, so
1361        // its init closure will not be evaluated. Once thread1's init closure
1362        // finishes, it will get the value inserted by thread1's init closure.
1363        let thread2 = {
1364            let cache2 = cache.clone();
1365            spawn(move || {
1366                // Wait for 100 ms before calling `get_with`.
1367                sleep(Duration::from_millis(100));
1368                let v = cache2.get_with(KEY, || unreachable!());
1369                assert_eq!(v, "thread1");
1370            })
1371        };
1372
1373        // Thread3 will be the third thread to call `get_with` for the same key. By
1374        // the time it calls, thread1's init closure should have finished already and
1375        // the value should be already inserted to the cache. So its init closure
        // will not be evaluated and it will get the value inserted by thread1's init
1377        // closure immediately.
1378        let thread3 = {
1379            let cache3 = cache.clone();
1380            spawn(move || {
1381                // Wait for 400 ms before calling `get_with`.
1382                sleep(Duration::from_millis(400));
1383                let v = cache3.get_with(KEY, || unreachable!());
1384                assert_eq!(v, "thread1");
1385            })
1386        };
1387
1388        // Thread4 will call `get` for the same key. It will call when thread1's init
1389        // closure is still running, so it will get none for the key.
1390        let thread4 = {
1391            let cache4 = cache.clone();
1392            spawn(move || {
1393                // Wait for 200 ms before calling `get`.
1394                sleep(Duration::from_millis(200));
1395                let maybe_v = cache4.get(&KEY);
1396                assert!(maybe_v.is_none());
1397            })
1398        };
1399
1400        // Thread5 will call `get` for the same key. It will call after thread1's init
        // closure finished, so it will get the value inserted by thread1's init closure.
1402        let thread5 = {
1403            let cache5 = cache.clone();
1404            spawn(move || {
1405                // Wait for 400 ms before calling `get`.
1406                sleep(Duration::from_millis(400));
1407                let maybe_v = cache5.get(&KEY);
1408                assert_eq!(maybe_v, Some("thread1"));
1409            })
1410        };
1411
1412        for t in [thread1, thread2, thread3, thread4, thread5] {
1413            t.join().expect("Failed to join");
1414        }
1415
1416        assert!(cache.is_waiter_map_empty());
1417    }
1418
1419    #[test]
1420    fn get_with_if() {
1421        use std::thread::{sleep, spawn};
1422
1423        let cache = SegmentedCache::new(100, 4);
1424        const KEY: u32 = 0;
1425
1426        // This test will run seven threads:
1427        //
1428        // Thread1 will be the first thread to call `get_with_if` for a key, so its
1429        // init closure will be evaluated and then a &str value "thread1" will be
1430        // inserted to the cache.
1431        let thread1 = {
1432            let cache1 = cache.clone();
1433            spawn(move || {
                // Call `get_with_if` immediately.
1435                let v = cache1.get_with_if(
1436                    KEY,
1437                    || {
1438                        // Wait for 300 ms and return a &str value.
1439                        sleep(Duration::from_millis(300));
1440                        "thread1"
1441                    },
1442                    |_v| unreachable!(),
1443                );
1444                assert_eq!(v, "thread1");
1445            })
1446        };
1447
1448        // Thread2 will be the second thread to call `get_with_if` for the same key,
1449        // so its init closure will not be evaluated. Once thread1's init closure
1450        // finishes, it will get the value inserted by thread1's init closure.
1451        let thread2 = {
1452            let cache2 = cache.clone();
1453            spawn(move || {
1454                // Wait for 100 ms before calling `get_with_if`.
1455                sleep(Duration::from_millis(100));
1456                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
1457                assert_eq!(v, "thread1");
1458            })
1459        };
1460
1461        // Thread3 will be the third thread to call `get_with_if` for the same
1462        // key. By the time it calls, thread1's init closure should have finished
1463        // and the value should already be in the cache. Also, thread3's
1464        // `replace_if` closure returns `false`. So its init closure will not be
1465        // evaluated, and it will get the value inserted by thread1's init closure
1466        // immediately.
1467        let thread3 = {
1468            let cache3 = cache.clone();
1469            spawn(move || {
1470                // Wait for 350 ms before calling `get_with_if`.
1471                sleep(Duration::from_millis(350));
1472                let v = cache3.get_with_if(
1473                    KEY,
1474                    || unreachable!(),
1475                    |v| {
1476                        assert_eq!(v, &"thread1");
1477                        false
1478                    },
1479                );
1480                assert_eq!(v, "thread1");
1481            })
1482        };
1483
1484        // Thread4 will be the fourth thread to call `get_with_if` for the same
1485        // key. The value should have already been inserted to the cache by
1486        // thread1. However, thread4's `replace_if` closure returns `true`, so its
1487        // init closure will be evaluated to replace the current value.
1488        let thread4 = {
1489            let cache4 = cache.clone();
1490            spawn(move || {
1491                // Wait for 400 ms before calling `get_with_if`.
1492                sleep(Duration::from_millis(400));
1493                let v = cache4.get_with_if(
1494                    KEY,
1495                    || "thread4",
1496                    |v| {
1497                        assert_eq!(v, &"thread1");
1498                        true
1499                    },
1500                );
1501                assert_eq!(v, "thread4");
1502            })
1503        };
1504
1505        // Thread5 will call `get` for the same key. It will call when thread1's init
1506        // closure is still running, so it will get none for the key.
1507        let thread5 = {
1508            let cache5 = cache.clone();
1509            spawn(move || {
1510                // Wait for 200 ms before calling `get`.
1511                sleep(Duration::from_millis(200));
1512                let maybe_v = cache5.get(&KEY);
1513                assert!(maybe_v.is_none());
1514            })
1515        };
1516
1517        // Thread6 will call `get` for the same key. It will call after thread1's init
1518        // closure has finished but before thread4 replaces the value, so it will get "thread1".
1519        let thread6 = {
1520            let cache6 = cache.clone();
1521            spawn(move || {
1522                // Wait for 350 ms before calling `get`.
1523                sleep(Duration::from_millis(350));
1524                let maybe_v = cache6.get(&KEY);
1525                assert_eq!(maybe_v, Some("thread1"));
1526            })
1527        };
1528
1529        // Thread7 will call `get` for the same key. It will call after thread4's init
1530        // closure has finished, so it will get the value inserted by thread4.
1531        let thread7 = {
1532            let cache7 = cache.clone();
1533            spawn(move || {
1534                // Wait for 450 ms before calling `get`.
1535                sleep(Duration::from_millis(450));
1536                let maybe_v = cache7.get(&KEY);
1537                assert_eq!(maybe_v, Some("thread4"));
1538            })
1539        };
1540
1541        for t in [
1542            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
1543        ] {
1544            t.join().expect("Failed to join");
1545        }
1546
1547        assert!(cache.is_waiter_map_empty());
1548    }
1549
1550    #[test]
1551    fn try_get_with() {
1552        use std::{
1553            sync::Arc,
1554            thread::{sleep, spawn},
1555        };
1556
1557        #[derive(Debug)]
1558        pub struct MyError(String);
1559
1560        impl Display for MyError {
1561            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1562                write!(f, "{}", self.0)
1563            }
1564        }
1565
1566        impl Error for MyError {}
1567
1568        type MyResult<T> = Result<T, Arc<MyError>>;
1569
1570        let cache = SegmentedCache::new(100, 4);
1571        const KEY: u32 = 0;
1572
1573        // This test will run eight threads:
1574        //
1575        // Thread1 will be the first thread to call `try_get_with` for a key, so its
1576        // init closure will be evaluated and then an error will be returned. Nothing
1577        // will be inserted to the cache.
1578        let thread1 = {
1579            let cache1 = cache.clone();
1580            spawn(move || {
1581                // Call `try_get_with` immediately.
1582                let v = cache1.try_get_with(KEY, || {
1583                    // Wait for 300 ms and return an error.
1584                    sleep(Duration::from_millis(300));
1585                    Err(MyError("thread1 error".into()))
1586                });
1587                assert!(v.is_err());
1588            })
1589        };
1590
1591        // Thread2 will be the second thread to call `try_get_with` for the same key,
1592        // so its init closure will not be evaluated. Once thread1's init closure
1593        // finishes, it will get the same error value returned by thread1's init
1594        // closure.
1595        let thread2 = {
1596            let cache2 = cache.clone();
1597            spawn(move || {
1598                // Wait for 100 ms before calling `try_get_with`.
1599                sleep(Duration::from_millis(100));
1600                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
1601                assert!(v.is_err());
1602            })
1603        };
1604
1605        // Thread3 will be the third thread to call `try_get_with` for the same
1606        // key. By the time it calls, thread1's init closure should have finished
1607        // already, but the key still does not exist in the cache. So its init
1608        // closure will be evaluated and then an `Ok(&str)` value will be returned.
1609        // That value will be inserted to the cache.
1610        let thread3 = {
1611            let cache3 = cache.clone();
1612            spawn(move || {
1613                // Wait for 400 ms before calling `try_get_with`.
1614                sleep(Duration::from_millis(400));
1615                let v: MyResult<_> = cache3.try_get_with(KEY, || {
1616                    // Wait for 300 ms and return an Ok(&str) value.
1617                    sleep(Duration::from_millis(300));
1618                    Ok("thread3")
1619                });
1620                assert_eq!(v.unwrap(), "thread3");
1621            })
1622        };
1623
1624        // Thread4 will be the fourth thread to call `try_get_with` for the same
1625        // key, so its init closure will not be evaluated. Once thread3's init
1626        // closure finishes, it will get the same `Ok(&str)` value.
1627        let thread4 = {
1628            let cache4 = cache.clone();
1629            spawn(move || {
1630                // Wait for 500 ms before calling `try_get_with`.
1631                sleep(Duration::from_millis(500));
1632                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
1633                assert_eq!(v.unwrap(), "thread3");
1634            })
1635        };
1636
1637        // Thread5 will be the fifth thread to call `try_get_with` for the same
1638        // key. By the time it calls, thread3's init closure should have finished
1639        // already, so thread5's init closure will not be evaluated, and it will
1640        // get the value inserted by thread3's init closure immediately.
1642        let thread5 = {
1643            let cache5 = cache.clone();
1644            spawn(move || {
1645                // Wait for 800 ms before calling `try_get_with`.
1646                sleep(Duration::from_millis(800));
1647                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
1648                assert_eq!(v.unwrap(), "thread3");
1649            })
1650        };
1651
1652        // Thread6 will call `get` for the same key. It will call when thread1's init
1653        // closure is still running, so it will get none for the key.
1654        let thread6 = {
1655            let cache6 = cache.clone();
1656            spawn(move || {
1657                // Wait for 200 ms before calling `get`.
1658                sleep(Duration::from_millis(200));
1659                let maybe_v = cache6.get(&KEY);
1660                assert!(maybe_v.is_none());
1661            })
1662        };
1663
1664        // Thread7 will call `get` for the same key. It will call after thread1's init
1665        // closure finished with an error. So it will get none for the key.
1666        let thread7 = {
1667            let cache7 = cache.clone();
1668            spawn(move || {
1669                // Wait for 400 ms before calling `get`.
1670                sleep(Duration::from_millis(400));
1671                let maybe_v = cache7.get(&KEY);
1672                assert!(maybe_v.is_none());
1673            })
1674        };
1675
1676        // Thread8 will call `get` for the same key. It will call after thread3's init
1677        // closure has finished, so it will get the value inserted by thread3.
1678        let thread8 = {
1679            let cache8 = cache.clone();
1680            spawn(move || {
1681                // Wait for 800 ms before calling `get`.
1682                sleep(Duration::from_millis(800));
1683                let maybe_v = cache8.get(&KEY);
1684                assert_eq!(maybe_v, Some("thread3"));
1685            })
1686        };
1687
1688        for t in [
1689            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1690        ] {
1691            t.join().expect("Failed to join");
1692        }
1693
1694        assert!(cache.is_waiter_map_empty());
1695    }
1696
1697    #[test]
1698    fn optionally_get_with() {
1699        use std::thread::{sleep, spawn};
1700
1701        let cache = SegmentedCache::new(100, 4);
1702        const KEY: u32 = 0;
1703
1704        // This test will run eight threads:
1705        //
1706        // Thread1 will be the first thread to call `optionally_get_with` for a key,
1707        // so its init closure will be evaluated and then `None` will be returned.
1708        // Nothing will be inserted to the cache.
1709        let thread1 = {
1710            let cache1 = cache.clone();
1711            spawn(move || {
1712                // Call `optionally_get_with` immediately.
1713                let v = cache1.optionally_get_with(KEY, || {
1714                    // Wait for 300 ms and return `None`.
1715                    sleep(Duration::from_millis(300));
1716                    None
1717                });
1718                assert!(v.is_none());
1719            })
1720        };
1721
1722        // Thread2 will be the second thread to call `optionally_get_with` for the
1723        // same key, so its init closure will not be evaluated. Once thread1's init
1724        // closure finishes, it will get the same `None` returned by thread1.
1726        let thread2 = {
1727            let cache2 = cache.clone();
1728            spawn(move || {
1729                // Wait for 100 ms before calling `optionally_get_with`.
1730                sleep(Duration::from_millis(100));
1731                let v = cache2.optionally_get_with(KEY, || unreachable!());
1732                assert!(v.is_none());
1733            })
1734        };
1735
1736        // Thread3 will be the third thread to call `optionally_get_with` for the
1737        // same key. By the time it calls, thread1's init closure should have
1738        // finished already, but the key still does not exist in the cache. So its
1739        // init closure will be evaluated and then a `Some(&str)` value will be
1740        // returned. That value will be inserted to the cache.
1741        let thread3 = {
1742            let cache3 = cache.clone();
1743            spawn(move || {
1744                // Wait for 400 ms before calling `optionally_get_with`.
1745                sleep(Duration::from_millis(400));
1746                let v = cache3.optionally_get_with(KEY, || {
1747                    // Wait for 300 ms and return a `Some(&str)` value.
1748                    sleep(Duration::from_millis(300));
1749                    Some("thread3")
1750                });
1751                assert_eq!(v.unwrap(), "thread3");
1752            })
1753        };
1754
1755        // Thread4 will be the fourth thread to call `optionally_get_with` for the
1756        // same key, so its init closure will not be evaluated. Once thread3's init
1757        // closure finishes, it will get the same `Some(&str)` value.
1758        let thread4 = {
1759            let cache4 = cache.clone();
1760            spawn(move || {
1761                // Wait for 500 ms before calling `optionally_get_with`.
1762                sleep(Duration::from_millis(500));
1763                let v = cache4.optionally_get_with(KEY, || unreachable!());
1764                assert_eq!(v.unwrap(), "thread3");
1765            })
1766        };
1767
1768        // Thread5 will be the fifth thread to call `optionally_get_with` for the
1769        // same key. By the time it calls, thread3's init closure should have
1770        // finished already, so thread5's init closure will not be evaluated, and
1771        // it will get the value inserted by thread3's init closure immediately.
1773        let thread5 = {
1774            let cache5 = cache.clone();
1775            spawn(move || {
1776                // Wait for 800 ms before calling `optionally_get_with`.
1777                sleep(Duration::from_millis(800));
1778                let v = cache5.optionally_get_with(KEY, || unreachable!());
1779                assert_eq!(v.unwrap(), "thread3");
1780            })
1781        };
1782
1783        // Thread6 will call `get` for the same key. It will call when thread1's init
1784        // closure is still running, so it will get none for the key.
1785        let thread6 = {
1786            let cache6 = cache.clone();
1787            spawn(move || {
1788                // Wait for 200 ms before calling `get`.
1789                sleep(Duration::from_millis(200));
1790                let maybe_v = cache6.get(&KEY);
1791                assert!(maybe_v.is_none());
1792            })
1793        };
1794
1795        // Thread7 will call `get` for the same key. It will call after thread1's init
1796        // closure finished with `None`, so it will get none for the key.
1797        let thread7 = {
1798            let cache7 = cache.clone();
1799            spawn(move || {
1800                // Wait for 400 ms before calling `get`.
1801                sleep(Duration::from_millis(400));
1802                let maybe_v = cache7.get(&KEY);
1803                assert!(maybe_v.is_none());
1804            })
1805        };
1806
1807        // Thread8 will call `get` for the same key. It will call after thread3's init
1808        // closure has finished, so it will get the value inserted by thread3.
1809        let thread8 = {
1810            let cache8 = cache.clone();
1811            spawn(move || {
1812                // Wait for 800 ms before calling `get`.
1813                sleep(Duration::from_millis(800));
1814                let maybe_v = cache8.get(&KEY);
1815                assert_eq!(maybe_v, Some("thread3"));
1816            })
1817        };
1818
1819        for t in [
1820            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1821        ] {
1822            t.join().expect("Failed to join");
1823        }
1824
1825        assert!(cache.is_waiter_map_empty());
1826    }
1827
1828    // This test ensures that the `contains_key`, `get` and `invalidate` can use
1829    // borrowed form `&[u8]` for key with type `Vec<u8>`.
1830    // https://github.com/moka-rs/moka/issues/166
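    // These lookups compile because the cache's lookup methods accept any
    // borrowed form of the key that hashes and compares consistently with it
    // (via the `Equivalent` bound), and `&[u8]` is such a form for `Vec<u8>`.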
1831    #[test]
1832    fn borrowed_forms_of_key() {
1833        let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
1834
1835        let key = vec![1_u8];
1836        cache.insert(key.clone(), ());
1837
1838        // key as &Vec<u8>
1839        let key_v: &Vec<u8> = &key;
1840        assert!(cache.contains_key(key_v));
1841        assert_eq!(cache.get(key_v), Some(()));
1842        cache.invalidate(key_v);
1843
1844        cache.insert(key, ());
1845
1846        // key as &[u8]
1847        let key_s: &[u8] = &[1_u8];
1848        assert!(cache.contains_key(key_s));
1849        assert_eq!(cache.get(key_s), Some(()));
1850        cache.invalidate(key_s);
1851    }
1852
1853    // Ignored by default. This test becomes unstable when run in parallel with
1854    // other tests.
1855    #[test]
1856    #[ignore]
1857    fn drop_value_immediately_after_eviction() {
1858        use crate::common::test_utils::{Counters, Value};
1859
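        // Use a single segment and insert 20% more keys than the maximum
        // capacity, so the extra entries are evicted by the size constraint.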
1860        const NUM_SEGMENTS: usize = 1;
1861        const MAX_CAPACITY: u32 = 500;
1862        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
1863
1864        let counters = Arc::new(Counters::default());
1865        let counters1 = Arc::clone(&counters);
1866
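        // Count size-based evictions and explicit invalidations separately so
        // the assertions below can distinguish them.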
1867        let listener = move |_k, _v, cause| match cause {
1868            RemovalCause::Size => counters1.incl_evicted(),
1869            RemovalCause::Explicit => counters1.incl_invalidated(),
1870            _ => (),
1871        };
1872
1873        let mut cache = SegmentedCache::builder(NUM_SEGMENTS)
1874            .max_capacity(MAX_CAPACITY as u64)
1875            .eviction_listener(listener)
1876            .build();
1877        cache.reconfigure_for_testing();
1878
1879        // Make the cache exterior immutable.
1880        let cache = cache;
1881
1882        for key in 0..KEYS {
1883            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
1884            cache.insert(key, value);
1885            counters.incl_inserted();
1886            cache.run_pending_tasks();
1887        }
1888
1889        let eviction_count = KEYS - MAX_CAPACITY;
1890
1891        cache.run_pending_tasks();
1892        assert_eq!(counters.inserted(), KEYS, "inserted");
1893        assert_eq!(counters.value_created(), KEYS, "value_created");
1894        assert_eq!(counters.evicted(), eviction_count, "evicted");
1895        assert_eq!(counters.invalidated(), 0, "invalidated");
1896        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
1897
1898        for key in 0..KEYS {
1899            cache.invalidate(&key);
1900            cache.run_pending_tasks();
1901        }
1902
1903        cache.run_pending_tasks();
1904        assert_eq!(counters.inserted(), KEYS, "inserted");
1905        assert_eq!(counters.value_created(), KEYS, "value_created");
1906        assert_eq!(counters.evicted(), eviction_count, "evicted");
1907        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
1908        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1909
1910        std::mem::drop(cache);
1911        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1912    }
1913
1914    #[test]
1915    fn test_debug_format() {
1916        let cache = SegmentedCache::new(10, 4);
1917        cache.insert('a', "alice");
1918        cache.insert('b', "bob");
1919        cache.insert('c', "cindy");
1920
1921        let debug_str = format!("{cache:?}");
1922        assert!(debug_str.starts_with('{'));
1923        assert!(debug_str.contains(r#"'a': "alice""#));
1924        assert!(debug_str.contains(r#"'b': "bob""#));
1925        assert!(debug_str.contains(r#"'c': "cindy""#));
1926        assert!(debug_str.ends_with('}'));
1927    }
1928
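    // Shapes of the eviction notifications recorded by the listener tests:
    // (value, cause) pairs and (key, value, cause) triples.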
1929    type NotificationPair<V> = (V, RemovalCause);
1930    type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1931
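    // Asserts that the recorded eviction notifications match `expected`, in
    // order. Notifications are delivered asynchronously, so the check is
    // retried a few times, running the pending tasks between attempts.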
1932    fn verify_notification_vec<K, V, S>(
1933        cache: &SegmentedCache<K, V, S>,
1934        actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1935        expected: &[NotificationTriple<K, V>],
1936    ) where
1937        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1938        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1939        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1940    {
1941        // Retries will be needed when testing in a QEMU VM.
1942        const MAX_RETRIES: usize = 5;
1943        let mut retries = 0;
1944        loop {
1945            // Ensure all scheduled notifications have been processed.
1946            std::thread::sleep(Duration::from_millis(500));
1947
1948            let actual = &*actual.lock();
1949            if actual.len() != expected.len() {
1950                if retries <= MAX_RETRIES {
1951                    retries += 1;
1952                    cache.run_pending_tasks();
1953                    continue;
1954                } else {
1955                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1956                }
1957            }
1958
1959            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1960                assert_eq!(actual, expected, "expected[{i}]");
1961            }
1962
1963            break;
1964        }
1965    }
1966
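    // Like `verify_notification_vec`, but compares the recorded notifications
    // keyed by the entry key instead of by delivery order.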
1967    fn verify_notification_map<K, V, S>(
1968        cache: &SegmentedCache<K, V, S>,
1969        actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1970        expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1971    ) where
1972        K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1973        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1974        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1975    {
1976        // Retries will be needed when testing in a QEMU VM.
1977        const MAX_RETRIES: usize = 5;
1978        let mut retries = 0;
1979        loop {
1980            // Ensure all scheduled notifications have been processed.
1981            std::thread::sleep(Duration::from_millis(500));
1982
1983            let actual = &*actual.lock();
1984            if actual.len() != expected.len() {
1985                if retries <= MAX_RETRIES {
1986                    retries += 1;
1987                    cache.run_pending_tasks();
1988                    continue;
1989                } else {
1990                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1991                }
1992            }
1993
1994            for actual_key in actual.keys() {
1995                assert_eq!(
1996                    actual.get(actual_key),
1997                    expected.get(actual_key),
1998                    "expected[{actual_key}]",
1999                );
2000            }
2001
2002            break;
2003        }
2004    }
2005}