1use equivalent::Equivalent;
2
3use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
4use crate::common::concurrent::Weigher;
5use crate::common::time::Clock;
6use crate::{
7 common::{
8 iter::{Iter, ScanningGet},
9 HousekeeperConfig,
10 },
11 notification::EvictionListener,
12 policy::{EvictionPolicy, ExpirationPolicy},
13 Entry, Policy, PredicateError,
14};
15
16use std::{
17 collections::hash_map::RandomState,
18 fmt,
19 hash::{BuildHasher, Hash, Hasher},
20 sync::Arc,
21};
22
/// A cache whose entries are distributed over several internal `Cache` segments.
///
/// The handle only stores a reference-counted pointer to the shared state, so
/// cloning a `SegmentedCache` yields another handle to the same segments.
pub struct SegmentedCache<K, V, S = RandomState> {
    // Shared state: the segments, the hasher used to route keys to a segment,
    // and the capacity that was requested for the cache as a whole.
    inner: Arc<Inner<K, V, S>>,
}
36
// Manual `Send` implementation: keys and values must be `Send + Sync`, while the
// hasher state only needs to be `Send`.
//
// NOTE(review): the soundness argument depends on how `Cache` stores and shares
// `K`, `V` and `S`, which is not visible in this file — confirm against `Cache`.
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
44
// Manual `Sync` implementation: keys and values must be `Send + Sync`, and the
// hasher state must be `Sync`.
//
// NOTE(review): the soundness argument depends on how `Cache` stores and shares
// `K`, `V` and `S`, which is not visible in this file — confirm against `Cache`.
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
52
53impl<K, V, S> Clone for SegmentedCache<K, V, S> {
54 fn clone(&self) -> Self {
59 Self {
60 inner: Arc::clone(&self.inner),
61 }
62 }
63}
64
65impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
66where
67 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
68 V: fmt::Debug + Clone + Send + Sync + 'static,
69 S: BuildHasher + Clone + Send + Sync + 'static,
71{
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 let mut d_map = f.debug_map();
74
75 for (k, v) in self {
76 d_map.entry(&k, &v);
77 }
78
79 d_map.finish()
80 }
81}
82
83impl<K, V> SegmentedCache<K, V, RandomState>
84where
85 K: Hash + Eq + Send + Sync + 'static,
86 V: Clone + Send + Sync + 'static,
87{
88 pub fn new(max_capacity: u64, num_segments: usize) -> Self {
100 let build_hasher = RandomState::default();
101 Self::with_everything(
102 None,
103 Some(max_capacity),
104 None,
105 num_segments,
106 build_hasher,
107 None,
108 EvictionPolicy::default(),
109 None,
110 ExpirationPolicy::default(),
111 HousekeeperConfig::default(),
112 false,
113 Clock::default(),
114 )
115 }
116
117 pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
122 CacheBuilder::default().segments(num_segments)
123 }
124}
125
126impl<K, V, S> SegmentedCache<K, V, S> {
127 pub fn name(&self) -> Option<&str> {
129 self.inner.segments[0].name()
130 }
131
132 pub fn policy(&self) -> Policy {
137 let mut policy = self.inner.segments[0].policy();
138 policy.set_max_capacity(self.inner.desired_capacity);
139 policy.set_num_segments(self.inner.segments.len());
140 policy
141 }
142
143 pub fn entry_count(&self) -> u64 {
177 self.inner
178 .segments
179 .iter()
180 .map(|seg| seg.entry_count())
181 .sum()
182 }
183
184 pub fn weighted_size(&self) -> u64 {
192 self.inner
193 .segments
194 .iter()
195 .map(|seg| seg.weighted_size())
196 .sum()
197 }
198}
199
200impl<K, V, S> SegmentedCache<K, V, S>
201where
202 K: Hash + Eq + Send + Sync + 'static,
203 V: Clone + Send + Sync + 'static,
204 S: BuildHasher + Clone + Send + Sync + 'static,
205{
206 #[allow(clippy::too_many_arguments)]
210 pub(crate) fn with_everything(
211 name: Option<String>,
212 max_capacity: Option<u64>,
213 initial_capacity: Option<usize>,
214 num_segments: usize,
215 build_hasher: S,
216 weigher: Option<Weigher<K, V>>,
217 eviction_policy: EvictionPolicy,
218 eviction_listener: Option<EvictionListener<K, V>>,
219 expiration_policy: ExpirationPolicy<K, V>,
220 housekeeper_config: HousekeeperConfig,
221 invalidator_enabled: bool,
222 clock: Clock,
223 ) -> Self {
224 Self {
225 inner: Arc::new(Inner::new(
226 name,
227 max_capacity,
228 initial_capacity,
229 num_segments,
230 build_hasher,
231 weigher,
232 eviction_policy,
233 eviction_listener,
234 expiration_policy,
235 housekeeper_config,
236 invalidator_enabled,
237 clock,
238 )),
239 }
240 }
241
242 pub fn contains_key<Q>(&self, key: &Q) -> bool
251 where
252 Q: Equivalent<K> + Hash + ?Sized,
253 {
254 let hash = self.inner.hash(key);
255 self.inner.select(hash).contains_key_with_hash(key, hash)
256 }
257
258 pub fn get<Q>(&self, key: &Q) -> Option<V>
269 where
270 Q: Equivalent<K> + Hash + Eq + ?Sized,
271 {
272 let hash = self.inner.hash(key);
273 self.inner
274 .select(hash)
275 .get_with_hash(key, hash, false)
276 .map(Entry::into_value)
277 }
278
279 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280 where
281 K: Hash + Eq,
282 {
283 let hash = self.inner.hash(&key);
284 let cache = self.inner.select(hash);
285 OwnedKeyEntrySelector::new(key, hash, cache)
286 }
287
288 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289 where
290 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
291 {
292 let hash = self.inner.hash(key);
293 let cache = self.inner.select(hash);
294 RefKeyEntrySelector::new(key, hash, cache)
295 }
296
297 #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
300 pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
301 self.get_with(key, init)
302 }
303
304 #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
307 pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
308 where
309 F: FnOnce() -> Result<V, E>,
310 E: Send + Sync + 'static,
311 {
312 self.try_get_with(key, init)
313 }
314
315 pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
327 let hash = self.inner.hash(&key);
328 let key = Arc::new(key);
329 let replace_if = None as Option<fn(&V) -> bool>;
330 self.inner
331 .select(hash)
332 .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
333 .into_value()
334 }
335
336 pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
340 where
341 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
342 {
343 let hash = self.inner.hash(key);
344 let replace_if = None as Option<fn(&V) -> bool>;
345 self.inner
346 .select(hash)
347 .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
348 .into_value()
349 }
350
351 pub fn get_with_if(
360 &self,
361 key: K,
362 init: impl FnOnce() -> V,
363 replace_if: impl FnMut(&V) -> bool,
364 ) -> V {
365 let hash = self.inner.hash(&key);
366 let key = Arc::new(key);
367 self.inner
368 .select(hash)
369 .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
370 .into_value()
371 }
372
373 pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
387 where
388 F: FnOnce() -> Option<V>,
389 {
390 let hash = self.inner.hash(&key);
391 let key = Arc::new(key);
392 self.inner
393 .select(hash)
394 .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
395 .map(Entry::into_value)
396 }
397
398 pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
403 where
404 F: FnOnce() -> Option<V>,
405 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
406 {
407 let hash = self.inner.hash(key);
408 self.inner
409 .select(hash)
410 .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
411 .map(Entry::into_value)
412 }
413
414 pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
431 where
432 F: FnOnce() -> Result<V, E>,
433 E: Send + Sync + 'static,
434 {
435 let hash = self.inner.hash(&key);
436 let key = Arc::new(key);
437 self.inner
438 .select(hash)
439 .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
440 .map(Entry::into_value)
441 }
442
443 pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
447 where
448 F: FnOnce() -> Result<V, E>,
449 E: Send + Sync + 'static,
450 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
451 {
452 let hash = self.inner.hash(key);
453 self.inner
454 .select(hash)
455 .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
456 .map(Entry::into_value)
457 }
458
459 pub fn insert(&self, key: K, value: V) {
463 let hash = self.inner.hash(&key);
464 let key = Arc::new(key);
465 self.inner.select(hash).insert_with_hash(key, hash, value);
466 }
467
468 pub fn invalidate<Q>(&self, key: &Q)
476 where
477 Q: Equivalent<K> + Hash + ?Sized,
478 {
479 let hash = self.inner.hash(key);
480 self.inner
481 .select(hash)
482 .invalidate_with_hash(key, hash, false);
483 }
484
485 pub fn remove<Q>(&self, key: &Q) -> Option<V>
493 where
494 Q: Equivalent<K> + Hash + ?Sized,
495 {
496 let hash = self.inner.hash(key);
497 self.inner
498 .select(hash)
499 .invalidate_with_hash(key, hash, true)
500 }
501
502 pub fn invalidate_all(&self) {
517 for segment in self.inner.segments.iter() {
518 segment.invalidate_all();
519 }
520 }
521
522 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
556 where
557 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
558 {
559 let pred = Arc::new(predicate);
560 for segment in self.inner.segments.iter() {
561 segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
562 }
563 Ok(())
564 }
565
566 pub fn iter(&self) -> Iter<'_, K, V> {
614 let num_cht_segments = self.inner.segments[0].num_cht_segments();
615 let segments = self
616 .inner
617 .segments
618 .iter()
619 .map(|c| c as &dyn ScanningGet<_, _>)
620 .collect::<Vec<_>>()
621 .into_boxed_slice();
622 Iter::with_multiple_cache_segments(segments, num_cht_segments)
623 }
624
625 pub fn run_pending_tasks(&self) {
627 for segment in self.inner.segments.iter() {
628 segment.run_pending_tasks();
629 }
630 }
631}
632
633impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
634where
635 K: Hash + Eq + Send + Sync + 'static,
636 V: Clone + Send + Sync + 'static,
637 S: BuildHasher + Clone + Send + Sync + 'static,
638{
639 type Item = (Arc<K>, V);
640
641 type IntoIter = Iter<'a, K, V>;
642
643 fn into_iter(self) -> Self::IntoIter {
644 self.iter()
645 }
646}
647
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S> {
    /// Test helper: `true` when every segment reports an empty waiter map.
    fn is_waiter_map_empty(&self) -> bool {
        self.inner
            .segments
            .iter()
            .all(|segment| segment.is_waiter_map_empty())
    }
}
655
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Test helper: total number of invalidation predicates over all segments.
    fn invalidation_predicate_count(&self) -> usize {
        self.inner
            .segments
            .iter()
            .fold(0, |total, segment| total + segment.invalidation_predicate_count())
    }

    /// Test helper: calls `reconfigure_for_testing` on every segment.
    ///
    /// # Panics
    ///
    /// Panics when this handle is not the only strong reference to the shared
    /// state, because the segments must be borrowed mutably.
    fn reconfigure_for_testing(&mut self) {
        let inner = match Arc::get_mut(&mut self.inner) {
            Some(exclusive) => exclusive,
            None => panic!("There are other strong reference to self.inner Arc"),
        };

        inner.segments.iter_mut().for_each(|segment| {
            segment.reconfigure_for_testing();
        });
    }

    /// Test helper: `true` when every segment reports an empty key-locks map.
    fn key_locks_map_is_empty(&self) -> bool {
        for segment in self.inner.segments.iter() {
            if !segment.key_locks_map_is_empty() {
                return false;
            }
        }
        true
    }
}
687
/// State shared by all clones of a `SegmentedCache`.
struct Inner<K, V, S> {
    // The max capacity requested for the whole cache (not per segment).
    desired_capacity: Option<u64>,
    // The cache segments; the length is a power of two.
    segments: Box<[Cache<K, V, S>]>,
    // Used to hash keys before a segment is selected.
    build_hasher: S,
    // Number of bits a 64-bit hash is shifted right to obtain a segment index;
    // 64 when there is only one segment.
    segment_shift: u32,
}
694
695impl<K, V, S> Inner<K, V, S>
696where
697 K: Hash + Eq + Send + Sync + 'static,
698 V: Clone + Send + Sync + 'static,
699 S: BuildHasher + Clone + Send + Sync + 'static,
700{
701 #[allow(clippy::too_many_arguments)]
705 fn new(
706 name: Option<String>,
707 max_capacity: Option<u64>,
708 initial_capacity: Option<usize>,
709 num_segments: usize,
710 build_hasher: S,
711 weigher: Option<Weigher<K, V>>,
712 eviction_policy: EvictionPolicy,
713 eviction_listener: Option<EvictionListener<K, V>>,
714 expiration_policy: ExpirationPolicy<K, V>,
715 housekeeper_config: HousekeeperConfig,
716 invalidator_enabled: bool,
717 clock: Clock,
718 ) -> Self {
719 assert!(num_segments > 0);
720
721 let actual_num_segments = num_segments.next_power_of_two();
722 let segment_shift = 64 - actual_num_segments.trailing_zeros();
723 let seg_max_capacity =
724 max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
725 let seg_init_capacity =
726 initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
727 let segments = (0..actual_num_segments)
730 .map(|_| {
731 Cache::with_everything(
732 name.clone(),
733 seg_max_capacity,
734 seg_init_capacity,
735 build_hasher.clone(),
736 weigher.clone(),
737 eviction_policy.clone(),
738 eviction_listener.clone(),
739 expiration_policy.clone(),
740 housekeeper_config.clone(),
741 invalidator_enabled,
742 clock.clone(),
743 )
744 })
745 .collect::<Vec<_>>();
746
747 Self {
748 desired_capacity: max_capacity,
749 segments: segments.into_boxed_slice(),
750 build_hasher,
751 segment_shift,
752 }
753 }
754
755 #[inline]
756 fn hash<Q>(&self, key: &Q) -> u64
757 where
758 Q: Equivalent<K> + Hash + ?Sized,
759 {
760 let mut hasher = self.build_hasher.build_hasher();
761 key.hash(&mut hasher);
762 hasher.finish()
763 }
764
765 #[inline]
766 fn select(&self, hash: u64) -> &Cache<K, V, S> {
767 let index = self.segment_index_from_hash(hash);
768 &self.segments[index]
769 }
770
771 #[inline]
772 fn segment_index_from_hash(&self, hash: u64) -> usize {
773 if self.segment_shift == 64 {
774 0
775 } else {
776 (hash >> self.segment_shift) as usize
777 }
778 }
779}
780
781#[cfg(test)]
782mod tests {
783 use super::SegmentedCache;
784 use crate::notification::RemovalCause;
785 use parking_lot::Mutex;
786 use std::{error::Error, fmt::Display, sync::Arc, time::Duration};
787
788 #[test]
789 fn max_capacity_zero() {
790 let mut cache = SegmentedCache::new(0, 1);
791 cache.reconfigure_for_testing();
792
793 let cache = cache;
795
796 cache.insert(0, ());
797
798 assert!(!cache.contains_key(&0));
799 assert!(cache.get(&0).is_none());
800 cache.run_pending_tasks();
801 assert!(!cache.contains_key(&0));
802 assert!(cache.get(&0).is_none());
803 assert_eq!(cache.entry_count(), 0)
804 }
805
806 #[test]
807 fn basic_single_thread() {
808 let actual = Arc::new(Mutex::new(Vec::new()));
810 let mut expected = Vec::new();
811
812 let a1 = Arc::clone(&actual);
814 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
815
816 let mut cache = SegmentedCache::builder(1)
818 .max_capacity(3)
819 .eviction_listener(listener)
820 .build();
821 cache.reconfigure_for_testing();
822
823 let cache = cache;
825
826 cache.insert("a", "alice");
827 cache.insert("b", "bob");
828 assert_eq!(cache.get(&"a"), Some("alice"));
829 assert!(cache.contains_key(&"a"));
830 assert!(cache.contains_key(&"b"));
831 assert_eq!(cache.get(&"b"), Some("bob"));
832 cache.run_pending_tasks();
833 cache.insert("c", "cindy");
836 assert_eq!(cache.get(&"c"), Some("cindy"));
837 assert!(cache.contains_key(&"c"));
838 cache.run_pending_tasks();
840
841 assert!(cache.contains_key(&"a"));
842 assert_eq!(cache.get(&"a"), Some("alice"));
843 assert_eq!(cache.get(&"b"), Some("bob"));
844 assert!(cache.contains_key(&"b"));
845 cache.run_pending_tasks();
846 cache.insert("d", "david"); expected.push((Arc::new("d"), "david", RemovalCause::Size));
851 cache.run_pending_tasks();
852 assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
854
855 cache.insert("d", "david");
856 expected.push((Arc::new("d"), "david", RemovalCause::Size));
857 cache.run_pending_tasks();
858 assert!(!cache.contains_key(&"d"));
859 assert_eq!(cache.get(&"d"), None); cache.insert("d", "dennis");
864 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
865 cache.run_pending_tasks();
866 assert_eq!(cache.get(&"a"), Some("alice"));
867 assert_eq!(cache.get(&"b"), Some("bob"));
868 assert_eq!(cache.get(&"c"), None);
869 assert_eq!(cache.get(&"d"), Some("dennis"));
870 assert!(cache.contains_key(&"a"));
871 assert!(cache.contains_key(&"b"));
872 assert!(!cache.contains_key(&"c"));
873 assert!(cache.contains_key(&"d"));
874
875 cache.invalidate(&"b");
876 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
877 cache.run_pending_tasks();
878 assert_eq!(cache.get(&"b"), None);
879 assert!(!cache.contains_key(&"b"));
880
881 assert!(cache.remove(&"b").is_none());
882 assert_eq!(cache.remove(&"d"), Some("dennis"));
883 expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
884 cache.run_pending_tasks();
885 assert_eq!(cache.get(&"d"), None);
886 assert!(!cache.contains_key(&"d"));
887
888 verify_notification_vec(&cache, actual, &expected);
889 assert!(cache.key_locks_map_is_empty());
890 }
891
892 #[test]
893 fn non_power_of_two_segments() {
894 let mut cache = SegmentedCache::new(100, 5);
895 cache.reconfigure_for_testing();
896
897 let cache = cache;
899
900 assert_eq!(cache.iter().count(), 0);
901
902 cache.insert("a", "alice");
903 cache.insert("b", "bob");
904 cache.insert("c", "cindy");
905
906 assert_eq!(cache.iter().count(), 3);
907 cache.run_pending_tasks();
908 assert_eq!(cache.iter().count(), 3);
909 }
910
911 #[test]
912 fn size_aware_eviction() {
913 let weigher = |_k: &&str, v: &(&str, u32)| v.1;
914
915 let alice = ("alice", 10);
916 let bob = ("bob", 15);
917 let bill = ("bill", 20);
918 let cindy = ("cindy", 5);
919 let david = ("david", 15);
920 let dennis = ("dennis", 15);
921
922 let actual = Arc::new(Mutex::new(Vec::new()));
924 let mut expected = Vec::new();
925
926 let a1 = Arc::clone(&actual);
928 let listener = move |k, v, cause| a1.lock().push((k, v, cause));
929
930 let mut cache = SegmentedCache::builder(1)
932 .max_capacity(31)
933 .weigher(weigher)
934 .eviction_listener(listener)
935 .build();
936 cache.reconfigure_for_testing();
937
938 let cache = cache;
940
941 cache.insert("a", alice);
942 cache.insert("b", bob);
943 assert_eq!(cache.get(&"a"), Some(alice));
944 assert!(cache.contains_key(&"a"));
945 assert!(cache.contains_key(&"b"));
946 assert_eq!(cache.get(&"b"), Some(bob));
947 cache.run_pending_tasks();
948 cache.insert("c", cindy);
951 assert_eq!(cache.get(&"c"), Some(cindy));
952 assert!(cache.contains_key(&"c"));
953 cache.run_pending_tasks();
955
956 assert!(cache.contains_key(&"a"));
957 assert_eq!(cache.get(&"a"), Some(alice));
958 assert_eq!(cache.get(&"b"), Some(bob));
959 assert!(cache.contains_key(&"b"));
960 cache.run_pending_tasks();
961 cache.insert("d", david); expected.push((Arc::new("d"), david, RemovalCause::Size));
968 cache.run_pending_tasks();
969 assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
971
972 cache.insert("d", david);
973 expected.push((Arc::new("d"), david, RemovalCause::Size));
974 cache.run_pending_tasks();
975 assert!(!cache.contains_key(&"d"));
976 assert_eq!(cache.get(&"d"), None); cache.insert("d", david);
979 expected.push((Arc::new("d"), david, RemovalCause::Size));
980 cache.run_pending_tasks();
981 assert_eq!(cache.get(&"d"), None); assert!(!cache.contains_key(&"d"));
983
984 cache.insert("d", david);
985 expected.push((Arc::new("d"), david, RemovalCause::Size));
986 cache.run_pending_tasks();
987 assert!(!cache.contains_key(&"d"));
988 assert_eq!(cache.get(&"d"), None); cache.insert("d", dennis);
992 expected.push((Arc::new("c"), cindy, RemovalCause::Size));
993 expected.push((Arc::new("a"), alice, RemovalCause::Size));
994 cache.run_pending_tasks();
995 assert_eq!(cache.get(&"a"), None);
996 assert_eq!(cache.get(&"b"), Some(bob));
997 assert_eq!(cache.get(&"c"), None);
998 assert_eq!(cache.get(&"d"), Some(dennis));
999 assert!(!cache.contains_key(&"a"));
1000 assert!(cache.contains_key(&"b"));
1001 assert!(!cache.contains_key(&"c"));
1002 assert!(cache.contains_key(&"d"));
1003
1004 cache.insert("b", bill);
1006 expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
1007 expected.push((Arc::new("d"), dennis, RemovalCause::Size));
1008 cache.run_pending_tasks();
1009 assert_eq!(cache.get(&"b"), Some(bill));
1010 assert_eq!(cache.get(&"d"), None);
1011 assert!(cache.contains_key(&"b"));
1012 assert!(!cache.contains_key(&"d"));
1013
1014 cache.insert("a", alice);
1016 cache.insert("b", bob);
1017 expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
1018 cache.run_pending_tasks();
1019 assert_eq!(cache.get(&"a"), Some(alice));
1020 assert_eq!(cache.get(&"b"), Some(bob));
1021 assert_eq!(cache.get(&"d"), None);
1022 assert!(cache.contains_key(&"a"));
1023 assert!(cache.contains_key(&"b"));
1024 assert!(!cache.contains_key(&"d"));
1025
1026 assert_eq!(cache.entry_count(), 2);
1028 assert_eq!(cache.weighted_size(), 25);
1029
1030 verify_notification_vec(&cache, actual, &expected);
1031 assert!(cache.key_locks_map_is_empty());
1032 }
1033
1034 #[test]
1035 fn basic_multi_threads() {
1036 let num_threads = 4;
1037
1038 let mut cache = SegmentedCache::new(100, num_threads);
1039 cache.reconfigure_for_testing();
1040
1041 let cache = cache;
1043
1044 #[allow(clippy::needless_collect)]
1046 let handles = (0..num_threads)
1047 .map(|id| {
1048 let cache = cache.clone();
1049 std::thread::spawn(move || {
1050 cache.insert(10, format!("{id}-100"));
1051 cache.get(&10);
1052 cache.run_pending_tasks();
1053 cache.insert(20, format!("{id}-200"));
1054 cache.invalidate(&10);
1055 })
1056 })
1057 .collect::<Vec<_>>();
1058
1059 handles.into_iter().for_each(|h| h.join().expect("Failed"));
1060
1061 cache.run_pending_tasks();
1062
1063 assert!(cache.get(&10).is_none());
1064 assert!(cache.get(&20).is_some());
1065 assert!(!cache.contains_key(&10));
1066 assert!(cache.contains_key(&20));
1067 }
1068
1069 #[test]
1070 fn invalidate_all() {
1071 use std::collections::HashMap;
1072
1073 let actual = Arc::new(Mutex::new(HashMap::new()));
1076 let mut expected = HashMap::new();
1077
1078 let a1 = Arc::clone(&actual);
1080 let listener = move |k, v, cause| {
1081 a1.lock().insert(k, (v, cause));
1082 };
1083
1084 let mut cache = SegmentedCache::builder(4)
1086 .max_capacity(100)
1087 .eviction_listener(listener)
1088 .build();
1089 cache.reconfigure_for_testing();
1090
1091 let cache = cache;
1093
1094 cache.insert("a", "alice");
1095 cache.insert("b", "bob");
1096 cache.insert("c", "cindy");
1097 assert_eq!(cache.get(&"a"), Some("alice"));
1098 assert_eq!(cache.get(&"b"), Some("bob"));
1099 assert_eq!(cache.get(&"c"), Some("cindy"));
1100 assert!(cache.contains_key(&"a"));
1101 assert!(cache.contains_key(&"b"));
1102 assert!(cache.contains_key(&"c"));
1103
1104 cache.invalidate_all();
1109 expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1110 expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1111 expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1112 cache.run_pending_tasks();
1113
1114 cache.insert("d", "david");
1115 cache.run_pending_tasks();
1116
1117 assert!(cache.get(&"a").is_none());
1118 assert!(cache.get(&"b").is_none());
1119 assert!(cache.get(&"c").is_none());
1120 assert_eq!(cache.get(&"d"), Some("david"));
1121 assert!(!cache.contains_key(&"a"));
1122 assert!(!cache.contains_key(&"b"));
1123 assert!(!cache.contains_key(&"c"));
1124 assert!(cache.contains_key(&"d"));
1125
1126 verify_notification_map(&cache, actual, &expected);
1127 }
1128
1129 #[test]
1130 fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
1131 use std::collections::{HashMap, HashSet};
1132
1133 const SEGMENTS: usize = 4;
1134
1135 let actual = Arc::new(Mutex::new(HashMap::new()));
1138 let mut expected = HashMap::new();
1139
1140 let a1 = Arc::clone(&actual);
1142 let listener = move |k, v, cause| {
1143 a1.lock().insert(k, (v, cause));
1144 };
1145
1146 let (clock, mock) = crate::common::time::Clock::mock();
1147
1148 let mut cache = SegmentedCache::builder(SEGMENTS)
1150 .max_capacity(100)
1151 .support_invalidation_closures()
1152 .eviction_listener(listener)
1153 .clock(clock)
1154 .build();
1155 cache.reconfigure_for_testing();
1156
1157 let cache = cache;
1159
1160 cache.insert(0, "alice");
1161 cache.insert(1, "bob");
1162 cache.insert(2, "alex");
1163 cache.run_pending_tasks();
1164 mock.increment(Duration::from_secs(5)); cache.run_pending_tasks();
1166
1167 assert_eq!(cache.get(&0), Some("alice"));
1168 assert_eq!(cache.get(&1), Some("bob"));
1169 assert_eq!(cache.get(&2), Some("alex"));
1170 assert!(cache.contains_key(&0));
1171 assert!(cache.contains_key(&1));
1172 assert!(cache.contains_key(&2));
1173
1174 let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
1175 cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
1176 assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
1177 expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
1178 expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));
1179
1180 mock.increment(Duration::from_secs(5)); cache.insert(3, "alice");
1183
1184 cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
1187 cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
1189
1190 assert!(cache.get(&0).is_none());
1191 assert!(cache.get(&2).is_none());
1192 assert_eq!(cache.get(&1), Some("bob"));
1193 assert_eq!(cache.get(&3), Some("alice"));
1195
1196 assert!(!cache.contains_key(&0));
1197 assert!(cache.contains_key(&1));
1198 assert!(!cache.contains_key(&2));
1199 assert!(cache.contains_key(&3));
1200
1201 assert_eq!(cache.entry_count(), 2);
1202 assert_eq!(cache.invalidation_predicate_count(), 0);
1203
1204 mock.increment(Duration::from_secs(5)); cache.invalidate_entries_if(|_k, &v| v == "alice")?;
1207 cache.invalidate_entries_if(|_k, &v| v == "bob")?;
1208 assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
1209 expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
1210 expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));
1211
1212 cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
1215 cache.run_pending_tasks(); std::thread::sleep(Duration::from_millis(200));
1217
1218 assert!(cache.get(&1).is_none());
1219 assert!(cache.get(&3).is_none());
1220
1221 assert!(!cache.contains_key(&1));
1222 assert!(!cache.contains_key(&3));
1223
1224 assert_eq!(cache.entry_count(), 0);
1225 assert_eq!(cache.invalidation_predicate_count(), 0);
1226
1227 verify_notification_map(&cache, actual, &expected);
1228
1229 Ok(())
1230 }
1231
1232 #[test]
1233 fn test_iter() {
1234 const NUM_KEYS: usize = 50;
1235
1236 fn make_value(key: usize) -> String {
1237 format!("val: {key}")
1238 }
1239
1240 let cache = SegmentedCache::builder(4)
1242 .max_capacity(100)
1243 .time_to_idle(Duration::from_secs(10))
1244 .build();
1245
1246 for key in 0..NUM_KEYS {
1247 cache.insert(key, make_value(key));
1248 }
1249
1250 let mut key_set = std::collections::HashSet::new();
1251
1252 for (key, value) in &cache {
1253 assert_eq!(value, make_value(*key));
1254
1255 key_set.insert(*key);
1256 }
1257
1258 assert_eq!(key_set.len(), NUM_KEYS);
1260 }
1261
1262 #[test]
1268 fn test_iter_multi_threads() {
1269 use std::collections::HashSet;
1270
1271 const NUM_KEYS: usize = 1024;
1272 const NUM_THREADS: usize = 16;
1273
1274 fn make_value(key: usize) -> String {
1275 format!("val: {key}")
1276 }
1277
1278 let cache = SegmentedCache::builder(4)
1279 .max_capacity(2048)
1280 .time_to_idle(Duration::from_secs(10))
1281 .build();
1282
1283 for key in 0..NUM_KEYS {
1285 cache.insert(key, make_value(key));
1286 }
1287
1288 let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
1289 let write_lock = rw_lock.write().unwrap();
1290
1291 #[allow(clippy::needless_collect)]
1293 let handles = (0..NUM_THREADS)
1294 .map(|n| {
1295 let cache = cache.clone();
1296 let rw_lock = Arc::clone(&rw_lock);
1297
1298 if n % 2 == 0 {
1299 std::thread::spawn(move || {
1301 let read_lock = rw_lock.read().unwrap();
1302 for key in 0..NUM_KEYS {
1303 cache.insert(key, make_value(key));
1305 }
1306 std::mem::drop(read_lock);
1307 })
1308 } else {
1309 std::thread::spawn(move || {
1311 let read_lock = rw_lock.read().unwrap();
1312 let mut key_set = HashSet::new();
1313 for (key, value) in &cache {
1314 assert_eq!(value, make_value(*key));
1315 key_set.insert(*key);
1316 }
1317 assert_eq!(key_set.len(), NUM_KEYS);
1319 std::mem::drop(read_lock);
1320 })
1321 }
1322 })
1323 .collect::<Vec<_>>();
1324
1325 std::mem::drop(write_lock);
1327
1328 handles.into_iter().for_each(|h| h.join().expect("Failed"));
1329
1330 let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
1332 assert_eq!(key_set.len(), NUM_KEYS);
1333 }
1334
1335 #[test]
1336 fn get_with() {
1337 use std::thread::{sleep, spawn};
1338
1339 let cache = SegmentedCache::new(100, 4);
1340 const KEY: u32 = 0;
1341
1342 let thread1 = {
1348 let cache1 = cache.clone();
1349 spawn(move || {
1350 let v = cache1.get_with(KEY, || {
1352 sleep(Duration::from_millis(300));
1354 "thread1"
1355 });
1356 assert_eq!(v, "thread1");
1357 })
1358 };
1359
1360 let thread2 = {
1364 let cache2 = cache.clone();
1365 spawn(move || {
1366 sleep(Duration::from_millis(100));
1368 let v = cache2.get_with(KEY, || unreachable!());
1369 assert_eq!(v, "thread1");
1370 })
1371 };
1372
1373 let thread3 = {
1379 let cache3 = cache.clone();
1380 spawn(move || {
1381 sleep(Duration::from_millis(400));
1383 let v = cache3.get_with(KEY, || unreachable!());
1384 assert_eq!(v, "thread1");
1385 })
1386 };
1387
1388 let thread4 = {
1391 let cache4 = cache.clone();
1392 spawn(move || {
1393 sleep(Duration::from_millis(200));
1395 let maybe_v = cache4.get(&KEY);
1396 assert!(maybe_v.is_none());
1397 })
1398 };
1399
1400 let thread5 = {
1403 let cache5 = cache.clone();
1404 spawn(move || {
1405 sleep(Duration::from_millis(400));
1407 let maybe_v = cache5.get(&KEY);
1408 assert_eq!(maybe_v, Some("thread1"));
1409 })
1410 };
1411
1412 for t in [thread1, thread2, thread3, thread4, thread5] {
1413 t.join().expect("Failed to join");
1414 }
1415
1416 assert!(cache.is_waiter_map_empty());
1417 }
1418
1419 #[test]
1420 fn get_with_if() {
1421 use std::thread::{sleep, spawn};
1422
1423 let cache = SegmentedCache::new(100, 4);
1424 const KEY: u32 = 0;
1425
1426 let thread1 = {
1432 let cache1 = cache.clone();
1433 spawn(move || {
1434 let v = cache1.get_with_if(
1436 KEY,
1437 || {
1438 sleep(Duration::from_millis(300));
1440 "thread1"
1441 },
1442 |_v| unreachable!(),
1443 );
1444 assert_eq!(v, "thread1");
1445 })
1446 };
1447
1448 let thread2 = {
1452 let cache2 = cache.clone();
1453 spawn(move || {
1454 sleep(Duration::from_millis(100));
1456 let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
1457 assert_eq!(v, "thread1");
1458 })
1459 };
1460
1461 let thread3 = {
1468 let cache3 = cache.clone();
1469 spawn(move || {
1470 sleep(Duration::from_millis(350));
1472 let v = cache3.get_with_if(
1473 KEY,
1474 || unreachable!(),
1475 |v| {
1476 assert_eq!(v, &"thread1");
1477 false
1478 },
1479 );
1480 assert_eq!(v, "thread1");
1481 })
1482 };
1483
1484 let thread4 = {
1489 let cache4 = cache.clone();
1490 spawn(move || {
1491 sleep(Duration::from_millis(400));
1493 let v = cache4.get_with_if(
1494 KEY,
1495 || "thread4",
1496 |v| {
1497 assert_eq!(v, &"thread1");
1498 true
1499 },
1500 );
1501 assert_eq!(v, "thread4");
1502 })
1503 };
1504
1505 let thread5 = {
1508 let cache5 = cache.clone();
1509 spawn(move || {
1510 sleep(Duration::from_millis(200));
1512 let maybe_v = cache5.get(&KEY);
1513 assert!(maybe_v.is_none());
1514 })
1515 };
1516
1517 let thread6 = {
1520 let cache6 = cache.clone();
1521 spawn(move || {
1522 sleep(Duration::from_millis(350));
1524 let maybe_v = cache6.get(&KEY);
1525 assert_eq!(maybe_v, Some("thread1"));
1526 })
1527 };
1528
1529 let thread7 = {
1532 let cache7 = cache.clone();
1533 spawn(move || {
1534 sleep(Duration::from_millis(450));
1536 let maybe_v = cache7.get(&KEY);
1537 assert_eq!(maybe_v, Some("thread4"));
1538 })
1539 };
1540
1541 for t in [
1542 thread1, thread2, thread3, thread4, thread5, thread6, thread7,
1543 ] {
1544 t.join().expect("Failed to join");
1545 }
1546
1547 assert!(cache.is_waiter_map_empty());
1548 }
1549
#[test]
fn try_get_with() {
    use std::{
        sync::Arc,
        thread::{sleep, spawn},
    };

    // Minimal error type returned by the failing init closure.
    #[derive(Debug)]
    pub struct MyError(String);

    impl Display for MyError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl Error for MyError {}

    type MyResult<T> = Result<T, Arc<MyError>>;

    const KEY: u32 = 0;
    let cache = SegmentedCache::new(100, 4);

    // Captures nothing, so it is `Copy` and can be handed to every worker.
    let pause = |millis: u64| sleep(Duration::from_millis(millis));

    // Eight workers exercise the same key on a fixed timeline. Each one
    // asserts what it expects to observe at the moment it wakes up.
    let mut workers = Vec::with_capacity(8);

    // Worker 1 (t = 0 ms): calls `try_get_with` right away. Its init closure
    // takes 300 ms and then fails, so the call is expected to return an error.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            let outcome = cache.try_get_with(KEY, || {
                pause(300);
                Err(MyError("thread1 error".into()))
            });
            assert!(outcome.is_err());
        })
    });

    // Worker 2 (t = 100 ms): calls while worker 1's init is still sleeping.
    // Its own init must never run (`unreachable!`), yet it must get an error.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(100);
            let outcome: MyResult<_> = cache.try_get_with(KEY, || unreachable!());
            assert!(outcome.is_err());
        })
    });

    // Worker 3 (t = 400 ms): calls after worker 1 has failed. Its init takes
    // 300 ms and succeeds, so it is expected to get its own value back.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(400);
            let outcome: MyResult<_> = cache.try_get_with(KEY, || {
                pause(300);
                Ok("thread3")
            });
            assert_eq!(outcome.unwrap(), "thread3");
        })
    });

    // Worker 4 (t = 500 ms): calls while worker 3's init is still sleeping.
    // Its own init must never run, and it must receive worker 3's value.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(500);
            let outcome: MyResult<_> = cache.try_get_with(KEY, || unreachable!());
            assert_eq!(outcome.unwrap(), "thread3");
        })
    });

    // Worker 5 (t = 800 ms): calls after worker 3 has finished. Its own init
    // must never run, and it must receive worker 3's value.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(800);
            let outcome: MyResult<_> = cache.try_get_with(KEY, || unreachable!());
            assert_eq!(outcome.unwrap(), "thread3");
        })
    });

    // Worker 6 (t = 200 ms): a plain `get` while worker 1's init is running
    // must find nothing.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(200);
            assert!(cache.get(&KEY).is_none());
        })
    });

    // Worker 7 (t = 400 ms): a plain `get` after worker 1 has failed must
    // still find nothing.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(400);
            assert!(cache.get(&KEY).is_none());
        })
    });

    // Worker 8 (t = 800 ms): a plain `get` after worker 3 has finished must
    // see worker 3's value.
    workers.push({
        let cache = cache.clone();
        spawn(move || {
            pause(800);
            assert_eq!(cache.get(&KEY), Some("thread3"));
        })
    });

    for worker in workers {
        worker.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
1696
#[test]
fn optionally_get_with() {
    use std::thread::{sleep, spawn};

    const KEY: u32 = 0;
    let cache = SegmentedCache::new(100, 4);

    // Short alias so the timeline below reads as plain millisecond offsets.
    let ms = Duration::from_millis;

    // Eight threads hit the same key on a fixed timeline. Each one asserts
    // what it expects to observe at the moment it wakes up.
    let mut handles = Vec::with_capacity(8);

    // Thread 1 (t = 0 ms): calls `optionally_get_with` right away. Its init
    // closure takes 300 ms and yields `None`, so the call must return `None`.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            let result = c.optionally_get_with(KEY, || {
                sleep(ms(300));
                None
            });
            assert!(result.is_none());
        })
    });

    // Thread 2 (t = 100 ms): calls while thread 1's init is still sleeping.
    // Its own init must never run (`unreachable!`), and it must get `None`.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(100));
            let result = c.optionally_get_with(KEY, || unreachable!());
            assert!(result.is_none());
        })
    });

    // Thread 3 (t = 400 ms): calls after thread 1 has returned `None`. Its
    // init takes 300 ms and yields a value, which it must get back.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(400));
            let result = c.optionally_get_with(KEY, || {
                sleep(ms(300));
                Some("thread3")
            });
            assert_eq!(result.unwrap(), "thread3");
        })
    });

    // Thread 4 (t = 500 ms): calls while thread 3's init is still sleeping.
    // Its own init must never run, and it must receive thread 3's value.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(500));
            let result = c.optionally_get_with(KEY, || unreachable!());
            assert_eq!(result.unwrap(), "thread3");
        })
    });

    // Thread 5 (t = 800 ms): calls after thread 3 has finished. Its own init
    // must never run, and it must receive thread 3's value.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(800));
            let result = c.optionally_get_with(KEY, || unreachable!());
            assert_eq!(result.unwrap(), "thread3");
        })
    });

    // Thread 6 (t = 200 ms): a plain `get` while thread 1's init is running
    // must find nothing.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(200));
            assert!(c.get(&KEY).is_none());
        })
    });

    // Thread 7 (t = 400 ms): a plain `get` after thread 1 returned `None`
    // must still find nothing.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(400));
            assert!(c.get(&KEY).is_none());
        })
    });

    // Thread 8 (t = 800 ms): a plain `get` after thread 3 has finished must
    // see thread 3's value.
    handles.push({
        let c = cache.clone();
        spawn(move || {
            sleep(ms(800));
            assert_eq!(c.get(&KEY), Some("thread3"));
        })
    });

    for handle in handles {
        handle.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
1827
#[test]
fn borrowed_forms_of_key() {
    // Lookups must accept every borrowed form of an owned `Vec<u8>` key.
    let cache = SegmentedCache::<Vec<u8>, ()>::new(1, 2);

    let owned_key: Vec<u8> = vec![1];
    cache.insert(owned_key.clone(), ());

    // Borrowed form 1: a reference to the owned key type itself.
    {
        let as_vec_ref: &Vec<u8> = &owned_key;
        assert!(cache.contains_key(as_vec_ref));
        assert_eq!(cache.get(as_vec_ref), Some(()));
        cache.invalidate(as_vec_ref);
    }

    // Re-insert the entry that was just invalidated.
    cache.insert(owned_key, ());

    // Borrowed form 2: a slice holding the same bytes.
    {
        let as_slice: &[u8] = &[1];
        assert!(cache.contains_key(as_slice));
        assert_eq!(cache.get(as_slice), Some(()));
        cache.invalidate(as_slice);
    }
}
1852
#[test]
#[ignore]
fn drop_value_immediately_after_eviction() {
    use crate::common::test_utils::{Counters, Value};

    const NUM_SEGMENTS: usize = 1;
    const MAX_CAPACITY: u32 = 500;
    // Insert 20% more keys than the cache can hold so that some get evicted.
    const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
    const EVICTION_COUNT: u32 = KEYS - MAX_CAPACITY;

    let counters = Arc::new(Counters::default());

    // The listener tallies size-based evictions and explicit invalidations
    // separately; every other removal cause is ignored.
    let listener = {
        let counters = Arc::clone(&counters);
        move |_k, _v, cause| match cause {
            RemovalCause::Size => counters.incl_evicted(),
            RemovalCause::Explicit => counters.incl_invalidated(),
            _ => (),
        }
    };

    let cache = {
        let mut building = SegmentedCache::builder(NUM_SEGMENTS)
            .max_capacity(MAX_CAPACITY as u64)
            .eviction_listener(listener)
            .build();
        building.reconfigure_for_testing();
        // Hand the cache out through an immutable binding from here on.
        building
    };

    // `inserted` and `value_created` must always equal `KEYS`; the other
    // three counters vary by phase and are passed in.
    let assert_counters = |evicted: u32, invalidated: u32, value_dropped: u32| {
        assert_eq!(counters.inserted(), KEYS, "inserted");
        assert_eq!(counters.value_created(), KEYS, "value_created");
        assert_eq!(counters.evicted(), evicted, "evicted");
        assert_eq!(counters.invalidated(), invalidated, "invalidated");
        assert_eq!(counters.value_dropped(), value_dropped, "value_dropped");
    };

    // Phase 1: fill the cache past capacity, flushing pending work after
    // every insert.
    for key in 0..KEYS {
        cache.insert(key, Arc::new(Value::new(vec![0u8; 1024], &counters)));
        counters.incl_inserted();
        cache.run_pending_tasks();
    }

    cache.run_pending_tasks();
    // Exactly the overflow must have been evicted, and exactly those values
    // must already have been dropped.
    assert_counters(EVICTION_COUNT, 0, EVICTION_COUNT);

    // Phase 2: invalidate every key, including those already evicted.
    for key in 0..KEYS {
        cache.invalidate(&key);
        cache.run_pending_tasks();
    }

    cache.run_pending_tasks();
    // Only the entries that were still resident count as invalidated, and by
    // now every value must have been dropped.
    assert_counters(EVICTION_COUNT, MAX_CAPACITY, KEYS);

    // Dropping the cache itself must not drop anything further.
    drop(cache);
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
1913
#[test]
fn test_debug_format() {
    let cache = SegmentedCache::new(10, 4);
    for (key, name) in [('a', "alice"), ('b', "bob"), ('c', "cindy")] {
        cache.insert(key, name);
    }

    // The `Debug` output is a map, so it is brace-delimited and contains one
    // `key: value` pair per entry. Entry order is not asserted.
    let rendered = format!("{cache:?}");
    assert!(rendered.starts_with('{'));
    for pair in [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#] {
        assert!(rendered.contains(pair));
    }
    assert!(rendered.ends_with('}'));
}
1928
// A recorded notification without its key: the value and the cause of its
// removal. Used as the value type of the map checked by
// `verify_notification_map`.
type NotificationPair<V> = (V, RemovalCause);
// A recorded notification with its key: the key (behind an `Arc`), the value
// and the cause of its removal. Used as the element type of the vector
// checked by `verify_notification_vec`.
type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1931
1932 fn verify_notification_vec<K, V, S>(
1933 cache: &SegmentedCache<K, V, S>,
1934 actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1935 expected: &[NotificationTriple<K, V>],
1936 ) where
1937 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1938 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1939 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1940 {
1941 const MAX_RETRIES: usize = 5;
1943 let mut retries = 0;
1944 loop {
1945 std::thread::sleep(Duration::from_millis(500));
1947
1948 let actual = &*actual.lock();
1949 if actual.len() != expected.len() {
1950 if retries <= MAX_RETRIES {
1951 retries += 1;
1952 cache.run_pending_tasks();
1953 continue;
1954 } else {
1955 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1956 }
1957 }
1958
1959 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1960 assert_eq!(actual, expected, "expected[{i}]");
1961 }
1962
1963 break;
1964 }
1965 }
1966
1967 fn verify_notification_map<K, V, S>(
1968 cache: &SegmentedCache<K, V, S>,
1969 actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1970 expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1971 ) where
1972 K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1973 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1974 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1975 {
1976 const MAX_RETRIES: usize = 5;
1978 let mut retries = 0;
1979 loop {
1980 std::thread::sleep(Duration::from_millis(500));
1982
1983 let actual = &*actual.lock();
1984 if actual.len() != expected.len() {
1985 if retries <= MAX_RETRIES {
1986 retries += 1;
1987 cache.run_pending_tasks();
1988 continue;
1989 } else {
1990 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1991 }
1992 }
1993
1994 for actual_key in actual.keys() {
1995 assert_eq!(
1996 actual.get(actual_key),
1997 expected.get(actual_key),
1998 "expected[{actual_key}]",
1999 );
2000 }
2001
2002 break;
2003 }
2004 }
2005}