moka/sync/
segment.rs

1use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
2use crate::common::concurrent::Weigher;
3use crate::common::time::Clock;
4use crate::{
5    common::HousekeeperConfig,
6    notification::EvictionListener,
7    policy::{EvictionPolicy, ExpirationPolicy},
8    sync_base::iter::{Iter, ScanningGet},
9    Entry, Policy, PredicateError,
10};
11
12use std::{
13    borrow::Borrow,
14    collections::hash_map::RandomState,
15    fmt,
16    hash::{BuildHasher, Hash, Hasher},
17    sync::Arc,
18};
19
/// A thread-safe concurrent in-memory cache, with multiple internal segments.
///
/// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for
/// increased concurrent update performance. The trade-off is a small overhead on
/// retrievals and updates, because every keyed operation first has to pick the
/// segment that is responsible for the key.
///
/// For usage examples, see the documentation of [`Cache`][cache-struct].
///
/// [cache-struct]: ./struct.Cache.html
///
pub struct SegmentedCache<K, V, S = RandomState> {
    // Shared state (segments, hasher builder, ...). Cloning a `SegmentedCache`
    // only clones this `Arc`, so all clones operate on the same segments.
    inner: Arc<Inner<K, V, S>>,
}
33
// TODO: https://github.com/moka-rs/moka/issues/54
//
// Manually asserts that a `SegmentedCache` may be moved to another thread when
// the key and value types are `Send + Sync` and the hasher builder is `Send`.
//
// NOTE(review): presumably this is needed because some type reachable through
// `Inner` is not automatically `Send`; the soundness argument is not visible in
// this file -- confirm against the linked issue before changing the bounds.
//
// The `allow` silences clippy's `non_send_fields_in_send_ty` lint for this impl.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
43
// Manually asserts that a `SegmentedCache` may be shared between threads by
// reference when the key and value types are `Send + Sync` and the hasher
// builder is `Sync`.
//
// NOTE(review): the invariant that makes this sound is not visible in this
// file -- confirm it before relaxing or tightening these bounds.
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
51
52impl<K, V, S> Clone for SegmentedCache<K, V, S> {
53    /// Makes a clone of this shared cache.
54    ///
55    /// This operation is cheap as it only creates thread-safe reference counted
56    /// pointers to the shared internal data structures.
57    fn clone(&self) -> Self {
58        Self {
59            inner: Arc::clone(&self.inner),
60        }
61    }
62}
63
64impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
65where
66    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
67    V: fmt::Debug + Clone + Send + Sync + 'static,
68    // TODO: Remove these bounds from S.
69    S: BuildHasher + Clone + Send + Sync + 'static,
70{
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        let mut d_map = f.debug_map();
73
74        for (k, v) in self {
75            d_map.entry(&k, &v);
76        }
77
78        d_map.finish()
79    }
80}
81
82impl<K, V> SegmentedCache<K, V, RandomState>
83where
84    K: Hash + Eq + Send + Sync + 'static,
85    V: Clone + Send + Sync + 'static,
86{
87    /// Constructs a new `SegmentedCache<K, V>` that has multiple internal
88    /// segments and will store up to the `max_capacity`.
89    ///
90    /// To adjust various configuration knobs such as `initial_capacity` or
91    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
92    ///
93    /// [builder-struct]: ./struct.CacheBuilder.html
94    ///
95    /// # Panics
96    ///
97    /// Panics if `num_segments` is 0.
98    pub fn new(max_capacity: u64, num_segments: usize) -> Self {
99        let build_hasher = RandomState::default();
100        Self::with_everything(
101            None,
102            Some(max_capacity),
103            None,
104            num_segments,
105            build_hasher,
106            None,
107            EvictionPolicy::default(),
108            None,
109            ExpirationPolicy::default(),
110            HousekeeperConfig::default(),
111            false,
112            Clock::default(),
113        )
114    }
115
116    /// Returns a [`CacheBuilder`][builder-struct], which can builds a
117    /// `SegmentedCache` with various configuration knobs.
118    ///
119    /// [builder-struct]: ./struct.CacheBuilder.html
120    pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
121        CacheBuilder::default().segments(num_segments)
122    }
123}
124
125impl<K, V, S> SegmentedCache<K, V, S> {
126    /// Returns cache’s name.
127    pub fn name(&self) -> Option<&str> {
128        self.inner.segments[0].name()
129    }
130
131    /// Returns a read-only cache policy of this cache.
132    ///
133    /// At this time, cache policy cannot be modified after cache creation.
134    /// A future version may support to modify it.
135    pub fn policy(&self) -> Policy {
136        let mut policy = self.inner.segments[0].policy();
137        policy.set_max_capacity(self.inner.desired_capacity);
138        policy.set_num_segments(self.inner.segments.len());
139        policy
140    }
141
142    /// Returns an approximate number of entries in this cache.
143    ///
144    /// The value returned is _an estimate_; the actual count may differ if there are
145    /// concurrent insertions or removals, or if some entries are pending removal due
146    /// to expiration. This inaccuracy can be mitigated by performing a `sync()`
147    /// first.
148    ///
149    /// # Example
150    ///
151    /// ```rust
152    /// use moka::sync::SegmentedCache;
153    ///
154    /// let cache = SegmentedCache::new(10, 4);
155    /// cache.insert('n', "Netherland Dwarf");
156    /// cache.insert('l', "Lop Eared");
157    /// cache.insert('d', "Dutch");
158    ///
159    /// // Ensure an entry exists.
160    /// assert!(cache.contains_key(&'n'));
161    ///
162    /// // However, followings may print stale number zeros instead of threes.
163    /// println!("{}", cache.entry_count());   // -> 0
164    /// println!("{}", cache.weighted_size()); // -> 0
165    ///
166    /// // To mitigate the inaccuracy, call `run_pending_tasks` method to run
167    /// // pending internal tasks.
168    /// cache.run_pending_tasks();
169    ///
170    /// // Followings will print the actual numbers.
171    /// println!("{}", cache.entry_count());   // -> 3
172    /// println!("{}", cache.weighted_size()); // -> 3
173    /// ```
174    ///
175    pub fn entry_count(&self) -> u64 {
176        self.inner
177            .segments
178            .iter()
179            .map(|seg| seg.entry_count())
180            .sum()
181    }
182
183    /// Returns an approximate total weighted size of entries in this cache.
184    ///
185    /// The value returned is _an estimate_; the actual size may differ if there are
186    /// concurrent insertions or removals, or if some entries are pending removal due
187    /// to expiration. This inaccuracy can be mitigated by performing a `sync()`
188    /// first. See [`entry_count`](#method.entry_count) for a sample code.
189    pub fn weighted_size(&self) -> u64 {
190        self.inner
191            .segments
192            .iter()
193            .map(|seg| seg.weighted_size())
194            .sum()
195    }
196}
197
198impl<K, V, S> SegmentedCache<K, V, S>
199where
200    K: Hash + Eq + Send + Sync + 'static,
201    V: Clone + Send + Sync + 'static,
202    S: BuildHasher + Clone + Send + Sync + 'static,
203{
204    /// # Panics
205    ///
206    /// Panics if `num_segments` is 0.
207    #[allow(clippy::too_many_arguments)]
208    pub(crate) fn with_everything(
209        name: Option<String>,
210        max_capacity: Option<u64>,
211        initial_capacity: Option<usize>,
212        num_segments: usize,
213        build_hasher: S,
214        weigher: Option<Weigher<K, V>>,
215        eviction_policy: EvictionPolicy,
216        eviction_listener: Option<EvictionListener<K, V>>,
217        expiration_policy: ExpirationPolicy<K, V>,
218        housekeeper_config: HousekeeperConfig,
219        invalidator_enabled: bool,
220        clock: Clock,
221    ) -> Self {
222        Self {
223            inner: Arc::new(Inner::new(
224                name,
225                max_capacity,
226                initial_capacity,
227                num_segments,
228                build_hasher,
229                weigher,
230                eviction_policy,
231                eviction_listener,
232                expiration_policy,
233                housekeeper_config,
234                invalidator_enabled,
235                clock,
236            )),
237        }
238    }
239
240    /// Returns `true` if the cache contains a value for the key.
241    ///
242    /// Unlike the `get` method, this method is not considered a cache read operation,
243    /// so it does not update the historic popularity estimator or reset the idle
244    /// timer for the key.
245    ///
246    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
247    /// on the borrowed form _must_ match those for the key type.
248    pub fn contains_key<Q>(&self, key: &Q) -> bool
249    where
250        K: Borrow<Q>,
251        Q: Hash + Eq + ?Sized,
252    {
253        let hash = self.inner.hash(key);
254        self.inner.select(hash).contains_key_with_hash(key, hash)
255    }
256
257    /// Returns a _clone_ of the value corresponding to the key.
258    ///
259    /// If you want to store values that will be expensive to clone, wrap them by
260    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
261    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
262    ///
263    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
264    /// on the borrowed form _must_ match those for the key type.
265    ///
266    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
267    pub fn get<Q>(&self, key: &Q) -> Option<V>
268    where
269        K: Borrow<Q>,
270        Q: Hash + Eq + ?Sized,
271    {
272        let hash = self.inner.hash(key);
273        self.inner
274            .select(hash)
275            .get_with_hash(key, hash, false)
276            .map(Entry::into_value)
277    }
278
279    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280    where
281        K: Hash + Eq,
282    {
283        let hash = self.inner.hash(&key);
284        let cache = self.inner.select(hash);
285        OwnedKeyEntrySelector::new(key, hash, cache)
286    }
287
288    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289    where
290        K: Borrow<Q>,
291        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
292    {
293        let hash = self.inner.hash(key);
294        let cache = self.inner.select(hash);
295        RefKeyEntrySelector::new(key, hash, cache)
296    }
297
298    /// TODO: Remove this in v0.13.0.
299    /// Deprecated, replaced with [`get_with`](#method.get_with)
300    #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
301    pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
302        self.get_with(key, init)
303    }
304
305    /// TODO: Remove this in v0.13.0.
306    /// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
307    #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
308    pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
309    where
310        F: FnOnce() -> Result<V, E>,
311        E: Send + Sync + 'static,
312    {
313        self.try_get_with(key, init)
314    }
315
316    /// Returns a _clone_ of the value corresponding to the key. If the value does
317    /// not exist, evaluates the `init` closure and inserts the output.
318    ///
319    /// # Concurrent calls on the same key
320    ///
321    /// This method guarantees that concurrent calls on the same not-existing key are
322    /// coalesced into one evaluation of the `init` closure. Only one of the calls
323    /// evaluates its closure, and other calls wait for that closure to complete. See
324    /// [`Cache::get_with`][get-with-method] for more details.
325    ///
326    /// [get-with-method]: ./struct.Cache.html#method.get_with
327    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
328        let hash = self.inner.hash(&key);
329        let key = Arc::new(key);
330        let replace_if = None as Option<fn(&V) -> bool>;
331        self.inner
332            .select(hash)
333            .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
334            .into_value()
335    }
336
337    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
338    /// key, you can pass a reference to the key. If the key does not exist in the
339    /// cache, the key will be cloned to create new entry in the cache.
340    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
341    where
342        K: Borrow<Q>,
343        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
344    {
345        let hash = self.inner.hash(key);
346        let replace_if = None as Option<fn(&V) -> bool>;
347        self.inner
348            .select(hash)
349            .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
350            .into_value()
351    }
352
353    /// Works like [`get_with`](#method.get_with), but takes an additional
354    /// `replace_if` closure.
355    ///
356    /// This method will evaluate the `init` closure and insert the output to the
357    /// cache when:
358    ///
359    /// - The key does not exist.
360    /// - Or, `replace_if` closure returns `true`.
361    pub fn get_with_if(
362        &self,
363        key: K,
364        init: impl FnOnce() -> V,
365        replace_if: impl FnMut(&V) -> bool,
366    ) -> V {
367        let hash = self.inner.hash(&key);
368        let key = Arc::new(key);
369        self.inner
370            .select(hash)
371            .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
372            .into_value()
373    }
374
375    /// Returns a _clone_ of the value corresponding to the key. If the value does
376    /// not exist, evaluates the `init` closure, and inserts the value if
377    /// `Some(value)` was returned. If `None` was returned from the closure, this
378    /// method does not insert a value and returns `None`.
379    ///
380    /// # Concurrent calls on the same key
381    ///
382    /// This method guarantees that concurrent calls on the same not-existing key are
383    /// coalesced into one evaluation of the `init` closure. Only one of the calls
384    /// evaluates its closure, and other calls wait for that closure to complete.
385    /// See [`Cache::optionally_get_with`][opt-get-with-method] for more details.
386    ///
387    /// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with
388    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
389    where
390        F: FnOnce() -> Option<V>,
391    {
392        let hash = self.inner.hash(&key);
393        let key = Arc::new(key);
394        self.inner
395            .select(hash)
396            .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
397            .map(Entry::into_value)
398    }
399
400    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
401    /// of passing an owned key, you can pass a reference to the key. If the key does
402    /// not exist in the cache, the key will be cloned to create new entry in the
403    /// cache.
404    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
405    where
406        F: FnOnce() -> Option<V>,
407        K: Borrow<Q>,
408        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
409    {
410        let hash = self.inner.hash(key);
411        self.inner
412            .select(hash)
413            .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
414            .map(Entry::into_value)
415    }
416
417    /// Returns a _clone_ of the value corresponding to the key. If the value does
418    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
419    /// was returned. If `Err(_)` was returned from the closure, this method does not
420    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
421    ///
422    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
423    ///
424    /// # Concurrent calls on the same key
425    ///
426    /// This method guarantees that concurrent calls on the same not-existing key are
427    /// coalesced into one evaluation of the `init` closure (as long as these
428    /// closures return the same error type). Only one of the calls evaluates its
429    /// closure, and other calls wait for that closure to complete. See
430    /// [`Cache::try_get_with`][try-get-with-method] for more details.
431    ///
432    /// [try-get-with-method]: ./struct.Cache.html#method.try_get_with
433    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
434    where
435        F: FnOnce() -> Result<V, E>,
436        E: Send + Sync + 'static,
437    {
438        let hash = self.inner.hash(&key);
439        let key = Arc::new(key);
440        self.inner
441            .select(hash)
442            .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
443            .map(Entry::into_value)
444    }
445
446    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
447    /// owned key, you can pass a reference to the key. If the key does not exist in
448    /// the cache, the key will be cloned to create new entry in the cache.
449    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
450    where
451        F: FnOnce() -> Result<V, E>,
452        E: Send + Sync + 'static,
453        K: Borrow<Q>,
454        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
455    {
456        let hash = self.inner.hash(key);
457        self.inner
458            .select(hash)
459            .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
460            .map(Entry::into_value)
461    }
462
463    /// Inserts a key-value pair into the cache.
464    ///
465    /// If the cache has this key present, the value is updated.
466    pub fn insert(&self, key: K, value: V) {
467        let hash = self.inner.hash(&key);
468        let key = Arc::new(key);
469        self.inner.select(hash).insert_with_hash(key, hash, value);
470    }
471
472    /// Discards any cached value for the key.
473    ///
474    /// If you need to get a the value that has been discarded, use the
475    /// [`remove`](#method.remove) method instead.
476    ///
477    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
478    /// on the borrowed form _must_ match those for the key type.
479    pub fn invalidate<Q>(&self, key: &Q)
480    where
481        K: Borrow<Q>,
482        Q: Hash + Eq + ?Sized,
483    {
484        let hash = self.inner.hash(key);
485        self.inner
486            .select(hash)
487            .invalidate_with_hash(key, hash, false);
488    }
489
490    /// Discards any cached value for the key and returns a clone of the value.
491    ///
492    /// If you do not need to get the value that has been discarded, use the
493    /// [`invalidate`](#method.invalidate) method instead.
494    ///
495    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
496    /// on the borrowed form _must_ match those for the key type.
497    pub fn remove<Q>(&self, key: &Q) -> Option<V>
498    where
499        K: Borrow<Q>,
500        Q: Hash + Eq + ?Sized,
501    {
502        let hash = self.inner.hash(key);
503        self.inner
504            .select(hash)
505            .invalidate_with_hash(key, hash, true)
506    }
507
508    /// Discards all cached values.
509    ///
510    /// This method returns immediately by just setting the current time as the
511    /// invalidation time. `get` and other retrieval methods are guaranteed not to
512    /// return the entries inserted before or at the invalidation time.
513    ///
514    /// The actual removal of the invalidated entries is done as a maintenance task
515    /// driven by a user thread. For more details, see
516    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
517    /// level documentation.
518    ///
519    /// Like the `invalidate` method, this method does not clear the historic
520    /// popularity estimator of keys so that it retains the client activities of
521    /// trying to retrieve an item.
522    pub fn invalidate_all(&self) {
523        for segment in self.inner.segments.iter() {
524            segment.invalidate_all();
525        }
526    }
527
528    /// Discards cached values that satisfy a predicate.
529    ///
530    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
531    /// closure is called against each cached entry inserted before or at the time
532    /// when this method was called. If the closure returns `true` that entry will be
533    /// evicted from the cache.
534    ///
535    /// This method returns immediately by not actually removing the invalidated
536    /// entries. Instead, it just sets the predicate to the cache with the time when
537    /// this method was called. The actual removal of the invalidated entries is done
538    /// as a maintenance task driven by a user thread. For more details, see
539    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
540    /// level documentation.
541    ///
542    /// Also the `get` and other retrieval methods will apply the closure to a cached
543    /// entry to determine if it should have been invalidated. Therefore, it is
544    /// guaranteed that these methods must not return invalidated values.
545    ///
546    /// Note that you must call
547    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
548    /// at the cache creation time as the cache needs to maintain additional internal
549    /// data structures to support this method. Otherwise, calling this method will
550    /// fail with a
551    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
552    ///
553    /// Like the `invalidate` method, this method does not clear the historic
554    /// popularity estimator of keys so that it retains the client activities of
555    /// trying to retrieve an item.
556    ///
557    /// [support-invalidation-closures]:
558    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
559    /// [invalidation-disabled-error]:
560    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
561    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
562    where
563        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
564    {
565        let pred = Arc::new(predicate);
566        for segment in self.inner.segments.iter() {
567            segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
568        }
569        Ok(())
570    }
571
572    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
573    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
574    /// value.
575    ///
576    /// Iterators do not block concurrent reads and writes on the cache. An entry can
577    /// be inserted to, invalidated or evicted from a cache while iterators are alive
578    /// on the same cache.
579    ///
580    /// Unlike the `get` method, visiting entries via an iterator do not update the
581    /// historic popularity estimator or reset idle timers for keys.
582    ///
583    /// # Guarantees
584    ///
585    /// In order to allow concurrent access to the cache, iterator's `next` method
586    /// does _not_ guarantee the following:
587    ///
588    /// - It does not guarantee to return a key-value pair (an entry) if its key has
589    ///   been inserted to the cache _after_ the iterator was created.
590    ///   - Such an entry may or may not be returned depending on key's hash and
591    ///     timing.
592    ///
593    /// and the `next` method guarantees the followings:
594    ///
595    /// - It guarantees not to return the same entry more than once.
596    /// - It guarantees not to return an entry if it has been removed from the cache
597    ///   after the iterator was created.
598    ///     - Note: An entry can be removed by following reasons:
599    ///         - Manually invalidated.
600    ///         - Expired (e.g. time-to-live).
601    ///         - Evicted as the cache capacity exceeded.
602    ///
603    /// # Examples
604    ///
605    /// ```rust
606    /// use moka::sync::SegmentedCache;
607    ///
608    /// let cache = SegmentedCache::new(100, 4);
609    /// cache.insert("Julia", 14);
610    ///
611    /// let mut iter = cache.iter();
612    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
613    /// assert_eq!(*k, "Julia");
614    /// assert_eq!(v, 14);
615    ///
616    /// assert!(iter.next().is_none());
617    /// ```
618    ///
619    pub fn iter(&self) -> Iter<'_, K, V> {
620        let num_cht_segments = self.inner.segments[0].num_cht_segments();
621        let segments = self
622            .inner
623            .segments
624            .iter()
625            .map(|c| c as &dyn ScanningGet<_, _>)
626            .collect::<Vec<_>>()
627            .into_boxed_slice();
628        Iter::with_multiple_cache_segments(segments, num_cht_segments)
629    }
630
631    /// Performs any pending maintenance operations needed by the cache.
632    pub fn run_pending_tasks(&self) {
633        for segment in self.inner.segments.iter() {
634            segment.run_pending_tasks();
635        }
636    }
637
638    // /// This is used by unit tests to get consistent result.
639    // #[cfg(test)]
640    // pub(crate) fn reconfigure_for_testing(&mut self) {
641    //     // Stop the housekeeping job that may cause sync() method to return earlier.
642    //     for segment in self.inner.segments.iter_mut() {
643    //         segment.reconfigure_for_testing()
644    //     }
645    // }
646}
647
648impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
649where
650    K: Hash + Eq + Send + Sync + 'static,
651    V: Clone + Send + Sync + 'static,
652    S: BuildHasher + Clone + Send + Sync + 'static,
653{
654    type Item = (Arc<K>, V);
655
656    type IntoIter = Iter<'a, K, V>;
657
658    fn into_iter(self) -> Self::IntoIter {
659        self.iter()
660    }
661}
662
// For unit tests.
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S> {
    /// Returns `true` only if every segment reports an empty waiter map.
    fn is_waiter_map_empty(&self) -> bool {
        self.inner
            .segments
            .iter()
            .all(|segment| segment.is_waiter_map_empty())
    }
}
670
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the number of invalidation predicates summed over all segments.
    fn invalidation_predicate_count(&self) -> usize {
        let mut count: usize = 0;
        for segment in self.inner.segments.iter() {
            count += segment.invalidation_predicate_count();
        }
        count
    }

    /// Calls `reconfigure_for_testing` on every segment.
    ///
    /// # Panics
    ///
    /// Panics if the cache has been cloned, because mutable access to the segments
    /// requires `self` to hold the only strong reference to the shared state.
    fn reconfigure_for_testing(&mut self) {
        let inner = Arc::get_mut(&mut self.inner)
            .expect("There are other strong reference to self.inner Arc");

        inner.segments.iter_mut().for_each(|segment| {
            segment.reconfigure_for_testing();
        });
    }

    /// Returns `true` only if every segment reports an empty key-locks map.
    fn key_locks_map_is_empty(&self) -> bool {
        // "All empty" is expressed as "none is non-empty".
        !self
            .inner
            .segments
            .iter()
            .any(|segment| !segment.key_locks_map_is_empty())
    }
}
702
// State shared (through an `Arc`) by every clone of a `SegmentedCache`.
struct Inner<K, V, S> {
    // The `max_capacity` requested for the whole cache, as passed to `new`
    // (i.e. not the per-segment share).
    desired_capacity: Option<u64>,
    // The internal caches. `new` rounds the requested count up to a power of
    // two, so the length is always a power of two and at least 1.
    segments: Box<[Cache<K, V, S>]>,
    // Builds the hashers used by `hash` to hash keys.
    build_hasher: S,
    // Number of bits a 64-bit hash is shifted right to obtain a segment index:
    // `64 - log2(segments.len())`. It is 64 when there is a single segment.
    segment_shift: u32,
}
709
710impl<K, V, S> Inner<K, V, S>
711where
712    K: Hash + Eq + Send + Sync + 'static,
713    V: Clone + Send + Sync + 'static,
714    S: BuildHasher + Clone + Send + Sync + 'static,
715{
716    /// # Panics
717    ///
718    /// Panics if `num_segments` is 0.
719    #[allow(clippy::too_many_arguments)]
720    fn new(
721        name: Option<String>,
722        max_capacity: Option<u64>,
723        initial_capacity: Option<usize>,
724        num_segments: usize,
725        build_hasher: S,
726        weigher: Option<Weigher<K, V>>,
727        eviction_policy: EvictionPolicy,
728        eviction_listener: Option<EvictionListener<K, V>>,
729        expiration_policy: ExpirationPolicy<K, V>,
730        housekeeper_config: HousekeeperConfig,
731        invalidator_enabled: bool,
732        clock: Clock,
733    ) -> Self {
734        assert!(num_segments > 0);
735
736        let actual_num_segments = num_segments.next_power_of_two();
737        let segment_shift = 64 - actual_num_segments.trailing_zeros();
738        let seg_max_capacity =
739            max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
740        let seg_init_capacity =
741            initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
742        // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]`
743        // because Cache::clone() does not clone its inner but shares the same inner.
744        let segments = (0..actual_num_segments)
745            .map(|_| {
746                Cache::with_everything(
747                    name.clone(),
748                    seg_max_capacity,
749                    seg_init_capacity,
750                    build_hasher.clone(),
751                    weigher.clone(),
752                    eviction_policy.clone(),
753                    eviction_listener.clone(),
754                    expiration_policy.clone(),
755                    housekeeper_config.clone(),
756                    invalidator_enabled,
757                    clock.clone(),
758                )
759            })
760            .collect::<Vec<_>>();
761
762        Self {
763            desired_capacity: max_capacity,
764            segments: segments.into_boxed_slice(),
765            build_hasher,
766            segment_shift,
767        }
768    }
769
770    #[inline]
771    fn hash<Q>(&self, key: &Q) -> u64
772    where
773        K: Borrow<Q>,
774        Q: Hash + Eq + ?Sized,
775    {
776        let mut hasher = self.build_hasher.build_hasher();
777        key.hash(&mut hasher);
778        hasher.finish()
779    }
780
781    #[inline]
782    fn select(&self, hash: u64) -> &Cache<K, V, S> {
783        let index = self.segment_index_from_hash(hash);
784        &self.segments[index]
785    }
786
787    #[inline]
788    fn segment_index_from_hash(&self, hash: u64) -> usize {
789        if self.segment_shift == 64 {
790            0
791        } else {
792            (hash >> self.segment_shift) as usize
793        }
794    }
795}
796
797#[cfg(test)]
798mod tests {
799    use super::SegmentedCache;
800    use crate::notification::RemovalCause;
801    use parking_lot::Mutex;
802    use std::{sync::Arc, time::Duration};
803
804    #[test]
805    fn max_capacity_zero() {
806        let mut cache = SegmentedCache::new(0, 1);
807        cache.reconfigure_for_testing();
808
809        // Make the cache exterior immutable.
810        let cache = cache;
811
812        cache.insert(0, ());
813
814        assert!(!cache.contains_key(&0));
815        assert!(cache.get(&0).is_none());
816        cache.run_pending_tasks();
817        assert!(!cache.contains_key(&0));
818        assert!(cache.get(&0).is_none());
819        assert_eq!(cache.entry_count(), 0)
820    }
821
    /// Single-threaded walk through `insert`, `get`, `contains_key`,
    /// `invalidate` and `remove` on a cache built with `builder(1)` and a max
    /// capacity of 3.
    ///
    /// Every eviction notification delivered to the listener is recorded in
    /// `actual` and compared against `expected` at the end of the test.
    #[test]
    fn basic_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that appends each notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = SegmentedCache::builder(1)
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // (frequency) counts: a -> 1, b -> 1

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        // counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // counts: a -> 2, b -> 2, c -> 1

        // The cache now holds three entries, which is its max capacity.
        // "d" should not be admitted because its frequency is too low.
        cache.insert("d", "david"); //   count: d -> 0
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        // Second attempt: "d" is expected to be rejected again.
        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        // "d" should be admitted and "c" should be evicted
        // because d's frequency is higher than c's.
        cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Explicit invalidation is expected to notify with `RemovalCause::Explicit`.
        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // Removing the already invalidated "b" returns `None`; removing "d"
        // returns its value and is expected to produce another notification.
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
907
908    #[test]
909    fn non_power_of_two_segments() {
910        let mut cache = SegmentedCache::new(100, 5);
911        cache.reconfigure_for_testing();
912
913        // Make the cache exterior immutable.
914        let cache = cache;
915
916        assert_eq!(cache.iter().count(), 0);
917
918        cache.insert("a", "alice");
919        cache.insert("b", "bob");
920        cache.insert("c", "cindy");
921
922        assert_eq!(cache.iter().count(), 3);
923        cache.run_pending_tasks();
924        assert_eq!(cache.iter().count(), 3);
925    }
926
    /// Exercises eviction when entries carry weights.
    ///
    /// The weigher returns the `u32` stored in each value tuple, and the cache
    /// is built with `builder(1)` and a max capacity of 31. Every eviction
    /// notification delivered to the listener is recorded in `actual` and
    /// compared against `expected` at the end of the test.
    #[test]
    fn size_aware_eviction() {
        // The weight of an entry is the second element of its value tuple.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        // Values as (name, weight) tuples.
        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that appends each notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = SegmentedCache::builder(1)
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();
        // order (LRU -> MRU) and counts: a -> 1, b -> 1

        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        // order and counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // order and counts: c -> 1, a -> 2, b -> 2

        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
        // "d" must have higher count than 3, which is the aggregated count
        // of "a" and "c".
        cache.insert("d", david); //   count: d -> 0
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 3
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 4

        // Finally "d" should be admitted by evicting "c" and "a".
        cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Verify the sizes: "a" (w: 10) and "b" (w: 15) remain, 25 in total.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
1049
1050    #[test]
1051    fn basic_multi_threads() {
1052        let num_threads = 4;
1053
1054        let mut cache = SegmentedCache::new(100, num_threads);
1055        cache.reconfigure_for_testing();
1056
1057        // Make the cache exterior immutable.
1058        let cache = cache;
1059
1060        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1061        #[allow(clippy::needless_collect)]
1062        let handles = (0..num_threads)
1063            .map(|id| {
1064                let cache = cache.clone();
1065                std::thread::spawn(move || {
1066                    cache.insert(10, format!("{id}-100"));
1067                    cache.get(&10);
1068                    cache.run_pending_tasks();
1069                    cache.insert(20, format!("{id}-200"));
1070                    cache.invalidate(&10);
1071                })
1072            })
1073            .collect::<Vec<_>>();
1074
1075        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1076
1077        cache.run_pending_tasks();
1078
1079        assert!(cache.get(&10).is_none());
1080        assert!(cache.get(&20).is_some());
1081        assert!(!cache.contains_key(&10));
1082        assert!(cache.contains_key(&20));
1083    }
1084
1085    #[test]
1086    fn invalidate_all() {
1087        use std::collections::HashMap;
1088
1089        // The following `HashMap`s will hold actual and expected notifications.
1090        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1091        let actual = Arc::new(Mutex::new(HashMap::new()));
1092        let mut expected = HashMap::new();
1093
1094        // Create an eviction listener.
1095        let a1 = Arc::clone(&actual);
1096        let listener = move |k, v, cause| {
1097            a1.lock().insert(k, (v, cause));
1098        };
1099
1100        // Create a cache with the eviction listener.
1101        let mut cache = SegmentedCache::builder(4)
1102            .max_capacity(100)
1103            .eviction_listener(listener)
1104            .build();
1105        cache.reconfigure_for_testing();
1106
1107        // Make the cache exterior immutable.
1108        let cache = cache;
1109
1110        cache.insert("a", "alice");
1111        cache.insert("b", "bob");
1112        cache.insert("c", "cindy");
1113        assert_eq!(cache.get(&"a"), Some("alice"));
1114        assert_eq!(cache.get(&"b"), Some("bob"));
1115        assert_eq!(cache.get(&"c"), Some("cindy"));
1116        assert!(cache.contains_key(&"a"));
1117        assert!(cache.contains_key(&"b"));
1118        assert!(cache.contains_key(&"c"));
1119
1120        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
1121        // modified timestamp of the entries were updated when they were inserted.
1122        // https://github.com/moka-rs/moka/issues/155
1123
1124        cache.invalidate_all();
1125        expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1126        expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1127        expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1128        cache.run_pending_tasks();
1129
1130        cache.insert("d", "david");
1131        cache.run_pending_tasks();
1132
1133        assert!(cache.get(&"a").is_none());
1134        assert!(cache.get(&"b").is_none());
1135        assert!(cache.get(&"c").is_none());
1136        assert_eq!(cache.get(&"d"), Some("david"));
1137        assert!(!cache.contains_key(&"a"));
1138        assert!(!cache.contains_key(&"b"));
1139        assert!(!cache.contains_key(&"c"));
1140        assert!(cache.contains_key(&"d"));
1141
1142        verify_notification_map(&cache, actual, &expected);
1143    }
1144
    /// Checks that `invalidate_entries_if` removes the entries that match the
    /// predicate and were inserted before the call, while an entry inserted
    /// after the call survives.
    ///
    /// A mock clock is advanced between the steps, and every eviction
    /// notification delivered to the listener is recorded in `actual` and
    /// compared against `expected` at the end of the test.
    ///
    /// Returns an error if `invalidate_entries_if` itself fails.
    #[test]
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
        use std::collections::{HashMap, HashSet};

        const SEGMENTS: usize = 4;

        // The following `HashMap`s will hold actual and expected notifications.
        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
        let actual = Arc::new(Mutex::new(HashMap::new()));
        let mut expected = HashMap::new();

        // Create an eviction listener that records each notification in `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            a1.lock().insert(k, (v, cause));
        };

        // `clock` is handed to the cache; `mock` is used below to advance the time.
        let (clock, mock) = crate::common::time::Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = SegmentedCache::builder(SEGMENTS)
            .max_capacity(100)
            .support_invalidation_closures()
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, "alice");
        cache.insert(1, "bob");
        cache.insert(2, "alex");
        cache.run_pending_tasks();
        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&0), Some("alice"));
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&2), Some("alex"));
        assert!(cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(cache.contains_key(&2));

        // Invalidate the entries whose value is "alice" or "alex".
        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
        // A single call results in `SEGMENTS` registered predicates
        // (presumably one per segment).
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));

        mock.increment(Duration::from_secs(5)); // 10 secs from the start.

        cache.insert(3, "alice");

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&0).is_none());
        assert!(cache.get(&2).is_none());
        assert_eq!(cache.get(&1), Some("bob"));
        // This should survive as it was inserted after calling invalidate_entries_if.
        assert_eq!(cache.get(&3), Some("alice"));

        assert!(!cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(!cache.contains_key(&2));
        assert!(cache.contains_key(&3));

        // Once the invalidation has finished, no predicate remains registered.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        mock.increment(Duration::from_secs(5)); // 15 secs from the start.

        // Register two more predicates that together match the remaining entries.
        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&1).is_none());
        assert!(cache.get(&3).is_none());

        assert!(!cache.contains_key(&1));
        assert!(!cache.contains_key(&3));

        assert_eq!(cache.entry_count(), 0);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        verify_notification_map(&cache, actual, &expected);

        Ok(())
    }
1247
1248    #[test]
1249    fn test_iter() {
1250        const NUM_KEYS: usize = 50;
1251
1252        fn make_value(key: usize) -> String {
1253            format!("val: {key}")
1254        }
1255
1256        // let cache = SegmentedCache::builder(5)
1257        let cache = SegmentedCache::builder(4)
1258            .max_capacity(100)
1259            .time_to_idle(Duration::from_secs(10))
1260            .build();
1261
1262        for key in 0..NUM_KEYS {
1263            cache.insert(key, make_value(key));
1264        }
1265
1266        let mut key_set = std::collections::HashSet::new();
1267
1268        for (key, value) in &cache {
1269            assert_eq!(value, make_value(*key));
1270
1271            key_set.insert(*key);
1272        }
1273
1274        // Ensure there are no missing or duplicate keys in the iteration.
1275        assert_eq!(key_set.len(), NUM_KEYS);
1276    }
1277
1278    /// Runs 16 threads at the same time and ensures no deadlock occurs.
1279    ///
1280    /// - Eight of the threads will update key-values in the cache.
1281    /// - Eight others will iterate the cache.
1282    ///
1283    #[test]
1284    fn test_iter_multi_threads() {
1285        use std::collections::HashSet;
1286
1287        const NUM_KEYS: usize = 1024;
1288        const NUM_THREADS: usize = 16;
1289
1290        fn make_value(key: usize) -> String {
1291            format!("val: {key}")
1292        }
1293
1294        let cache = SegmentedCache::builder(4)
1295            .max_capacity(2048)
1296            .time_to_idle(Duration::from_secs(10))
1297            .build();
1298
1299        // Initialize the cache.
1300        for key in 0..NUM_KEYS {
1301            cache.insert(key, make_value(key));
1302        }
1303
1304        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
1305        let write_lock = rw_lock.write().unwrap();
1306
1307        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1308        #[allow(clippy::needless_collect)]
1309        let handles = (0..NUM_THREADS)
1310            .map(|n| {
1311                let cache = cache.clone();
1312                let rw_lock = Arc::clone(&rw_lock);
1313
1314                if n % 2 == 0 {
1315                    // This thread will update the cache.
1316                    std::thread::spawn(move || {
1317                        let read_lock = rw_lock.read().unwrap();
1318                        for key in 0..NUM_KEYS {
1319                            // TODO: Update keys in a random order?
1320                            cache.insert(key, make_value(key));
1321                        }
1322                        std::mem::drop(read_lock);
1323                    })
1324                } else {
1325                    // This thread will iterate the cache.
1326                    std::thread::spawn(move || {
1327                        let read_lock = rw_lock.read().unwrap();
1328                        let mut key_set = HashSet::new();
1329                        for (key, value) in &cache {
1330                            assert_eq!(value, make_value(*key));
1331                            key_set.insert(*key);
1332                        }
1333                        // Ensure there are no missing or duplicate keys in the iteration.
1334                        assert_eq!(key_set.len(), NUM_KEYS);
1335                        std::mem::drop(read_lock);
1336                    })
1337                }
1338            })
1339            .collect::<Vec<_>>();
1340
1341        // Let these threads to run by releasing the write lock.
1342        std::mem::drop(write_lock);
1343
1344        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1345
1346        // Ensure there are no missing or duplicate keys in the iteration.
1347        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
1348        assert_eq!(key_set.len(), NUM_KEYS);
1349    }
1350
    /// Checks how concurrent `get_with` and `get` calls for the same key
    /// behave while one init closure is still running.
    ///
    /// The threads are staggered with sleeps (0, 100, 200 and 400 ms) around
    /// thread1's 300 ms init closure.
    #[test]
    fn get_with() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // This test will run five threads:
        //
        // Thread1 will be the first thread to call `get_with` for a key, so its init
        // closure will be evaluated and then a &str value "thread1" will be inserted
        // to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `get_with` immediately.
                let v = cache1.get_with(KEY, || {
                    // Wait for 300 ms and return a &str value.
                    sleep(Duration::from_millis(300));
                    "thread1"
                });
                assert_eq!(v, "thread1");
            })
        };

        // Thread2 will be the second thread to call `get_with` for the same key, so
        // its init closure will not be evaluated. Once thread1's init closure
        // finishes, it will get the value inserted by thread1's init closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `get_with`.
                sleep(Duration::from_millis(100));
                let v = cache2.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread3 will be the third thread to call `get_with` for the same key. By
        // the time it calls, thread1's init closure should have finished already and
        // the value should be already inserted to the cache. So its init closure
        // will not be evaluated and it will get the value inserted by thread1's init
        // closure immediately.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get_with`.
                sleep(Duration::from_millis(400));
                let v = cache3.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread4 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache4.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread5 will call `get` for the same key. It will call after thread1's init
        // closure finished, so it will get the value inserted by thread1's init closure.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400));
                let maybe_v = cache5.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // Wait for every thread; a panic in any of them fails the test here.
        for t in [thread1, thread2, thread3, thread4, thread5] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
1434
    /// Checks how concurrent `get_with_if` and `get` calls for the same key
    /// behave, including a `replace_if` closure that forces the existing value
    /// to be replaced.
    ///
    /// The threads are staggered with sleeps (0, 100, 200, 350, 400 and
    /// 450 ms) around thread1's 300 ms init closure.
    #[test]
    fn get_with_if() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // This test will run seven threads:
        //
        // Thread1 will be the first thread to call `get_with_if` for a key, so its
        // init closure will be evaluated and then a &str value "thread1" will be
        // inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `get_with_if` immediately.
                let v = cache1.get_with_if(
                    KEY,
                    || {
                        // Wait for 300 ms and return a &str value.
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                );
                assert_eq!(v, "thread1");
            })
        };

        // Thread2 will be the second thread to call `get_with_if` for the same key,
        // so its init closure will not be evaluated. Once thread1's init closure
        // finishes, it will get the value inserted by thread1's init closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `get_with_if`.
                sleep(Duration::from_millis(100));
                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread3 will be the third thread to call `get_with_if` for the same
        // key. By the time it calls, thread1's init closure should have finished
        // already and the value should be already inserted to the cache. Also
        // thread3's `replace_if` closure returns `false`. So its init closure will
        // not be evaluated and will get the value inserted by thread1's init closure
        // immediately.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `get_with_if`.
                sleep(Duration::from_millis(350));
                let v = cache3.get_with_if(
                    KEY,
                    || unreachable!(),
                    |v| {
                        assert_eq!(v, &"thread1");
                        false
                    },
                );
                assert_eq!(v, "thread1");
            })
        };

        // Thread4 will be the fourth thread to call `get_with_if` for the same
        // key. The value should have been already inserted to the cache by
        // thread1. However thread4's `replace_if` closure returns `true`. So its
        // init closure will be evaluated to replace the current value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get_with_if`.
                sleep(Duration::from_millis(400));
                let v = cache4.get_with_if(
                    KEY,
                    || "thread4",
                    |v| {
                        assert_eq!(v, &"thread1");
                        true
                    },
                );
                assert_eq!(v, "thread4");
            })
        };

        // Thread5 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache5.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread6 will call `get` for the same key. It will call after thread1's
        // init closure finished but before thread4 replaces the value, so it will
        // get the value inserted by thread1's init closure.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `get`.
                sleep(Duration::from_millis(350));
                let maybe_v = cache6.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread4
        // replaced the value, so it will get the value inserted by thread4's init
        // closure.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 450 ms before calling `get`.
                sleep(Duration::from_millis(450));
                let maybe_v = cache7.get(&KEY);
                assert_eq!(maybe_v, Some("thread4"));
            })
        };

        // Wait for every thread; a panic in any of them fails the test here.
        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
        ] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
1565
1566    #[test]
1567    fn try_get_with() {
1568        use std::{
1569            sync::Arc,
1570            thread::{sleep, spawn},
1571        };
1572
1573        #[derive(thiserror::Error, Debug)]
1574        #[error("{}", _0)]
1575        pub struct MyError(String);
1576
1577        type MyResult<T> = Result<T, Arc<MyError>>;
1578
1579        let cache = SegmentedCache::new(100, 4);
1580        const KEY: u32 = 0;
1581
1582        // This test will run eight threads:
1583        //
1584        // Thread1 will be the first thread to call `try_get_with` for a key, so its
1585        // init closure will be evaluated and then an error will be returned. Nothing
1586        // will be inserted to the cache.
1587        let thread1 = {
1588            let cache1 = cache.clone();
1589            spawn(move || {
1590                // Call `try_get_with` immediately.
1591                let v = cache1.try_get_with(KEY, || {
1592                    // Wait for 300 ms and return an error.
1593                    sleep(Duration::from_millis(300));
1594                    Err(MyError("thread1 error".into()))
1595                });
1596                assert!(v.is_err());
1597            })
1598        };
1599
1600        // Thread2 will be the second thread to call `try_get_with` for the same key,
1601        // so its init closure will not be evaluated. Once thread1's init closure
1602        // finishes, it will get the same error value returned by thread1's init
1603        // closure.
1604        let thread2 = {
1605            let cache2 = cache.clone();
1606            spawn(move || {
1607                // Wait for 100 ms before calling `try_get_with`.
1608                sleep(Duration::from_millis(100));
1609                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
1610                assert!(v.is_err());
1611            })
1612        };
1613
1614        // Thread3 will be the third thread to call `get_with` for the same key. By
1615        // the time it calls, thread1's init closure should have finished already,
1616        // but the key still does not exist in the cache. So its init closure will be
1617        // evaluated and then an okay &str value will be returned. That value will be
1618        // inserted to the cache.
1619        let thread3 = {
1620            let cache3 = cache.clone();
1621            spawn(move || {
1622                // Wait for 400 ms before calling `try_get_with`.
1623                sleep(Duration::from_millis(400));
1624                let v: MyResult<_> = cache3.try_get_with(KEY, || {
1625                    // Wait for 300 ms and return an Ok(&str) value.
1626                    sleep(Duration::from_millis(300));
1627                    Ok("thread3")
1628                });
1629                assert_eq!(v.unwrap(), "thread3");
1630            })
1631        };
1632
1633        // thread4 will be the fourth thread to call `try_get_with` for the same
1634        // key. So its init closure will not be evaluated. Once thread3's init
1635        // closure finishes, it will get the same okay &str value.
1636        let thread4 = {
1637            let cache4 = cache.clone();
1638            spawn(move || {
1639                // Wait for 500 ms before calling `try_get_with`.
1640                sleep(Duration::from_millis(500));
1641                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
1642                assert_eq!(v.unwrap(), "thread3");
1643            })
1644        };
1645
1646        // Thread5 will be the fifth thread to call `try_get_with` for the same
1647        // key. So its init closure will not be evaluated. By the time it calls,
1648        // thread3's init closure should have finished already, so its init closure
1649        // will not be evaluated and will get the value insert by thread3's init
1650        // closure immediately.
1651        let thread5 = {
1652            let cache5 = cache.clone();
1653            spawn(move || {
1654                // Wait for 800 ms before calling `try_get_with`.
1655                sleep(Duration::from_millis(800));
1656                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
1657                assert_eq!(v.unwrap(), "thread3");
1658            })
1659        };
1660
1661        // Thread6 will call `get` for the same key. It will call when thread1's init
1662        // closure is still running, so it will get none for the key.
1663        let thread6 = {
1664            let cache6 = cache.clone();
1665            spawn(move || {
1666                // Wait for 200 ms before calling `get`.
1667                sleep(Duration::from_millis(200));
1668                let maybe_v = cache6.get(&KEY);
1669                assert!(maybe_v.is_none());
1670            })
1671        };
1672
1673        // Thread7 will call `get` for the same key. It will call after thread1's init
1674        // closure finished with an error. So it will get none for the key.
1675        let thread7 = {
1676            let cache7 = cache.clone();
1677            spawn(move || {
1678                // Wait for 400 ms before calling `get`.
1679                sleep(Duration::from_millis(400));
1680                let maybe_v = cache7.get(&KEY);
1681                assert!(maybe_v.is_none());
1682            })
1683        };
1684
1685        // Thread8 will call `get` for the same key. It will call after thread3's init
1686        // closure finished, so it will get the value insert by thread3's init closure.
1687        let thread8 = {
1688            let cache8 = cache.clone();
1689            spawn(move || {
1690                // Wait for 800 ms before calling `get`.
1691                sleep(Duration::from_millis(800));
1692                let maybe_v = cache8.get(&KEY);
1693                assert_eq!(maybe_v, Some("thread3"));
1694            })
1695        };
1696
1697        for t in [
1698            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1699        ] {
1700            t.join().expect("Failed to join");
1701        }
1702
1703        assert!(cache.is_waiter_map_empty());
1704    }
1705
1706    #[test]
1707    fn optionally_get_with() {
1708        use std::thread::{sleep, spawn};
1709
1710        let cache = SegmentedCache::new(100, 4);
1711        const KEY: u32 = 0;
1712
1713        // This test will run eight threads:
1714        //
1715        // Thread1 will be the first thread to call `optionally_get_with` for a key, so its
1716        // init closure will be evaluated and then an error will be returned. Nothing
1717        // will be inserted to the cache.
1718        let thread1 = {
1719            let cache1 = cache.clone();
1720            spawn(move || {
1721                // Call `optionally_get_with` immediately.
1722                let v = cache1.optionally_get_with(KEY, || {
1723                    // Wait for 300 ms and return an error.
1724                    sleep(Duration::from_millis(300));
1725                    None
1726                });
1727                assert!(v.is_none());
1728            })
1729        };
1730
1731        // Thread2 will be the second thread to call `optionally_get_with` for the same key,
1732        // so its init closure will not be evaluated. Once thread1's init closure
1733        // finishes, it will get the same error value returned by thread1's init
1734        // closure.
1735        let thread2 = {
1736            let cache2 = cache.clone();
1737            spawn(move || {
1738                // Wait for 100 ms before calling `optionally_get_with`.
1739                sleep(Duration::from_millis(100));
1740                let v = cache2.optionally_get_with(KEY, || unreachable!());
1741                assert!(v.is_none());
1742            })
1743        };
1744
1745        // Thread3 will be the third thread to call `get_with` for the same key. By
1746        // the time it calls, thread1's init closure should have finished already,
1747        // but the key still does not exist in the cache. So its init closure will be
1748        // evaluated and then an okay &str value will be returned. That value will be
1749        // inserted to the cache.
1750        let thread3 = {
1751            let cache3 = cache.clone();
1752            spawn(move || {
1753                // Wait for 400 ms before calling `optionally_get_with`.
1754                sleep(Duration::from_millis(400));
1755                let v = cache3.optionally_get_with(KEY, || {
1756                    // Wait for 300 ms and return an Ok(&str) value.
1757                    sleep(Duration::from_millis(300));
1758                    Some("thread3")
1759                });
1760                assert_eq!(v.unwrap(), "thread3");
1761            })
1762        };
1763
1764        // thread4 will be the fourth thread to call `optionally_get_with` for the same
1765        // key. So its init closure will not be evaluated. Once thread3's init
1766        // closure finishes, it will get the same okay &str value.
1767        let thread4 = {
1768            let cache4 = cache.clone();
1769            spawn(move || {
1770                // Wait for 500 ms before calling `optionally_get_with`.
1771                sleep(Duration::from_millis(500));
1772                let v = cache4.optionally_get_with(KEY, || unreachable!());
1773                assert_eq!(v.unwrap(), "thread3");
1774            })
1775        };
1776
1777        // Thread5 will be the fifth thread to call `optionally_get_with` for the same
1778        // key. So its init closure will not be evaluated. By the time it calls,
1779        // thread3's init closure should have finished already, so its init closure
1780        // will not be evaluated and will get the value insert by thread3's init
1781        // closure immediately.
1782        let thread5 = {
1783            let cache5 = cache.clone();
1784            spawn(move || {
1785                // Wait for 800 ms before calling `optionally_get_with`.
1786                sleep(Duration::from_millis(800));
1787                let v = cache5.optionally_get_with(KEY, || unreachable!());
1788                assert_eq!(v.unwrap(), "thread3");
1789            })
1790        };
1791
1792        // Thread6 will call `get` for the same key. It will call when thread1's init
1793        // closure is still running, so it will get none for the key.
1794        let thread6 = {
1795            let cache6 = cache.clone();
1796            spawn(move || {
1797                // Wait for 200 ms before calling `get`.
1798                sleep(Duration::from_millis(200));
1799                let maybe_v = cache6.get(&KEY);
1800                assert!(maybe_v.is_none());
1801            })
1802        };
1803
1804        // Thread7 will call `get` for the same key. It will call after thread1's init
1805        // closure finished with an error. So it will get none for the key.
1806        let thread7 = {
1807            let cache7 = cache.clone();
1808            spawn(move || {
1809                // Wait for 400 ms before calling `get`.
1810                sleep(Duration::from_millis(400));
1811                let maybe_v = cache7.get(&KEY);
1812                assert!(maybe_v.is_none());
1813            })
1814        };
1815
1816        // Thread8 will call `get` for the same key. It will call after thread3's init
1817        // closure finished, so it will get the value insert by thread3's init closure.
1818        let thread8 = {
1819            let cache8 = cache.clone();
1820            spawn(move || {
1821                // Wait for 800 ms before calling `get`.
1822                sleep(Duration::from_millis(800));
1823                let maybe_v = cache8.get(&KEY);
1824                assert_eq!(maybe_v, Some("thread3"));
1825            })
1826        };
1827
1828        for t in [
1829            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1830        ] {
1831            t.join().expect("Failed to join");
1832        }
1833
1834        assert!(cache.is_waiter_map_empty());
1835    }
1836
1837    // This test ensures that the `contains_key`, `get` and `invalidate` can use
1838    // borrowed form `&[u8]` for key with type `Vec<u8>`.
1839    // https://github.com/moka-rs/moka/issues/166
1840    #[test]
1841    fn borrowed_forms_of_key() {
1842        let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
1843
1844        let key = vec![1_u8];
1845        cache.insert(key.clone(), ());
1846
1847        // key as &Vec<u8>
1848        let key_v: &Vec<u8> = &key;
1849        assert!(cache.contains_key(key_v));
1850        assert_eq!(cache.get(key_v), Some(()));
1851        cache.invalidate(key_v);
1852
1853        cache.insert(key, ());
1854
1855        // key as &[u8]
1856        let key_s: &[u8] = &[1_u8];
1857        assert!(cache.contains_key(key_s));
1858        assert_eq!(cache.get(key_s), Some(()));
1859        cache.invalidate(key_s);
1860    }
1861
1862    // Ignored by default. This test becomes unstable when run in parallel with
1863    // other tests.
1864    #[test]
1865    #[ignore]
1866    fn drop_value_immediately_after_eviction() {
1867        use crate::common::test_utils::{Counters, Value};
1868
1869        const NUM_SEGMENTS: usize = 1;
1870        const MAX_CAPACITY: u32 = 500;
1871        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
1872
1873        let counters = Arc::new(Counters::default());
1874        let counters1 = Arc::clone(&counters);
1875
1876        let listener = move |_k, _v, cause| match cause {
1877            RemovalCause::Size => counters1.incl_evicted(),
1878            RemovalCause::Explicit => counters1.incl_invalidated(),
1879            _ => (),
1880        };
1881
1882        let mut cache = SegmentedCache::builder(NUM_SEGMENTS)
1883            .max_capacity(MAX_CAPACITY as u64)
1884            .eviction_listener(listener)
1885            .build();
1886        cache.reconfigure_for_testing();
1887
1888        // Make the cache exterior immutable.
1889        let cache = cache;
1890
1891        for key in 0..KEYS {
1892            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
1893            cache.insert(key, value);
1894            counters.incl_inserted();
1895            cache.run_pending_tasks();
1896        }
1897
1898        let eviction_count = KEYS - MAX_CAPACITY;
1899
1900        cache.run_pending_tasks();
1901        assert_eq!(counters.inserted(), KEYS, "inserted");
1902        assert_eq!(counters.value_created(), KEYS, "value_created");
1903        assert_eq!(counters.evicted(), eviction_count, "evicted");
1904        assert_eq!(counters.invalidated(), 0, "invalidated");
1905        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
1906
1907        for key in 0..KEYS {
1908            cache.invalidate(&key);
1909            cache.run_pending_tasks();
1910        }
1911
1912        cache.run_pending_tasks();
1913        assert_eq!(counters.inserted(), KEYS, "inserted");
1914        assert_eq!(counters.value_created(), KEYS, "value_created");
1915        assert_eq!(counters.evicted(), eviction_count, "evicted");
1916        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
1917        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1918
1919        std::mem::drop(cache);
1920        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1921    }
1922
1923    #[test]
1924    fn test_debug_format() {
1925        let cache = SegmentedCache::new(10, 4);
1926        cache.insert('a', "alice");
1927        cache.insert('b', "bob");
1928        cache.insert('c', "cindy");
1929
1930        let debug_str = format!("{cache:?}");
1931        assert!(debug_str.starts_with('{'));
1932        assert!(debug_str.contains(r#"'a': "alice""#));
1933        assert!(debug_str.contains(r#"'b': "bob""#));
1934        assert!(debug_str.contains(r#"'c': "cindy""#));
1935        assert!(debug_str.ends_with('}'));
1936    }
1937
    // A recorded notification without its key: the value and the cause of its
    // removal. `verify_notification_map` compares maps of these keyed by `Arc<K>`.
    type NotificationPair<V> = (V, RemovalCause);
    // A recorded notification with its key: the key, the value and the cause of
    // its removal. `verify_notification_vec` compares sequences of these.
    type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1940
1941    fn verify_notification_vec<K, V, S>(
1942        cache: &SegmentedCache<K, V, S>,
1943        actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1944        expected: &[NotificationTriple<K, V>],
1945    ) where
1946        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1947        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1948        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1949    {
1950        // Retries will be needed when testing in a QEMU VM.
1951        const MAX_RETRIES: usize = 5;
1952        let mut retries = 0;
1953        loop {
1954            // Ensure all scheduled notifications have been processed.
1955            std::thread::sleep(Duration::from_millis(500));
1956
1957            let actual = &*actual.lock();
1958            if actual.len() != expected.len() {
1959                if retries <= MAX_RETRIES {
1960                    retries += 1;
1961                    cache.run_pending_tasks();
1962                    continue;
1963                } else {
1964                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1965                }
1966            }
1967
1968            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1969                assert_eq!(actual, expected, "expected[{i}]");
1970            }
1971
1972            break;
1973        }
1974    }
1975
1976    fn verify_notification_map<K, V, S>(
1977        cache: &SegmentedCache<K, V, S>,
1978        actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1979        expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1980    ) where
1981        K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1982        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1983        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1984    {
1985        // Retries will be needed when testing in a QEMU VM.
1986        const MAX_RETRIES: usize = 5;
1987        let mut retries = 0;
1988        loop {
1989            // Ensure all scheduled notifications have been processed.
1990            std::thread::sleep(Duration::from_millis(500));
1991
1992            let actual = &*actual.lock();
1993            if actual.len() != expected.len() {
1994                if retries <= MAX_RETRIES {
1995                    retries += 1;
1996                    cache.run_pending_tasks();
1997                    continue;
1998                } else {
1999                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
2000                }
2001            }
2002
2003            for actual_key in actual.keys() {
2004                assert_eq!(
2005                    actual.get(actual_key),
2006                    expected.get(actual_key),
2007                    "expected[{actual_key}]",
2008                );
2009            }
2010
2011            break;
2012        }
2013    }
2014}