1use equivalent::Equivalent;
2
3use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
4use crate::common::concurrent::Weigher;
5use crate::common::time::Clock;
6use crate::{
7 common::{
8 iter::{Iter, ScanningGet},
9 HousekeeperConfig,
10 },
11 notification::EvictionListener,
12 policy::{EvictionPolicy, ExpirationPolicy},
13 Entry, Policy, PredicateError,
14};
15
16use std::{
17 collections::hash_map::RandomState,
18 fmt,
19 hash::{BuildHasher, Hash},
20 sync::Arc,
21};
22
/// A cache whose entries are distributed over several internal `Cache`
/// segments.
///
/// The handle itself only stores a reference-counted pointer to the shared
/// state, so every clone of a `SegmentedCache` operates on the same segments.
pub struct SegmentedCache<K, V, S = RandomState> {
    /// Shared state: the segments, the hasher and the segment-selection shift.
    inner: Arc<Inner<K, V, S>>,
}
36
// SAFETY: NOTE(review): this manual impl asserts that a `SegmentedCache` may be
// moved to another thread whenever `K` and `V` are `Send + Sync` and `S` is
// `Send`. The invariant that justifies it lives in `Inner` / `Cache`, whose
// internals are not visible from this impl — confirm it still holds before
// relaxing any of these bounds.
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
44
// SAFETY: NOTE(review): this manual impl asserts that a `&SegmentedCache` may
// be shared between threads whenever `K` and `V` are `Send + Sync` and `S` is
// `Sync`. The invariant that justifies it lives in `Inner` / `Cache`, whose
// internals are not visible from this impl — confirm it still holds before
// relaxing any of these bounds.
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
52
53impl<K, V, S> Clone for SegmentedCache<K, V, S> {
54 fn clone(&self) -> Self {
59 Self {
60 inner: Arc::clone(&self.inner),
61 }
62 }
63}
64
65impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
66where
67 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
68 V: fmt::Debug + Clone + Send + Sync + 'static,
69 S: BuildHasher + Clone + Send + Sync + 'static,
71{
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 let mut d_map = f.debug_map();
74
75 for (k, v) in self {
76 d_map.entry(&k, &v);
77 }
78
79 d_map.finish()
80 }
81}
82
83impl<K, V> SegmentedCache<K, V, RandomState>
84where
85 K: Hash + Eq + Send + Sync + 'static,
86 V: Clone + Send + Sync + 'static,
87{
88 pub fn new(max_capacity: u64, num_segments: usize) -> Self {
100 let build_hasher = RandomState::default();
101 Self::with_everything(
102 None,
103 Some(max_capacity),
104 None,
105 num_segments,
106 build_hasher,
107 None,
108 EvictionPolicy::default(),
109 None,
110 ExpirationPolicy::default(),
111 HousekeeperConfig::default(),
112 false,
113 Clock::default(),
114 )
115 }
116
117 pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
122 CacheBuilder::default().segments(num_segments)
123 }
124}
125
126impl<K, V, S> SegmentedCache<K, V, S> {
127 pub fn name(&self) -> Option<&str> {
129 self.inner.segments[0].name()
130 }
131
132 pub fn policy(&self) -> Policy {
137 let mut policy = self.inner.segments[0].policy();
138 policy.set_max_capacity(self.inner.desired_capacity);
139 policy.set_num_segments(self.inner.segments.len());
140 policy
141 }
142
143 pub fn entry_count(&self) -> u64 {
177 self.inner
178 .segments
179 .iter()
180 .map(|seg| seg.entry_count())
181 .sum()
182 }
183
184 pub fn weighted_size(&self) -> u64 {
192 self.inner
193 .segments
194 .iter()
195 .map(|seg| seg.weighted_size())
196 .sum()
197 }
198}
199
200impl<K, V, S> SegmentedCache<K, V, S>
201where
202 K: Hash + Eq + Send + Sync + 'static,
203 V: Clone + Send + Sync + 'static,
204 S: BuildHasher + Clone + Send + Sync + 'static,
205{
206 #[allow(clippy::too_many_arguments)]
210 pub(crate) fn with_everything(
211 name: Option<String>,
212 max_capacity: Option<u64>,
213 initial_capacity: Option<usize>,
214 num_segments: usize,
215 build_hasher: S,
216 weigher: Option<Weigher<K, V>>,
217 eviction_policy: EvictionPolicy,
218 eviction_listener: Option<EvictionListener<K, V>>,
219 expiration_policy: ExpirationPolicy<K, V>,
220 housekeeper_config: HousekeeperConfig,
221 invalidator_enabled: bool,
222 clock: Clock,
223 ) -> Self {
224 Self {
225 inner: Arc::new(Inner::new(
226 name,
227 max_capacity,
228 initial_capacity,
229 num_segments,
230 build_hasher,
231 weigher,
232 eviction_policy,
233 eviction_listener,
234 expiration_policy,
235 housekeeper_config,
236 invalidator_enabled,
237 clock,
238 )),
239 }
240 }
241
242 pub fn contains_key<Q>(&self, key: &Q) -> bool
251 where
252 Q: Equivalent<K> + Hash + ?Sized,
253 {
254 let hash = self.inner.hash(key);
255 self.inner.select(hash).contains_key_with_hash(key, hash)
256 }
257
258 pub fn get<Q>(&self, key: &Q) -> Option<V>
269 where
270 Q: Equivalent<K> + Hash + Eq + ?Sized,
271 {
272 let hash = self.inner.hash(key);
273 self.inner
274 .select(hash)
275 .get_with_hash(key, hash, false)
276 .map(Entry::into_value)
277 }
278
279 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280 where
281 K: Hash + Eq,
282 {
283 let hash = self.inner.hash(&key);
284 let cache = self.inner.select(hash);
285 OwnedKeyEntrySelector::new(key, hash, cache)
286 }
287
288 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289 where
290 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
291 {
292 let hash = self.inner.hash(key);
293 let cache = self.inner.select(hash);
294 RefKeyEntrySelector::new(key, hash, cache)
295 }
296
297 #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
300 pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
301 self.get_with(key, init)
302 }
303
304 #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
307 pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
308 where
309 F: FnOnce() -> Result<V, E>,
310 E: Send + Sync + 'static,
311 {
312 self.try_get_with(key, init)
313 }
314
315 pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
327 let hash = self.inner.hash(&key);
328 let key = Arc::new(key);
329 let replace_if = None as Option<fn(&V) -> bool>;
330 self.inner
331 .select(hash)
332 .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
333 .into_value()
334 }
335
336 pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
340 where
341 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
342 {
343 let hash = self.inner.hash(key);
344 let replace_if = None as Option<fn(&V) -> bool>;
345 self.inner
346 .select(hash)
347 .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
348 .into_value()
349 }
350
351 pub fn get_with_if(
360 &self,
361 key: K,
362 init: impl FnOnce() -> V,
363 replace_if: impl FnMut(&V) -> bool,
364 ) -> V {
365 let hash = self.inner.hash(&key);
366 let key = Arc::new(key);
367 self.inner
368 .select(hash)
369 .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
370 .into_value()
371 }
372
373 pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
387 where
388 F: FnOnce() -> Option<V>,
389 {
390 let hash = self.inner.hash(&key);
391 let key = Arc::new(key);
392 self.inner
393 .select(hash)
394 .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
395 .map(Entry::into_value)
396 }
397
398 pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
403 where
404 F: FnOnce() -> Option<V>,
405 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
406 {
407 let hash = self.inner.hash(key);
408 self.inner
409 .select(hash)
410 .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
411 .map(Entry::into_value)
412 }
413
414 pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
431 where
432 F: FnOnce() -> Result<V, E>,
433 E: Send + Sync + 'static,
434 {
435 let hash = self.inner.hash(&key);
436 let key = Arc::new(key);
437 self.inner
438 .select(hash)
439 .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
440 .map(Entry::into_value)
441 }
442
443 pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
447 where
448 F: FnOnce() -> Result<V, E>,
449 E: Send + Sync + 'static,
450 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
451 {
452 let hash = self.inner.hash(key);
453 self.inner
454 .select(hash)
455 .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
456 .map(Entry::into_value)
457 }
458
459 pub fn insert(&self, key: K, value: V) {
463 let hash = self.inner.hash(&key);
464 let key = Arc::new(key);
465 self.inner.select(hash).insert_with_hash(key, hash, value);
466 }
467
468 pub fn invalidate<Q>(&self, key: &Q)
476 where
477 Q: Equivalent<K> + Hash + ?Sized,
478 {
479 let hash = self.inner.hash(key);
480 self.inner
481 .select(hash)
482 .invalidate_with_hash(key, hash, false);
483 }
484
485 pub fn remove<Q>(&self, key: &Q) -> Option<V>
493 where
494 Q: Equivalent<K> + Hash + ?Sized,
495 {
496 let hash = self.inner.hash(key);
497 self.inner
498 .select(hash)
499 .invalidate_with_hash(key, hash, true)
500 }
501
502 pub fn invalidate_all(&self) {
517 for segment in self.inner.segments.iter() {
518 segment.invalidate_all();
519 }
520 }
521
522 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
556 where
557 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
558 {
559 let pred = Arc::new(predicate);
560 for segment in self.inner.segments.iter() {
561 segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
562 }
563 Ok(())
564 }
565
566 pub fn iter(&self) -> Iter<'_, K, V> {
614 let num_cht_segments = self.inner.segments[0].num_cht_segments();
615 let segments = self
616 .inner
617 .segments
618 .iter()
619 .map(|c| c as &dyn ScanningGet<_, _>)
620 .collect::<Vec<_>>()
621 .into_boxed_slice();
622 Iter::with_multiple_cache_segments(segments, num_cht_segments)
623 }
624
625 pub fn run_pending_tasks(&self) {
627 for segment in self.inner.segments.iter() {
628 segment.run_pending_tasks();
629 }
630 }
631}
632
633impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
634where
635 K: Hash + Eq + Send + Sync + 'static,
636 V: Clone + Send + Sync + 'static,
637 S: BuildHasher + Clone + Send + Sync + 'static,
638{
639 type Item = (Arc<K>, V);
640
641 type IntoIter = Iter<'a, K, V>;
642
643 fn into_iter(self) -> Self::IntoIter {
644 self.iter()
645 }
646}
647
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S> {
    /// Test helper: `true` only when every segment reports an empty waiter
    /// map.
    fn is_waiter_map_empty(&self) -> bool {
        self.inner
            .segments
            .iter()
            .all(|segment| segment.is_waiter_map_empty())
    }
}
655
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Test helper: total number of invalidation predicates currently
    /// registered across all segments.
    fn invalidation_predicate_count(&self) -> usize {
        self.inner
            .segments
            .iter()
            .fold(0, |count, segment| count + segment.invalidation_predicate_count())
    }

    /// Test helper: calls `reconfigure_for_testing` on every segment.
    ///
    /// # Panics
    ///
    /// Panics if this handle is not the only strong reference to the shared
    /// state, i.e. if the cache has already been cloned.
    fn reconfigure_for_testing(&mut self) {
        let exclusive = Arc::get_mut(&mut self.inner)
            .expect("There are other strong reference to self.inner Arc");

        exclusive.segments.iter_mut().for_each(|segment| {
            segment.reconfigure_for_testing();
        });
    }

    /// Test helper: `true` only when no segment has a non-empty key-locks map.
    fn key_locks_map_is_empty(&self) -> bool {
        !self
            .inner
            .segments
            .iter()
            .any(|segment| !segment.key_locks_map_is_empty())
    }
}
687
/// State shared by every clone of a `SegmentedCache`.
struct Inner<K, V, S> {
    /// Total capacity requested by the caller, before it was divided among
    /// the segments; reported back through `SegmentedCache::policy`.
    desired_capacity: Option<u64>,
    /// One independent `Cache` per segment.
    segments: Box<[Cache<K, V, S>]>,
    /// Hasher used to compute the 64-bit key hash that selects a segment.
    build_hasher: S,
    /// Right shift applied to a key hash to obtain its segment index.
    segment_shift: u32,
}
694
695impl<K, V, S> Inner<K, V, S>
696where
697 K: Hash + Eq + Send + Sync + 'static,
698 V: Clone + Send + Sync + 'static,
699 S: BuildHasher + Clone + Send + Sync + 'static,
700{
701 #[allow(clippy::too_many_arguments)]
705 fn new(
706 name: Option<String>,
707 max_capacity: Option<u64>,
708 initial_capacity: Option<usize>,
709 num_segments: usize,
710 build_hasher: S,
711 weigher: Option<Weigher<K, V>>,
712 eviction_policy: EvictionPolicy,
713 eviction_listener: Option<EvictionListener<K, V>>,
714 expiration_policy: ExpirationPolicy<K, V>,
715 housekeeper_config: HousekeeperConfig,
716 invalidator_enabled: bool,
717 clock: Clock,
718 ) -> Self {
719 assert!(num_segments > 0);
720
721 let actual_num_segments = num_segments.next_power_of_two();
722 let segment_shift = 64 - actual_num_segments.trailing_zeros();
723 let seg_max_capacity =
724 max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
725 let seg_init_capacity =
726 initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
727 let segments = (0..actual_num_segments)
730 .map(|_| {
731 Cache::with_everything(
732 name.clone(),
733 seg_max_capacity,
734 seg_init_capacity,
735 build_hasher.clone(),
736 weigher.clone(),
737 eviction_policy.clone(),
738 eviction_listener.clone(),
739 expiration_policy.clone(),
740 housekeeper_config.clone(),
741 invalidator_enabled,
742 clock.clone(),
743 )
744 })
745 .collect::<Vec<_>>();
746
747 Self {
748 desired_capacity: max_capacity,
749 segments: segments.into_boxed_slice(),
750 build_hasher,
751 segment_shift,
752 }
753 }
754
755 #[inline]
756 fn hash<Q>(&self, key: &Q) -> u64
757 where
758 Q: Equivalent<K> + Hash + ?Sized,
759 {
760 self.build_hasher.hash_one(key)
761 }
762
763 #[inline]
764 fn select(&self, hash: u64) -> &Cache<K, V, S> {
765 let index = self.segment_index_from_hash(hash);
766 &self.segments[index]
767 }
768
769 #[inline]
770 fn segment_index_from_hash(&self, hash: u64) -> usize {
771 if self.segment_shift == 64 {
772 0
773 } else {
774 (hash >> self.segment_shift) as usize
775 }
776 }
777}
778
779#[cfg(test)]
780mod tests {
781 use super::SegmentedCache;
782 use crate::notification::RemovalCause;
783 use parking_lot::Mutex;
784 use std::{error::Error, fmt::Display, sync::Arc, time::Duration};
785
786 #[test]
787 fn max_capacity_zero() {
788 let mut cache = SegmentedCache::new(0, 1);
789 cache.reconfigure_for_testing();
790
791 let cache = cache;
793
794 cache.insert(0, ());
795
796 assert!(!cache.contains_key(&0));
797 assert!(cache.get(&0).is_none());
798 cache.run_pending_tasks();
799 assert!(!cache.contains_key(&0));
800 assert!(cache.get(&0).is_none());
801 assert_eq!(cache.entry_count(), 0)
802 }
803
    // Single-threaded scenario on a one-segment cache with capacity 3: checks
    // which inserts are admitted or rejected, and that the eviction listener
    // receives exactly the notifications collected in `expected`.
    #[test]
    fn basic_single_thread() {
        // Notifications recorded by the listener, and the ones the test
        // expects to see, in order.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // The listener appends every notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let mut cache = SegmentedCache::builder(1)
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Rebind as immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // A third entry still fits into the capacity of 3.
        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();

        // Read "a" and "b" once more before trying to add a fourth key.
        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // First attempt to insert "d": the test expects it to be rejected
        // with a `Size` notification.
        cache.insert("d", "david"); expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));

        // Second attempt: expected to be rejected again.
        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        // Third attempt ("dennis"): expected to be admitted, with "c" evicted
        // in its place.
        assert_eq!(cache.get(&"d"), None); cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Explicit invalidation produces an `Explicit` notification.
        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // `remove` returns `None` for a missing key and the value for a
        // present one.
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
889
890 #[test]
891 fn non_power_of_two_segments() {
892 let mut cache = SegmentedCache::new(100, 5);
893 cache.reconfigure_for_testing();
894
895 let cache = cache;
897
898 assert_eq!(cache.iter().count(), 0);
899
900 cache.insert("a", "alice");
901 cache.insert("b", "bob");
902 cache.insert("c", "cindy");
903
904 assert_eq!(cache.iter().count(), 3);
905 cache.run_pending_tasks();
906 assert_eq!(cache.iter().count(), 3);
907 }
908
    // Same kind of scenario as `basic_single_thread`, but with a weigher: the
    // one-segment cache has a weighted capacity of 31 and each value carries
    // its own weight in the second tuple field.
    #[test]
    fn size_aware_eviction() {
        // The weight of an entry is the `u32` stored in its value.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // Notifications recorded by the listener, and the ones the test
        // expects to see, in order.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // The listener appends every notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let mut cache = SegmentedCache::builder(1)
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Rebind as immutable.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();
        // 10 + 15 + 5 = 30, which still fits into 31.
        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();

        // Read "a" and "b" once more before trying to add "d".
        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // Attempts 1-4 to insert "d" (weight 15): the test expects each of
        // them to be rejected with a `Size` notification.
        cache.insert("d", david); expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        // Fifth attempt ("dennis"): expected to be admitted, with "c" and "a"
        // evicted to make room.
        assert_eq!(cache.get(&"d"), None); cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Replacing "b" with a heavier value (15 -> 20): the test expects a
        // `Replaced` notification for bob and "d" to be evicted.
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" and shrink "b" back (20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Two entries remain, weighing 10 + 15 = 25 in total.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
1031
1032 #[test]
1033 fn basic_multi_threads() {
1034 let num_threads = 4;
1035
1036 let mut cache = SegmentedCache::new(100, num_threads);
1037 cache.reconfigure_for_testing();
1038
1039 let cache = cache;
1041
1042 #[allow(clippy::needless_collect)]
1044 let handles = (0..num_threads)
1045 .map(|id| {
1046 let cache = cache.clone();
1047 std::thread::spawn(move || {
1048 cache.insert(10, format!("{id}-100"));
1049 cache.get(&10);
1050 cache.run_pending_tasks();
1051 cache.insert(20, format!("{id}-200"));
1052 cache.invalidate(&10);
1053 })
1054 })
1055 .collect::<Vec<_>>();
1056
1057 handles.into_iter().for_each(|h| h.join().expect("Failed"));
1058
1059 cache.run_pending_tasks();
1060
1061 assert!(cache.get(&10).is_none());
1062 assert!(cache.get(&20).is_some());
1063 assert!(!cache.contains_key(&10));
1064 assert!(cache.contains_key(&20));
1065 }
1066
1067 #[test]
1068 fn invalidate_all() {
1069 use std::collections::HashMap;
1070
1071 let actual = Arc::new(Mutex::new(HashMap::new()));
1074 let mut expected = HashMap::new();
1075
1076 let a1 = Arc::clone(&actual);
1078 let listener = move |k, v, cause| {
1079 a1.lock().insert(k, (v, cause));
1080 };
1081
1082 let mut cache = SegmentedCache::builder(4)
1084 .max_capacity(100)
1085 .eviction_listener(listener)
1086 .build();
1087 cache.reconfigure_for_testing();
1088
1089 let cache = cache;
1091
1092 cache.insert("a", "alice");
1093 cache.insert("b", "bob");
1094 cache.insert("c", "cindy");
1095 assert_eq!(cache.get(&"a"), Some("alice"));
1096 assert_eq!(cache.get(&"b"), Some("bob"));
1097 assert_eq!(cache.get(&"c"), Some("cindy"));
1098 assert!(cache.contains_key(&"a"));
1099 assert!(cache.contains_key(&"b"));
1100 assert!(cache.contains_key(&"c"));
1101
1102 cache.invalidate_all();
1107 expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1108 expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1109 expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1110 cache.run_pending_tasks();
1111
1112 cache.insert("d", "david");
1113 cache.run_pending_tasks();
1114
1115 assert!(cache.get(&"a").is_none());
1116 assert!(cache.get(&"b").is_none());
1117 assert!(cache.get(&"c").is_none());
1118 assert_eq!(cache.get(&"d"), Some("david"));
1119 assert!(!cache.contains_key(&"a"));
1120 assert!(!cache.contains_key(&"b"));
1121 assert!(!cache.contains_key(&"c"));
1122 assert!(cache.contains_key(&"d"));
1123
1124 verify_notification_map(&cache, actual, &expected);
1125 }
1126
    // Exercises predicate-based invalidation on a four-segment cache driven
    // by a mock clock: predicates are registered once per segment, matching
    // entries disappear with `Explicit` notifications, and the predicates are
    // dropped once they have been processed.
    #[test]
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
        use std::collections::{HashMap, HashSet};

        const SEGMENTS: usize = 4;

        // Notifications keyed by entry key, recorded vs. expected.
        let actual = Arc::new(Mutex::new(HashMap::new()));
        let mut expected = HashMap::new();

        // The listener stores every notification in `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            a1.lock().insert(k, (v, cause));
        };

        // `mock` lets the test advance the cache's clock manually.
        let (clock, mock) = crate::common::time::Clock::mock();

        let mut cache = SegmentedCache::builder(SEGMENTS)
            .max_capacity(100)
            .support_invalidation_closures()
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Rebind as immutable.
        let cache = cache;

        cache.insert(0, "alice");
        cache.insert(1, "bob");
        cache.insert(2, "alex");
        cache.run_pending_tasks();
        // Advance the mock clock by 5 s before reading the entries back.
        mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();

        assert_eq!(cache.get(&0), Some("alice"));
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&2), Some("alex"));
        assert!(cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(cache.contains_key(&2));

        // Invalidate every entry whose value is "alice" or "alex"; the
        // predicate is registered with each of the segments.
        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));

        // Advance the clock, then insert another "alice" under key 3; the
        // test expects this later entry to survive the predicate above.
        mock.increment(Duration::from_secs(5)); cache.insert(3, "alice");

        // Run housekeeping twice with real-time pauses in between.
        // NOTE(review): presumably this gives the invalidation work time to
        // complete — confirm against the invalidator implementation.
        cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&0).is_none());
        assert!(cache.get(&2).is_none());
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&3), Some("alice"));

        assert!(!cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(!cache.contains_key(&2));
        assert!(cache.contains_key(&3));

        // Only keys 1 and 3 remain, and the predicate has been dropped.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        // Register two more predicates; each is registered once per segment.
        mock.increment(Duration::from_secs(5)); cache.invalidate_entries_if(|_k, &v| v == "alice")?;
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));

        // Same housekeeping-and-pause sequence as above.
        cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&1).is_none());
        assert!(cache.get(&3).is_none());

        assert!(!cache.contains_key(&1));
        assert!(!cache.contains_key(&3));

        // The cache is empty and no predicate is left.
        assert_eq!(cache.entry_count(), 0);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        verify_notification_map(&cache, actual, &expected);

        Ok(())
    }
1229
1230 #[test]
1231 fn test_iter() {
1232 const NUM_KEYS: usize = 50;
1233
1234 fn make_value(key: usize) -> String {
1235 format!("val: {key}")
1236 }
1237
1238 let cache = SegmentedCache::builder(4)
1240 .max_capacity(100)
1241 .time_to_idle(Duration::from_secs(10))
1242 .build();
1243
1244 for key in 0..NUM_KEYS {
1245 cache.insert(key, make_value(key));
1246 }
1247
1248 let mut key_set = std::collections::HashSet::new();
1249
1250 for (key, value) in &cache {
1251 assert_eq!(value, make_value(*key));
1252
1253 key_set.insert(*key);
1254 }
1255
1256 assert_eq!(key_set.len(), NUM_KEYS);
1258 }
1259
    // Runs writer threads (re-inserting every key) and reader threads
    // (iterating the whole cache) at the same time, and checks that every
    // reader still sees all keys with the right values.
    #[test]
    fn test_iter_multi_threads() {
        use std::collections::HashSet;

        const NUM_KEYS: usize = 1024;
        const NUM_THREADS: usize = 16;

        fn make_value(key: usize) -> String {
            format!("val: {key}")
        }

        let cache = SegmentedCache::builder(4)
            .max_capacity(2048)
            .time_to_idle(Duration::from_secs(10))
            .build();

        // Pre-populate the cache so that readers always have all keys to find.
        for key in 0..NUM_KEYS {
            cache.insert(key, make_value(key));
        }

        // Start gate: the main thread holds the write lock while spawning, so
        // no worker can take its read lock until the write lock is dropped.
        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
        let write_lock = rw_lock.write().unwrap();

        // All threads must be spawned before any is joined, hence the
        // intermediate `Vec`.
        #[allow(clippy::needless_collect)]
        let handles = (0..NUM_THREADS)
            .map(|n| {
                let cache = cache.clone();
                let rw_lock = Arc::clone(&rw_lock);

                if n % 2 == 0 {
                    // Even threads: writers that re-insert every key.
                    std::thread::spawn(move || {
                        let read_lock = rw_lock.read().unwrap();
                        for key in 0..NUM_KEYS {
                            cache.insert(key, make_value(key));
                        }
                        std::mem::drop(read_lock);
                    })
                } else {
                    // Odd threads: readers that iterate the whole cache.
                    std::thread::spawn(move || {
                        let read_lock = rw_lock.read().unwrap();
                        let mut key_set = HashSet::new();
                        for (key, value) in &cache {
                            assert_eq!(value, make_value(*key));
                            key_set.insert(*key);
                        }
                        assert_eq!(key_set.len(), NUM_KEYS);
                        std::mem::drop(read_lock);
                    })
                }
            })
            .collect::<Vec<_>>();

        // Open the gate: all workers may now proceed.
        std::mem::drop(write_lock);

        handles.into_iter().for_each(|h| h.join().expect("Failed"));

        // After all threads have finished, every key must still be present.
        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
        assert_eq!(key_set.len(), NUM_KEYS);
    }
1332
    // Timing-based check of `get_with` when several threads ask for the same
    // key: only thread1's init closure may run, and every other `get_with`
    // caller must observe its value. The sleeps stagger the threads relative
    // to thread1's 300 ms init.
    #[test]
    fn get_with() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // thread1 calls `get_with` immediately; its init closure takes 300 ms
        // and produces "thread1".
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    "thread1"
                });
                assert_eq!(v, "thread1");
            })
        };

        // thread2 calls `get_with` at ~100 ms, while thread1's init is still
        // running. Its own init must never run and it must get "thread1".
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v = cache2.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // thread3 calls `get_with` at ~400 ms, after thread1's init has
        // finished. Its own init must never run and it must get "thread1".
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v = cache3.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // thread4 calls plain `get` at ~200 ms, while thread1's init is still
        // running; it must see no value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache4.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // thread5 calls plain `get` at ~400 ms and must see "thread1".
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let maybe_v = cache5.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        for t in [thread1, thread2, thread3, thread4, thread5] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once every thread has finished.
        assert!(cache.is_waiter_map_empty());
    }
1416
    // Timing-based check of `get_with_if` when several threads ask for the
    // same key. thread1's init (300 ms) provides the first value; later
    // callers either keep it (`replace_if` returns `false`) or replace it
    // (`replace_if` returns `true`).
    #[test]
    fn get_with_if() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // thread1 calls `get_with_if` immediately; its init takes 300 ms and
        // produces "thread1". Its `replace_if` must never be called.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.get_with_if(
                    KEY,
                    || {
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                );
                assert_eq!(v, "thread1");
            })
        };

        // thread2 calls at ~100 ms, while thread1's init is still running.
        // Neither of its closures may run and it must get "thread1".
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // thread3 calls at ~350 ms, after thread1's init has finished. Its
        // `replace_if` sees "thread1" and returns `false`, so its init must
        // not run and the value stays "thread1".
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                let v = cache3.get_with_if(
                    KEY,
                    || unreachable!(),
                    |v| {
                        assert_eq!(v, &"thread1");
                        false
                    },
                );
                assert_eq!(v, "thread1");
            })
        };

        // thread4 calls at ~400 ms. Its `replace_if` sees "thread1" and
        // returns `true`, so its init runs and the value becomes "thread4".
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v = cache4.get_with_if(
                    KEY,
                    || "thread4",
                    |v| {
                        assert_eq!(v, &"thread1");
                        true
                    },
                );
                assert_eq!(v, "thread4");
            })
        };

        // thread5 calls plain `get` at ~200 ms, while thread1's init is still
        // running; it must see no value.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache5.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // thread6 calls plain `get` at ~350 ms and must see "thread1".
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                let maybe_v = cache6.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // thread7 calls plain `get` at ~450 ms, after thread4's replacement,
        // and must see "thread4".
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(450));
                let maybe_v = cache7.get(&KEY);
                assert_eq!(maybe_v, Some("thread4"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
        ] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once every thread has finished.
        assert!(cache.is_waiter_map_empty());
    }
1547
    // Timing-based check of `try_get_with` when several threads ask for the
    // same key. thread1's init fails after 300 ms, so nothing is stored;
    // thread3's init later succeeds after another 300 ms and provides the
    // value every later caller observes.
    #[test]
    fn try_get_with() {
        use std::{
            sync::Arc,
            thread::{sleep, spawn},
        };

        // Simple error type returned by the failing init closure.
        #[derive(Debug)]
        pub struct MyError(String);

        impl Display for MyError {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", self.0)
            }
        }

        impl Error for MyError {}

        // `try_get_with` hands errors back wrapped in an `Arc`.
        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // thread1 calls `try_get_with` immediately; its init takes 300 ms and
        // then fails.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(v.is_err());
            })
        };

        // thread2 calls at ~100 ms, while thread1's init is still running.
        // Its own init must never run and it must receive an error as well.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
                assert!(v.is_err());
            })
        };

        // thread3 calls at ~400 ms, after thread1 has failed. Its init runs,
        // takes 300 ms and succeeds with "thread3".
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v: MyResult<_> = cache3.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // thread4 calls at ~500 ms, while thread3's init is still running.
        // Its own init must never run and it must get "thread3".
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // thread5 calls at ~800 ms, after thread3's init has finished. Its
        // own init must never run and it must get "thread3".
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // thread6 calls plain `get` at ~200 ms, while thread1's init is still
        // running; it must see no value.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // thread7 calls plain `get` at ~400 ms, after thread1 has failed and
        // before thread3 has succeeded; it must still see no value.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // thread8 calls plain `get` at ~800 ms and must see "thread3".
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(&KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once every thread has finished.
        assert!(cache.is_waiter_map_empty());
    }
1694
1695 #[test]
1696 fn optionally_get_with() {
1697 use std::thread::{sleep, spawn};
1698
1699 let cache = SegmentedCache::new(100, 4);
1700 const KEY: u32 = 0;
1701
1702 let thread1 = {
1708 let cache1 = cache.clone();
1709 spawn(move || {
1710 let v = cache1.optionally_get_with(KEY, || {
1712 sleep(Duration::from_millis(300));
1714 None
1715 });
1716 assert!(v.is_none());
1717 })
1718 };
1719
1720 let thread2 = {
1725 let cache2 = cache.clone();
1726 spawn(move || {
1727 sleep(Duration::from_millis(100));
1729 let v = cache2.optionally_get_with(KEY, || unreachable!());
1730 assert!(v.is_none());
1731 })
1732 };
1733
1734 let thread3 = {
1740 let cache3 = cache.clone();
1741 spawn(move || {
1742 sleep(Duration::from_millis(400));
1744 let v = cache3.optionally_get_with(KEY, || {
1745 sleep(Duration::from_millis(300));
1747 Some("thread3")
1748 });
1749 assert_eq!(v.unwrap(), "thread3");
1750 })
1751 };
1752
1753 let thread4 = {
1757 let cache4 = cache.clone();
1758 spawn(move || {
1759 sleep(Duration::from_millis(500));
1761 let v = cache4.optionally_get_with(KEY, || unreachable!());
1762 assert_eq!(v.unwrap(), "thread3");
1763 })
1764 };
1765
1766 let thread5 = {
1772 let cache5 = cache.clone();
1773 spawn(move || {
1774 sleep(Duration::from_millis(800));
1776 let v = cache5.optionally_get_with(KEY, || unreachable!());
1777 assert_eq!(v.unwrap(), "thread3");
1778 })
1779 };
1780
1781 let thread6 = {
1784 let cache6 = cache.clone();
1785 spawn(move || {
1786 sleep(Duration::from_millis(200));
1788 let maybe_v = cache6.get(&KEY);
1789 assert!(maybe_v.is_none());
1790 })
1791 };
1792
1793 let thread7 = {
1796 let cache7 = cache.clone();
1797 spawn(move || {
1798 sleep(Duration::from_millis(400));
1800 let maybe_v = cache7.get(&KEY);
1801 assert!(maybe_v.is_none());
1802 })
1803 };
1804
1805 let thread8 = {
1808 let cache8 = cache.clone();
1809 spawn(move || {
1810 sleep(Duration::from_millis(800));
1812 let maybe_v = cache8.get(&KEY);
1813 assert_eq!(maybe_v, Some("thread3"));
1814 })
1815 };
1816
1817 for t in [
1818 thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1819 ] {
1820 t.join().expect("Failed to join");
1821 }
1822
1823 assert!(cache.is_waiter_map_empty());
1824 }
1825
1826 #[test]
1830 fn borrowed_forms_of_key() {
1831 let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
1832
1833 let key = vec![1_u8];
1834 cache.insert(key.clone(), ());
1835
1836 let key_v: &Vec<u8> = &key;
1838 assert!(cache.contains_key(key_v));
1839 assert_eq!(cache.get(key_v), Some(()));
1840 cache.invalidate(key_v);
1841
1842 cache.insert(key, ());
1843
1844 let key_s: &[u8] = &[1_u8];
1846 assert!(cache.contains_key(key_s));
1847 assert_eq!(cache.get(key_s), Some(()));
1848 cache.invalidate(key_s);
1849 }
1850
1851 #[test]
1854 #[ignore]
1855 fn drop_value_immediately_after_eviction() {
1856 use crate::common::test_utils::{Counters, Value};
1857
1858 const NUM_SEGMENTS: usize = 1;
1859 const MAX_CAPACITY: u32 = 500;
1860 const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
1861
1862 let counters = Arc::new(Counters::default());
1863 let counters1 = Arc::clone(&counters);
1864
1865 let listener = move |_k, _v, cause| match cause {
1866 RemovalCause::Size => counters1.incl_evicted(),
1867 RemovalCause::Explicit => counters1.incl_invalidated(),
1868 _ => (),
1869 };
1870
1871 let mut cache = SegmentedCache::builder(NUM_SEGMENTS)
1872 .max_capacity(MAX_CAPACITY as u64)
1873 .eviction_listener(listener)
1874 .build();
1875 cache.reconfigure_for_testing();
1876
1877 let cache = cache;
1879
1880 for key in 0..KEYS {
1881 let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
1882 cache.insert(key, value);
1883 counters.incl_inserted();
1884 cache.run_pending_tasks();
1885 }
1886
1887 let eviction_count = KEYS - MAX_CAPACITY;
1888
1889 cache.run_pending_tasks();
1890 assert_eq!(counters.inserted(), KEYS, "inserted");
1891 assert_eq!(counters.value_created(), KEYS, "value_created");
1892 assert_eq!(counters.evicted(), eviction_count, "evicted");
1893 assert_eq!(counters.invalidated(), 0, "invalidated");
1894 assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
1895
1896 for key in 0..KEYS {
1897 cache.invalidate(&key);
1898 cache.run_pending_tasks();
1899 }
1900
1901 cache.run_pending_tasks();
1902 assert_eq!(counters.inserted(), KEYS, "inserted");
1903 assert_eq!(counters.value_created(), KEYS, "value_created");
1904 assert_eq!(counters.evicted(), eviction_count, "evicted");
1905 assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
1906 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1907
1908 std::mem::drop(cache);
1909 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1910 }
1911
1912 #[test]
1913 fn test_debug_format() {
1914 let cache = SegmentedCache::new(10, 4);
1915 cache.insert('a', "alice");
1916 cache.insert('b', "bob");
1917 cache.insert('c', "cindy");
1918
1919 let debug_str = format!("{cache:?}");
1920 assert!(debug_str.starts_with('{'));
1921 assert!(debug_str.contains(r#"'a': "alice""#));
1922 assert!(debug_str.contains(r#"'b': "bob""#));
1923 assert!(debug_str.contains(r#"'c': "cindy""#));
1924 assert!(debug_str.ends_with('}'));
1925 }
1926
    // A value together with a `RemovalCause`; used as the map value in
    // `verify_notification_map`, where the key is stored separately.
    type NotificationPair<V> = (V, RemovalCause);
    // A shared key, a value and a `RemovalCause`; used as the element type in
    // `verify_notification_vec`.
    type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1929
1930 fn verify_notification_vec<K, V, S>(
1931 cache: &SegmentedCache<K, V, S>,
1932 actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1933 expected: &[NotificationTriple<K, V>],
1934 ) where
1935 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1936 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1937 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1938 {
1939 const MAX_RETRIES: usize = 5;
1941 let mut retries = 0;
1942 loop {
1943 std::thread::sleep(Duration::from_millis(500));
1945
1946 let actual = &*actual.lock();
1947 if actual.len() != expected.len() {
1948 if retries <= MAX_RETRIES {
1949 retries += 1;
1950 cache.run_pending_tasks();
1951 continue;
1952 } else {
1953 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1954 }
1955 }
1956
1957 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1958 assert_eq!(actual, expected, "expected[{i}]");
1959 }
1960
1961 break;
1962 }
1963 }
1964
1965 fn verify_notification_map<K, V, S>(
1966 cache: &SegmentedCache<K, V, S>,
1967 actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1968 expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1969 ) where
1970 K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1971 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1972 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1973 {
1974 const MAX_RETRIES: usize = 5;
1976 let mut retries = 0;
1977 loop {
1978 std::thread::sleep(Duration::from_millis(500));
1980
1981 let actual = &*actual.lock();
1982 if actual.len() != expected.len() {
1983 if retries <= MAX_RETRIES {
1984 retries += 1;
1985 cache.run_pending_tasks();
1986 continue;
1987 } else {
1988 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1989 }
1990 }
1991
1992 for actual_key in actual.keys() {
1993 assert_eq!(
1994 actual.get(actual_key),
1995 expected.get(actual_key),
1996 "expected[{actual_key}]",
1997 );
1998 }
1999
2000 break;
2001 }
2002 }
2003}