1use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
2use crate::common::concurrent::Weigher;
3use crate::common::time::Clock;
4use crate::{
5 common::HousekeeperConfig,
6 notification::EvictionListener,
7 policy::{EvictionPolicy, ExpirationPolicy},
8 sync_base::iter::{Iter, ScanningGet},
9 Entry, Policy, PredicateError,
10};
11
12use std::{
13 borrow::Borrow,
14 collections::hash_map::RandomState,
15 fmt,
16 hash::{BuildHasher, Hash, Hasher},
17 sync::Arc,
18};
19
/// A cache handle whose state lives behind a shared [`Arc`].
///
/// The handle itself only stores an `Arc<Inner<K, V, S>>`; the `Clone` impl
/// in this file clones that `Arc`, so every clone refers to the same `Inner`.
pub struct SegmentedCache<K, V, S = RandomState> {
    // Shared state: the segments, the hasher and the segment-selection shift.
    inner: Arc<Inner<K, V, S>>,
}
33
// SAFETY: NOTE(review) — the soundness argument is not visible in this file.
// This impl presumably relies on every field reachable through `inner`
// (the `Cache` segments and the hasher `S`) being safe to move to another
// thread when `K`/`V` are `Send + Sync` and `S` is `Send`. Confirm against
// the `Cache` type before changing these bounds.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
43
// SAFETY: NOTE(review) — the soundness argument is not visible in this file.
// This impl presumably relies on the `Cache` segments performing their own
// synchronization, so that shared references can be used from several
// threads when `K`/`V` are `Send + Sync` and `S` is `Sync`. Confirm against
// the `Cache` type before changing these bounds.
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
51
52impl<K, V, S> Clone for SegmentedCache<K, V, S> {
53 fn clone(&self) -> Self {
58 Self {
59 inner: Arc::clone(&self.inner),
60 }
61 }
62}
63
64impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
65where
66 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
67 V: fmt::Debug + Clone + Send + Sync + 'static,
68 S: BuildHasher + Clone + Send + Sync + 'static,
70{
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 let mut d_map = f.debug_map();
73
74 for (k, v) in self {
75 d_map.entry(&k, &v);
76 }
77
78 d_map.finish()
79 }
80}
81
82impl<K, V> SegmentedCache<K, V, RandomState>
83where
84 K: Hash + Eq + Send + Sync + 'static,
85 V: Clone + Send + Sync + 'static,
86{
87 pub fn new(max_capacity: u64, num_segments: usize) -> Self {
99 let build_hasher = RandomState::default();
100 Self::with_everything(
101 None,
102 Some(max_capacity),
103 None,
104 num_segments,
105 build_hasher,
106 None,
107 EvictionPolicy::default(),
108 None,
109 ExpirationPolicy::default(),
110 HousekeeperConfig::default(),
111 false,
112 Clock::default(),
113 )
114 }
115
116 pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
121 CacheBuilder::default().segments(num_segments)
122 }
123}
124
125impl<K, V, S> SegmentedCache<K, V, S> {
126 pub fn name(&self) -> Option<&str> {
128 self.inner.segments[0].name()
129 }
130
131 pub fn policy(&self) -> Policy {
136 let mut policy = self.inner.segments[0].policy();
137 policy.set_max_capacity(self.inner.desired_capacity);
138 policy.set_num_segments(self.inner.segments.len());
139 policy
140 }
141
142 pub fn entry_count(&self) -> u64 {
176 self.inner
177 .segments
178 .iter()
179 .map(|seg| seg.entry_count())
180 .sum()
181 }
182
183 pub fn weighted_size(&self) -> u64 {
190 self.inner
191 .segments
192 .iter()
193 .map(|seg| seg.weighted_size())
194 .sum()
195 }
196}
197
198impl<K, V, S> SegmentedCache<K, V, S>
199where
200 K: Hash + Eq + Send + Sync + 'static,
201 V: Clone + Send + Sync + 'static,
202 S: BuildHasher + Clone + Send + Sync + 'static,
203{
204 #[allow(clippy::too_many_arguments)]
208 pub(crate) fn with_everything(
209 name: Option<String>,
210 max_capacity: Option<u64>,
211 initial_capacity: Option<usize>,
212 num_segments: usize,
213 build_hasher: S,
214 weigher: Option<Weigher<K, V>>,
215 eviction_policy: EvictionPolicy,
216 eviction_listener: Option<EvictionListener<K, V>>,
217 expiration_policy: ExpirationPolicy<K, V>,
218 housekeeper_config: HousekeeperConfig,
219 invalidator_enabled: bool,
220 clock: Clock,
221 ) -> Self {
222 Self {
223 inner: Arc::new(Inner::new(
224 name,
225 max_capacity,
226 initial_capacity,
227 num_segments,
228 build_hasher,
229 weigher,
230 eviction_policy,
231 eviction_listener,
232 expiration_policy,
233 housekeeper_config,
234 invalidator_enabled,
235 clock,
236 )),
237 }
238 }
239
240 pub fn contains_key<Q>(&self, key: &Q) -> bool
249 where
250 K: Borrow<Q>,
251 Q: Hash + Eq + ?Sized,
252 {
253 let hash = self.inner.hash(key);
254 self.inner.select(hash).contains_key_with_hash(key, hash)
255 }
256
257 pub fn get<Q>(&self, key: &Q) -> Option<V>
268 where
269 K: Borrow<Q>,
270 Q: Hash + Eq + ?Sized,
271 {
272 let hash = self.inner.hash(key);
273 self.inner
274 .select(hash)
275 .get_with_hash(key, hash, false)
276 .map(Entry::into_value)
277 }
278
279 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280 where
281 K: Hash + Eq,
282 {
283 let hash = self.inner.hash(&key);
284 let cache = self.inner.select(hash);
285 OwnedKeyEntrySelector::new(key, hash, cache)
286 }
287
288 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289 where
290 K: Borrow<Q>,
291 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
292 {
293 let hash = self.inner.hash(key);
294 let cache = self.inner.select(hash);
295 RefKeyEntrySelector::new(key, hash, cache)
296 }
297
298 #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
301 pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
302 self.get_with(key, init)
303 }
304
305 #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
308 pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
309 where
310 F: FnOnce() -> Result<V, E>,
311 E: Send + Sync + 'static,
312 {
313 self.try_get_with(key, init)
314 }
315
316 pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
328 let hash = self.inner.hash(&key);
329 let key = Arc::new(key);
330 let replace_if = None as Option<fn(&V) -> bool>;
331 self.inner
332 .select(hash)
333 .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
334 .into_value()
335 }
336
337 pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
341 where
342 K: Borrow<Q>,
343 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
344 {
345 let hash = self.inner.hash(key);
346 let replace_if = None as Option<fn(&V) -> bool>;
347 self.inner
348 .select(hash)
349 .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
350 .into_value()
351 }
352
353 pub fn get_with_if(
362 &self,
363 key: K,
364 init: impl FnOnce() -> V,
365 replace_if: impl FnMut(&V) -> bool,
366 ) -> V {
367 let hash = self.inner.hash(&key);
368 let key = Arc::new(key);
369 self.inner
370 .select(hash)
371 .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
372 .into_value()
373 }
374
375 pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
389 where
390 F: FnOnce() -> Option<V>,
391 {
392 let hash = self.inner.hash(&key);
393 let key = Arc::new(key);
394 self.inner
395 .select(hash)
396 .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
397 .map(Entry::into_value)
398 }
399
400 pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
405 where
406 F: FnOnce() -> Option<V>,
407 K: Borrow<Q>,
408 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
409 {
410 let hash = self.inner.hash(key);
411 self.inner
412 .select(hash)
413 .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
414 .map(Entry::into_value)
415 }
416
417 pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
434 where
435 F: FnOnce() -> Result<V, E>,
436 E: Send + Sync + 'static,
437 {
438 let hash = self.inner.hash(&key);
439 let key = Arc::new(key);
440 self.inner
441 .select(hash)
442 .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
443 .map(Entry::into_value)
444 }
445
446 pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
450 where
451 F: FnOnce() -> Result<V, E>,
452 E: Send + Sync + 'static,
453 K: Borrow<Q>,
454 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
455 {
456 let hash = self.inner.hash(key);
457 self.inner
458 .select(hash)
459 .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
460 .map(Entry::into_value)
461 }
462
463 pub fn insert(&self, key: K, value: V) {
467 let hash = self.inner.hash(&key);
468 let key = Arc::new(key);
469 self.inner.select(hash).insert_with_hash(key, hash, value);
470 }
471
472 pub fn invalidate<Q>(&self, key: &Q)
480 where
481 K: Borrow<Q>,
482 Q: Hash + Eq + ?Sized,
483 {
484 let hash = self.inner.hash(key);
485 self.inner
486 .select(hash)
487 .invalidate_with_hash(key, hash, false);
488 }
489
490 pub fn remove<Q>(&self, key: &Q) -> Option<V>
498 where
499 K: Borrow<Q>,
500 Q: Hash + Eq + ?Sized,
501 {
502 let hash = self.inner.hash(key);
503 self.inner
504 .select(hash)
505 .invalidate_with_hash(key, hash, true)
506 }
507
508 pub fn invalidate_all(&self) {
523 for segment in self.inner.segments.iter() {
524 segment.invalidate_all();
525 }
526 }
527
528 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
562 where
563 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
564 {
565 let pred = Arc::new(predicate);
566 for segment in self.inner.segments.iter() {
567 segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
568 }
569 Ok(())
570 }
571
572 pub fn iter(&self) -> Iter<'_, K, V> {
620 let num_cht_segments = self.inner.segments[0].num_cht_segments();
621 let segments = self
622 .inner
623 .segments
624 .iter()
625 .map(|c| c as &dyn ScanningGet<_, _>)
626 .collect::<Vec<_>>()
627 .into_boxed_slice();
628 Iter::with_multiple_cache_segments(segments, num_cht_segments)
629 }
630
631 pub fn run_pending_tasks(&self) {
633 for segment in self.inner.segments.iter() {
634 segment.run_pending_tasks();
635 }
636 }
637
638 }
647
648impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
649where
650 K: Hash + Eq + Send + Sync + 'static,
651 V: Clone + Send + Sync + 'static,
652 S: BuildHasher + Clone + Send + Sync + 'static,
653{
654 type Item = (Arc<K>, V);
655
656 type IntoIter = Iter<'a, K, V>;
657
658 fn into_iter(self) -> Self::IntoIter {
659 self.iter()
660 }
661}
662
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S> {
    /// Test helper: `true` when every segment reports an empty waiter map.
    fn is_waiter_map_empty(&self) -> bool {
        self.inner
            .segments
            .iter()
            .all(|segment| segment.is_waiter_map_empty())
    }
}
670
#[cfg(test)]
impl<K, V, S> SegmentedCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Test helper: total number of invalidation predicates across segments.
    fn invalidation_predicate_count(&self) -> usize {
        self.inner
            .segments
            .iter()
            .fold(0, |total, segment| total + segment.invalidation_predicate_count())
    }

    /// Test helper: calls `reconfigure_for_testing` on every segment.
    ///
    /// Needs exclusive access to the shared state, so it panics if any other
    /// clone of this cache is still alive.
    fn reconfigure_for_testing(&mut self) {
        let inner = Arc::get_mut(&mut self.inner)
            .expect("There are other strong reference to self.inner Arc");

        inner
            .segments
            .iter_mut()
            .for_each(|segment| segment.reconfigure_for_testing());
    }

    /// Test helper: `true` when no segment has a non-empty key-locks map.
    fn key_locks_map_is_empty(&self) -> bool {
        let any_non_empty = self
            .inner
            .segments
            .iter()
            .any(|segment| !segment.key_locks_map_is_empty());
        !any_non_empty
    }
}
702
/// Shared state of a `SegmentedCache`: the segments plus what is needed to
/// route a key to one of them.
struct Inner<K, V, S> {
    // Max capacity as requested for the whole cache (before it is divided
    // among the segments); reported back through `policy()`.
    desired_capacity: Option<u64>,
    // The segments; each one is an independent `Cache`.
    segments: Box<[Cache<K, V, S>]>,
    // Hasher factory used by `hash()` to compute a key's 64-bit hash.
    build_hasher: S,
    // Number of bits a hash is shifted right to obtain a segment index.
    // The value 64 is special-cased to mean "always segment 0".
    segment_shift: u32,
}
709
710impl<K, V, S> Inner<K, V, S>
711where
712 K: Hash + Eq + Send + Sync + 'static,
713 V: Clone + Send + Sync + 'static,
714 S: BuildHasher + Clone + Send + Sync + 'static,
715{
716 #[allow(clippy::too_many_arguments)]
720 fn new(
721 name: Option<String>,
722 max_capacity: Option<u64>,
723 initial_capacity: Option<usize>,
724 num_segments: usize,
725 build_hasher: S,
726 weigher: Option<Weigher<K, V>>,
727 eviction_policy: EvictionPolicy,
728 eviction_listener: Option<EvictionListener<K, V>>,
729 expiration_policy: ExpirationPolicy<K, V>,
730 housekeeper_config: HousekeeperConfig,
731 invalidator_enabled: bool,
732 clock: Clock,
733 ) -> Self {
734 assert!(num_segments > 0);
735
736 let actual_num_segments = num_segments.next_power_of_two();
737 let segment_shift = 64 - actual_num_segments.trailing_zeros();
738 let seg_max_capacity =
739 max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
740 let seg_init_capacity =
741 initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
742 let segments = (0..actual_num_segments)
745 .map(|_| {
746 Cache::with_everything(
747 name.clone(),
748 seg_max_capacity,
749 seg_init_capacity,
750 build_hasher.clone(),
751 weigher.clone(),
752 eviction_policy.clone(),
753 eviction_listener.clone(),
754 expiration_policy.clone(),
755 housekeeper_config.clone(),
756 invalidator_enabled,
757 clock.clone(),
758 )
759 })
760 .collect::<Vec<_>>();
761
762 Self {
763 desired_capacity: max_capacity,
764 segments: segments.into_boxed_slice(),
765 build_hasher,
766 segment_shift,
767 }
768 }
769
770 #[inline]
771 fn hash<Q>(&self, key: &Q) -> u64
772 where
773 K: Borrow<Q>,
774 Q: Hash + Eq + ?Sized,
775 {
776 let mut hasher = self.build_hasher.build_hasher();
777 key.hash(&mut hasher);
778 hasher.finish()
779 }
780
781 #[inline]
782 fn select(&self, hash: u64) -> &Cache<K, V, S> {
783 let index = self.segment_index_from_hash(hash);
784 &self.segments[index]
785 }
786
787 #[inline]
788 fn segment_index_from_hash(&self, hash: u64) -> usize {
789 if self.segment_shift == 64 {
790 0
791 } else {
792 (hash >> self.segment_shift) as usize
793 }
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use super::SegmentedCache;
800 use crate::notification::RemovalCause;
801 use parking_lot::Mutex;
802 use std::{sync::Arc, time::Duration};
803
804 #[test]
805 fn max_capacity_zero() {
806 let mut cache = SegmentedCache::new(0, 1);
807 cache.reconfigure_for_testing();
808
809 let cache = cache;
811
812 cache.insert(0, ());
813
814 assert!(!cache.contains_key(&0));
815 assert!(cache.get(&0).is_none());
816 cache.run_pending_tasks();
817 assert!(!cache.contains_key(&0));
818 assert!(cache.get(&0).is_none());
819 assert_eq!(cache.entry_count(), 0)
820 }
821
    /// Exercises insert / get / invalidate / remove on a single-segment cache
    /// with max capacity 3, and checks the exact sequence of eviction
    /// notifications. The statement order matters: reads and
    /// `run_pending_tasks` calls influence which entries get evicted.
    #[test]
    fn basic_single_thread() {
        // The following `Vec`s hold the actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Eviction listener that records every notification into `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Single segment, room for three entries.
        let mut cache = SegmentedCache::builder(1)
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Rebind immutably: from here on only `&self` methods are used.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();

        // Read "a" and "b" once more before the cache is pushed over capacity.
        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        // The cache is full; the test expects this "d" not to be admitted
        // and a `Size` notification to be delivered for it.
        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        // A second attempt is expected to be rejected the same way.
        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None);

        // On the third attempt the test expects "d" to be admitted and "c"
        // to be evicted instead.
        cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Explicit invalidation produces an `Explicit` notification.
        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // `remove` returns `None` for an absent key and the value for a
        // present one.
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
907
908 #[test]
909 fn non_power_of_two_segments() {
910 let mut cache = SegmentedCache::new(100, 5);
911 cache.reconfigure_for_testing();
912
913 let cache = cache;
915
916 assert_eq!(cache.iter().count(), 0);
917
918 cache.insert("a", "alice");
919 cache.insert("b", "bob");
920 cache.insert("c", "cindy");
921
922 assert_eq!(cache.iter().count(), 3);
923 cache.run_pending_tasks();
924 assert_eq!(cache.iter().count(), 3);
925 }
926
    /// Exercises weight-based eviction on a single-segment cache whose max
    /// capacity is a total weight of 31, and checks the exact sequence of
    /// eviction notifications. The statement order matters: reads and
    /// `run_pending_tasks` calls influence which entries get evicted.
    #[test]
    fn size_aware_eviction() {
        // The weight of an entry is the `u32` stored in its value.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        // (name, weight) pairs used as values.
        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s hold the actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Eviction listener that records every notification into `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Single segment, total weight limited to 31.
        let mut cache = SegmentedCache::builder(1)
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Rebind immutably: from here on only `&self` methods are used.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();

        // a (10) + b (15) + c (5) = 30, still within the limit of 31.
        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();

        // Read "a" and "b" once more before the cache is pushed over capacity.
        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        // "d" (15) does not fit; the test expects it not to be admitted and
        // a `Size` notification to be delivered for it.
        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        // Second attempt: expected to be rejected again.
        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None);

        // Third attempt: expected to be rejected again.
        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        // Fourth attempt: expected to be rejected again.
        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None);

        // Fifth attempt (with `dennis`, also weight 15): the test expects "d"
        // to be admitted now, evicting "c" (5) and "a" (10) to make room.
        cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Replace "b" with a heavier value (15 -> 20). b (20) + d (15) = 35
        // exceeds the limit, and the test expects "d" to be evicted.
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" and replace "b" with the lighter value (20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Final state: a (10) + b (15) = 25.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
1049
1050 #[test]
1051 fn basic_multi_threads() {
1052 let num_threads = 4;
1053
1054 let mut cache = SegmentedCache::new(100, num_threads);
1055 cache.reconfigure_for_testing();
1056
1057 let cache = cache;
1059
1060 #[allow(clippy::needless_collect)]
1062 let handles = (0..num_threads)
1063 .map(|id| {
1064 let cache = cache.clone();
1065 std::thread::spawn(move || {
1066 cache.insert(10, format!("{id}-100"));
1067 cache.get(&10);
1068 cache.run_pending_tasks();
1069 cache.insert(20, format!("{id}-200"));
1070 cache.invalidate(&10);
1071 })
1072 })
1073 .collect::<Vec<_>>();
1074
1075 handles.into_iter().for_each(|h| h.join().expect("Failed"));
1076
1077 cache.run_pending_tasks();
1078
1079 assert!(cache.get(&10).is_none());
1080 assert!(cache.get(&20).is_some());
1081 assert!(!cache.contains_key(&10));
1082 assert!(cache.contains_key(&20));
1083 }
1084
    /// `invalidate_all` must remove every entry present at the time of the
    /// call (with `Explicit` notifications), while entries inserted
    /// afterwards stay in the cache.
    #[test]
    fn invalidate_all() {
        use std::collections::HashMap;

        // The following `HashMap`s hold the actual and expected
        // notifications, keyed by cache key (order is not checked here).
        let actual = Arc::new(Mutex::new(HashMap::new()));
        let mut expected = HashMap::new();

        // Eviction listener that records every notification into `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            a1.lock().insert(k, (v, cause));
        };

        // Four segments, max capacity 100.
        let mut cache = SegmentedCache::builder(4)
            .max_capacity(100)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Rebind immutably: from here on only `&self` methods are used.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(cache.contains_key(&"c"));

        // Invalidate everything inserted so far.
        cache.invalidate_all();
        expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
        expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
        expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
        cache.run_pending_tasks();

        // An entry inserted after `invalidate_all` must survive.
        cache.insert("d", "david");
        cache.run_pending_tasks();

        assert!(cache.get(&"a").is_none());
        assert!(cache.get(&"b").is_none());
        assert!(cache.get(&"c").is_none());
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        verify_notification_map(&cache, actual, &expected);
    }
1144
    /// `invalidate_entries_if` must register the predicate with every
    /// segment, remove the matching entries that existed when it was called,
    /// and drop the predicates once they have been applied.
    #[test]
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
        use std::collections::{HashMap, HashSet};

        const SEGMENTS: usize = 4;

        // The following `HashMap`s hold the actual and expected
        // notifications, keyed by cache key (order is not checked here).
        let actual = Arc::new(Mutex::new(HashMap::new()));
        let mut expected = HashMap::new();

        // Eviction listener that records every notification into `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            a1.lock().insert(k, (v, cause));
        };

        // A mock clock lets the test advance the cache's notion of time.
        let (clock, mock) = crate::common::time::Clock::mock();

        let mut cache = SegmentedCache::builder(SEGMENTS)
            .max_capacity(100)
            .support_invalidation_closures()
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Rebind immutably: from here on only `&self` methods are used.
        let cache = cache;

        cache.insert(0, "alice");
        cache.insert(1, "bob");
        cache.insert(2, "alex");
        cache.run_pending_tasks();
        // Advance the mock clock by 5 seconds.
        mock.increment(Duration::from_secs(5));
        cache.run_pending_tasks();

        assert_eq!(cache.get(&0), Some("alice"));
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&2), Some("alex"));
        assert!(cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(cache.contains_key(&2));

        // Register a predicate matching "alice" and "alex". One predicate is
        // registered per segment.
        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));

        // Advance the clock, then insert another "alice" under key 3. It is
        // inserted after the predicate was registered, and the assertions
        // below expect it to survive.
        mock.increment(Duration::from_secs(5));
        cache.insert(3, "alice");

        // Run the pending tasks twice, with real-time pauses in between,
        // to give the invalidation time to complete.
        cache.run_pending_tasks();
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks();
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&0).is_none());
        assert!(cache.get(&2).is_none());
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&3), Some("alice"));

        assert!(!cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(!cache.contains_key(&2));
        assert!(cache.contains_key(&3));

        // Two entries remain and the applied predicates have been removed.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        // Advance the clock and register two more predicates, covering the
        // remaining entries; each is registered once per segment.
        mock.increment(Duration::from_secs(5));
        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));

        // Same run-and-pause sequence as above.
        cache.run_pending_tasks();
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks();
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&1).is_none());
        assert!(cache.get(&3).is_none());

        assert!(!cache.contains_key(&1));
        assert!(!cache.contains_key(&3));

        // The cache is empty and no predicates are left.
        assert_eq!(cache.entry_count(), 0);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        verify_notification_map(&cache, actual, &expected);

        Ok(())
    }
1247
1248 #[test]
1249 fn test_iter() {
1250 const NUM_KEYS: usize = 50;
1251
1252 fn make_value(key: usize) -> String {
1253 format!("val: {key}")
1254 }
1255
1256 let cache = SegmentedCache::builder(4)
1258 .max_capacity(100)
1259 .time_to_idle(Duration::from_secs(10))
1260 .build();
1261
1262 for key in 0..NUM_KEYS {
1263 cache.insert(key, make_value(key));
1264 }
1265
1266 let mut key_set = std::collections::HashSet::new();
1267
1268 for (key, value) in &cache {
1269 assert_eq!(value, make_value(*key));
1270
1271 key_set.insert(*key);
1272 }
1273
1274 assert_eq!(key_set.len(), NUM_KEYS);
1276 }
1277
    /// Runs iterators concurrently with writers: half of the threads
    /// re-insert all keys while the other half iterate over the cache and
    /// check that every key is seen with the right value.
    #[test]
    fn test_iter_multi_threads() {
        use std::collections::HashSet;

        const NUM_KEYS: usize = 1024;
        const NUM_THREADS: usize = 16;

        fn make_value(key: usize) -> String {
            format!("val: {key}")
        }

        let cache = SegmentedCache::builder(4)
            .max_capacity(2048)
            .time_to_idle(Duration::from_secs(10))
            .build();

        // Initialize the cache with all keys.
        for key in 0..NUM_KEYS {
            cache.insert(key, make_value(key));
        }

        // The write lock is held while the threads are spawned; each thread
        // blocks on its read lock until the write lock is dropped below, so
        // they all start working at about the same time.
        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
        let write_lock = rw_lock.write().unwrap();

        // Collecting is required: all threads must be spawned before any of
        // them is joined.
        #[allow(clippy::needless_collect)]
        let handles = (0..NUM_THREADS)
            .map(|n| {
                let cache = cache.clone();
                let rw_lock = Arc::clone(&rw_lock);

                if n % 2 == 0 {
                    // Even-numbered threads: writers that re-insert every
                    // key with the same value.
                    std::thread::spawn(move || {
                        let read_lock = rw_lock.read().unwrap();
                        for key in 0..NUM_KEYS {
                            cache.insert(key, make_value(key));
                        }
                        std::mem::drop(read_lock);
                    })
                } else {
                    // Odd-numbered threads: readers that iterate over the
                    // whole cache and verify what they see.
                    std::thread::spawn(move || {
                        let read_lock = rw_lock.read().unwrap();
                        let mut key_set = HashSet::new();
                        for (key, value) in &cache {
                            assert_eq!(value, make_value(*key));
                            key_set.insert(*key);
                        }
                        // Every key must have been visited.
                        assert_eq!(key_set.len(), NUM_KEYS);
                        std::mem::drop(read_lock);
                    })
                }
            })
            .collect::<Vec<_>>();

        // Let all threads start.
        std::mem::drop(write_lock);

        handles.into_iter().for_each(|h| h.join().expect("Failed"));

        // After all threads have finished, every key must still be present.
        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
        assert_eq!(key_set.len(), NUM_KEYS);
    }
1350
    /// Concurrent `get_with` calls for the same key: only the first caller's
    /// init closure may run, and everyone else must observe its value. The
    /// scenario is driven by the sleep durations, so it is timing dependent.
    #[test]
    fn get_with() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // Thread1 calls `get_with` immediately, so it is expected to be the
        // first caller; its init closure takes 300 ms to produce "thread1".
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    "thread1"
                });
                assert_eq!(v, "thread1");
            })
        };

        // Thread2 calls `get_with` at ~100 ms, while thread1's closure is
        // still running. Its own closure must not run; it must get thread1's
        // value.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v = cache2.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread3 calls `get_with` at ~400 ms, after thread1's closure has
        // finished. Its own closure must not run; it must get the value
        // inserted by thread1.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v = cache3.get_with(KEY, || unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread4 calls plain `get` at ~200 ms, before thread1's closure has
        // finished; it must see no value yet.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache4.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread5 calls plain `get` at ~400 ms, after thread1's closure has
        // finished; it must see thread1's value.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let maybe_v = cache5.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        for t in [thread1, thread2, thread3, thread4, thread5] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once all calls have returned.
        assert!(cache.is_waiter_map_empty());
    }
1434
    /// Concurrent `get_with_if` calls for the same key: checks when the init
    /// closure and the `replace_if` predicate are (and are not) invoked. The
    /// scenario is driven by the sleep durations, so it is timing dependent.
    #[test]
    fn get_with_if() {
        use std::thread::{sleep, spawn};

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // Thread1 calls `get_with_if` immediately, so it is expected to be
        // the first caller; its init closure takes 300 ms to produce
        // "thread1". There is no existing value, so `replace_if` must not be
        // called.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.get_with_if(
                    KEY,
                    || {
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                );
                assert_eq!(v, "thread1");
            })
        };

        // Thread2 calls `get_with_if` at ~100 ms, while thread1's closure is
        // still running. Neither of its closures may run; it must get
        // thread1's value.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
                assert_eq!(v, "thread1");
            })
        };

        // Thread3 calls `get_with_if` at ~350 ms, after thread1's value has
        // been inserted. Its `replace_if` sees "thread1" and returns `false`,
        // so the init closure must not run and the existing value is
        // returned.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                let v = cache3.get_with_if(
                    KEY,
                    || unreachable!(),
                    |v| {
                        assert_eq!(v, &"thread1");
                        false
                    },
                );
                assert_eq!(v, "thread1");
            })
        };

        // Thread4 calls `get_with_if` at ~400 ms. Its `replace_if` sees
        // "thread1" and returns `true`, so its init closure runs and
        // "thread4" replaces the existing value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v = cache4.get_with_if(
                    KEY,
                    || "thread4",
                    |v| {
                        assert_eq!(v, &"thread1");
                        true
                    },
                );
                assert_eq!(v, "thread4");
            })
        };

        // Thread5 calls plain `get` at ~200 ms, before thread1's closure has
        // finished; it must see no value yet.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache5.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread6 calls plain `get` at ~350 ms, after thread1's value has
        // been inserted but before thread4 replaces it.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(350));
                let maybe_v = cache6.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // Thread7 calls plain `get` at ~450 ms, after thread4 has replaced
        // the value.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(450));
                let maybe_v = cache7.get(&KEY);
                assert_eq!(maybe_v, Some("thread4"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
        ] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once all calls have returned.
        assert!(cache.is_waiter_map_empty());
    }
1565
    /// Concurrent `try_get_with` calls for the same key: a failing init
    /// closure must not insert a value (and its error is shared with
    /// concurrent waiters), while a later successful one must. The scenario
    /// is driven by the sleep durations, so it is timing dependent.
    #[test]
    fn try_get_with() {
        use std::{
            sync::Arc,
            thread::{sleep, spawn},
        };

        // Error type returned by the failing init closure.
        #[derive(thiserror::Error, Debug)]
        #[error("{}", _0)]
        pub struct MyError(String);

        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache = SegmentedCache::new(100, 4);
        const KEY: u32 = 0;

        // Thread1 calls `try_get_with` immediately, so it is expected to be
        // the first caller; its init closure takes 300 ms and then fails.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                let v = cache1.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(v.is_err());
            })
        };

        // Thread2 calls `try_get_with` at ~100 ms, while thread1's closure is
        // still running. Its own closure must not run; it must receive an
        // error as well.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
                assert!(v.is_err());
            })
        };

        // Thread3 calls `try_get_with` at ~400 ms, after thread1 has failed.
        // No value was inserted, so its own init closure runs; it takes
        // 300 ms and succeeds with "thread3".
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let v: MyResult<_> = cache3.try_get_with(KEY, || {
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread4 calls `try_get_with` at ~500 ms, while thread3's closure is
        // still running. Its own closure must not run; it must get thread3's
        // value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread5 calls `try_get_with` at ~800 ms, after thread3's closure
        // has finished. Its own closure must not run; it must get the value
        // inserted by thread3.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread6 calls plain `get` at ~200 ms, before thread1's closure has
        // finished; it must see no value.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread7 calls plain `get` at ~400 ms, after thread1 has failed and
        // before thread3 has succeeded; it must still see no value.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread8 calls plain `get` at ~800 ms, after thread3's closure has
        // finished; it must see thread3's value.
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(&KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        // No waiter may be left behind once all calls have returned.
        assert!(cache.is_waiter_map_empty());
    }
1705
#[test]
fn optionally_get_with() {
    use std::thread::{sleep, spawn};

    const KEY: u32 = 0;
    let cache = SegmentedCache::new(100, 4);

    // Starts right away. Its loader sleeps 300 ms and yields `None`, so the
    // call is expected to return `None`.
    let empty_loader = {
        let cache = cache.clone();
        spawn(move || {
            let outcome = cache.optionally_get_with(KEY, || {
                sleep(Duration::from_millis(300));
                None
            });
            assert!(outcome.is_none());
        })
    };

    // Calls after 100 ms, while the first loader is still sleeping. Its own
    // loader must never run, and it is expected to get `None` as well.
    let waiter_on_none = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(100));
            let outcome = cache.optionally_get_with(KEY, || unreachable!());
            assert!(outcome.is_none());
        })
    };

    // Calls after 400 ms. Its loader sleeps another 300 ms and yields
    // `Some("thread3")`, which is what the call is expected to return.
    let filling_loader = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(400));
            let outcome = cache.optionally_get_with(KEY, || {
                sleep(Duration::from_millis(300));
                Some("thread3")
            });
            assert_eq!(outcome.unwrap(), "thread3");
        })
    };

    // Calls after 500 ms, while the filling loader is still sleeping. Its own
    // loader must never run, and it is expected to get "thread3".
    let waiter_on_some = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(500));
            let outcome = cache.optionally_get_with(KEY, || unreachable!());
            assert_eq!(outcome.unwrap(), "thread3");
        })
    };

    // Calls after 800 ms. Its loader must never run, and it is expected to
    // get "thread3".
    let late_caller = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(800));
            let outcome = cache.optionally_get_with(KEY, || unreachable!());
            assert_eq!(outcome.unwrap(), "thread3");
        })
    };

    // Plain `get` after 200 ms: expected to find nothing.
    let reader_at_200 = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(200));
            let found = cache.get(&KEY);
            assert!(found.is_none());
        })
    };

    // Plain `get` after 400 ms: still expected to find nothing.
    let reader_at_400 = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(400));
            let found = cache.get(&KEY);
            assert!(found.is_none());
        })
    };

    // Plain `get` after 800 ms: expected to find "thread3".
    let reader_at_800 = {
        let cache = cache.clone();
        spawn(move || {
            sleep(Duration::from_millis(800));
            let found = cache.get(&KEY);
            assert_eq!(found, Some("thread3"));
        })
    };

    let handles = vec![
        empty_loader,
        waiter_on_none,
        filling_loader,
        waiter_on_some,
        late_caller,
        reader_at_200,
        reader_at_400,
        reader_at_800,
    ];
    for handle in handles {
        handle.join().expect("Failed to join");
    }

    assert!(cache.is_waiter_map_empty());
}
1836
#[test]
fn borrowed_forms_of_key() {
    let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
    let owned_key = vec![1_u8];

    // Round 1: look the entry up through a `&Vec<u8>`, i.e. a plain
    // reference to the owned key type.
    cache.insert(owned_key.clone(), ());
    let as_vec_ref: &Vec<u8> = &owned_key;
    assert!(cache.contains_key(as_vec_ref));
    assert_eq!(cache.get(as_vec_ref), Some(()));
    cache.invalidate(as_vec_ref);

    // Round 2: re-insert (this consumes the owned key) and look the entry up
    // through a `&[u8]` slice holding the same bytes.
    cache.insert(owned_key, ());
    let as_slice: &[u8] = &[1_u8];
    assert!(cache.contains_key(as_slice));
    assert_eq!(cache.get(as_slice), Some(()));
    cache.invalidate(as_slice);
}
1861
#[test]
#[ignore]
fn drop_value_immediately_after_eviction() {
    use crate::common::test_utils::{Counters, Value};

    const NUM_SEGMENTS: usize = 1;
    const MAX_CAPACITY: u32 = 500;
    // Insert 1.2 times as many keys as the cache is configured to hold.
    const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

    let counters = Arc::new(Counters::default());

    // The listener only tallies size-based and explicit removals; every other
    // cause is ignored.
    let listener_counters = Arc::clone(&counters);
    let listener = move |_key, _value, cause| match cause {
        RemovalCause::Size => listener_counters.incl_evicted(),
        RemovalCause::Explicit => listener_counters.incl_invalidated(),
        _ => (),
    };

    // Build the cache, apply the test-only reconfiguration, and from then on
    // expose it through an immutable binding.
    let cache = {
        let mut built = SegmentedCache::builder(NUM_SEGMENTS)
            .max_capacity(MAX_CAPACITY as u64)
            .eviction_listener(listener)
            .build();
        built.reconfigure_for_testing();
        built
    };

    // Phase 1: insert every key, running pending tasks after each insert.
    for key in 0..KEYS {
        cache.insert(key, Arc::new(Value::new(vec![0u8; 1024], &counters)));
        counters.incl_inserted();
        cache.run_pending_tasks();
    }

    let expected_evictions = KEYS - MAX_CAPACITY;

    cache.run_pending_tasks();
    assert_eq!(counters.inserted(), KEYS, "inserted");
    assert_eq!(counters.value_created(), KEYS, "value_created");
    assert_eq!(counters.evicted(), expected_evictions, "evicted");
    assert_eq!(counters.invalidated(), 0, "invalidated");
    assert_eq!(counters.value_dropped(), expected_evictions, "value_dropped");

    // Phase 2: invalidate every key, running pending tasks after each call.
    for key in 0..KEYS {
        cache.invalidate(&key);
        cache.run_pending_tasks();
    }

    cache.run_pending_tasks();
    assert_eq!(counters.inserted(), KEYS, "inserted");
    assert_eq!(counters.value_created(), KEYS, "value_created");
    assert_eq!(counters.evicted(), expected_evictions, "evicted");
    assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");

    // Phase 3: dropping the cache itself must not change the dropped count.
    drop(cache);
    assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}
1922
#[test]
fn test_debug_format() {
    let cache = SegmentedCache::new(10, 4);
    for (key, name) in [('a', "alice"), ('b', "bob"), ('c', "cindy")] {
        cache.insert(key, name);
    }

    let rendered = format!("{:?}", cache);

    // The output is expected to be wrapped in braces ...
    assert!(rendered.starts_with('{'));
    // ... and to contain one `key: value` pair per inserted entry. The order
    // of the pairs is not checked.
    for fragment in [r#"'a': "alice""#, r#"'b': "bob""#, r#"'c': "cindy""#] {
        assert!(rendered.contains(fragment));
    }
    assert!(rendered.ends_with('}'));
}
1937
/// A value together with a `RemovalCause`. Used as the map value type in
/// `verify_notification_map`, where the map is keyed by `Arc<K>`.
type NotificationPair<V> = (V, RemovalCause);
/// A key (`Arc<K>`), a value, and a `RemovalCause`. Used as the element type
/// of the ordered lists compared by `verify_notification_vec`.
type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1940
1941 fn verify_notification_vec<K, V, S>(
1942 cache: &SegmentedCache<K, V, S>,
1943 actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1944 expected: &[NotificationTriple<K, V>],
1945 ) where
1946 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1947 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1948 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1949 {
1950 const MAX_RETRIES: usize = 5;
1952 let mut retries = 0;
1953 loop {
1954 std::thread::sleep(Duration::from_millis(500));
1956
1957 let actual = &*actual.lock();
1958 if actual.len() != expected.len() {
1959 if retries <= MAX_RETRIES {
1960 retries += 1;
1961 cache.run_pending_tasks();
1962 continue;
1963 } else {
1964 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1965 }
1966 }
1967
1968 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1969 assert_eq!(actual, expected, "expected[{i}]");
1970 }
1971
1972 break;
1973 }
1974 }
1975
1976 fn verify_notification_map<K, V, S>(
1977 cache: &SegmentedCache<K, V, S>,
1978 actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1979 expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1980 ) where
1981 K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1982 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1983 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1984 {
1985 const MAX_RETRIES: usize = 5;
1987 let mut retries = 0;
1988 loop {
1989 std::thread::sleep(Duration::from_millis(500));
1991
1992 let actual = &*actual.lock();
1993 if actual.len() != expected.len() {
1994 if retries <= MAX_RETRIES {
1995 retries += 1;
1996 cache.run_pending_tasks();
1997 continue;
1998 } else {
1999 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
2000 }
2001 }
2002
2003 for actual_key in actual.keys() {
2004 assert_eq!(
2005 actual.get(actual_key),
2006 expected.get(actual_key),
2007 "expected[{actual_key}]",
2008 );
2009 }
2010
2011 break;
2012 }
2013 }
2014}