1use super::{
2 invalidator::{Invalidator, KeyDateLite, PredicateFun},
3 iter::ScanningGet,
4 key_lock::{KeyLock, KeyLockMap},
5 PredicateId,
6};
7
8use crate::{
9 common::{
10 self,
11 concurrent::{
12 arc::MiniArc,
13 constants::{
14 READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT,
15 },
16 deques::Deques,
17 entry_info::EntryInfo,
18 housekeeper::{Housekeeper, InnerSync},
19 AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher,
20 WriteOp,
21 },
22 deque::{DeqNode, Deque},
23 frequency_sketch::FrequencySketch,
24 time::{AtomicInstant, Clock, Instant},
25 timer_wheel::{ReschedulingResult, TimerWheel},
26 CacheRegion, HousekeeperConfig,
27 },
28 notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause},
29 policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy},
30 Entry, Expiry, Policy, PredicateError,
31};
32
33use crossbeam_channel::{Receiver, Sender, TrySendError};
34use crossbeam_utils::atomic::AtomicCell;
35use parking_lot::{Mutex, RwLock};
36use smallvec::SmallVec;
37use std::{
38 borrow::Borrow,
39 collections::hash_map::RandomState,
40 hash::{BuildHasher, Hash, Hasher},
41 rc::Rc,
42 sync::{
43 atomic::{AtomicBool, AtomicU8, Ordering},
44 Arc,
45 },
46 time::{Duration, Instant as StdInstant},
47};
48
49pub(crate) type HouseKeeperArc = Arc<Housekeeper>;
50
51pub(crate) struct BaseCache<K, V, S = RandomState> {
52 pub(crate) inner: Arc<Inner<K, V, S>>,
53 read_op_ch: Sender<ReadOp<K, V>>,
54 pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
55 pub(crate) housekeeper: Option<HouseKeeperArc>,
56}
57
58impl<K, V, S> Clone for BaseCache<K, V, S> {
59 fn clone(&self) -> Self {
64 Self {
65 inner: Arc::clone(&self.inner),
66 read_op_ch: self.read_op_ch.clone(),
67 write_op_ch: self.write_op_ch.clone(),
68 housekeeper: self.housekeeper.clone(),
69 }
70 }
71}
72
73impl<K, V, S> Drop for BaseCache<K, V, S> {
74 fn drop(&mut self) {
75 std::mem::drop(self.housekeeper.take());
77 }
78}
79
80impl<K, V, S> BaseCache<K, V, S> {
81 pub(crate) fn name(&self) -> Option<&str> {
82 self.inner.name()
83 }
84
85 pub(crate) fn policy(&self) -> Policy {
86 self.inner.policy()
87 }
88
89 pub(crate) fn entry_count(&self) -> u64 {
90 self.inner.entry_count()
91 }
92
93 pub(crate) fn weighted_size(&self) -> u64 {
94 self.inner.weighted_size()
95 }
96
97 pub(crate) fn is_map_disabled(&self) -> bool {
98 self.inner.max_capacity == Some(0)
99 }
100
101 #[inline]
102 pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
103 self.inner.is_removal_notifier_enabled()
104 }
105
106 #[inline]
107 pub(crate) fn current_time(&self) -> Instant {
108 self.inner.current_time()
109 }
110
111 pub(crate) fn notify_invalidate(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>)
112 where
113 K: Send + Sync + 'static,
114 V: Clone + Send + Sync + 'static,
115 {
116 self.inner.notify_invalidate(key, entry);
117 }
118}
119
120impl<K, V, S> BaseCache<K, V, S>
121where
122 K: Hash + Eq,
123 S: BuildHasher,
124{
125 pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>> {
126 self.inner.maybe_key_lock(key)
127 }
128}
129
130impl<K, V, S> BaseCache<K, V, S>
131where
132 K: Hash + Eq + Send + Sync + 'static,
133 V: Clone + Send + Sync + 'static,
134 S: BuildHasher + Clone + Send + Sync + 'static,
135{
136 #[allow(clippy::too_many_arguments)]
138 pub(crate) fn new(
139 name: Option<String>,
140 max_capacity: Option<u64>,
141 initial_capacity: Option<usize>,
142 build_hasher: S,
143 weigher: Option<Weigher<K, V>>,
144 eviction_policy: EvictionPolicy,
145 eviction_listener: Option<EvictionListener<K, V>>,
146 expiration_policy: ExpirationPolicy<K, V>,
147 housekeeper_config: HousekeeperConfig,
148 invalidator_enabled: bool,
149 clock: Clock,
150 ) -> Self {
151 let (r_size, w_size) = if max_capacity == Some(0) {
152 (0, 0)
153 } else {
154 (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE)
155 };
156 let is_eviction_listener_enabled = eviction_listener.is_some();
157 let fast_now = clock.fast_now();
158
159 let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
160 let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
161
162 let inner = Arc::new(Inner::new(
163 name,
164 max_capacity,
165 initial_capacity,
166 build_hasher,
167 weigher,
168 eviction_policy,
169 eviction_listener,
170 r_rcv,
171 w_rcv,
172 expiration_policy,
173 invalidator_enabled,
174 clock,
175 ));
176
177 Self {
178 inner,
179 read_op_ch: r_snd,
180 write_op_ch: w_snd,
181 housekeeper: Some(Arc::new(Housekeeper::new(
182 is_eviction_listener_enabled,
183 housekeeper_config,
184 fast_now,
185 ))),
186 }
187 }
188
189 #[inline]
190 pub(crate) fn hash<Q>(&self, key: &Q) -> u64
191 where
192 K: Borrow<Q>,
193 Q: Hash + Eq + ?Sized,
194 {
195 self.inner.hash(key)
196 }
197
198 pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
199 where
200 K: Borrow<Q>,
201 Q: Hash + Eq + ?Sized,
202 {
203 self.inner
205 .get_key_value_and(key, hash, |k, entry| {
206 let i = &self.inner;
207 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
208 let now = self.current_time();
209
210 !is_expired_by_per_entry_ttl(entry.entry_info(), now)
211 && !is_expired_entry_wo(ttl, va, entry, now)
212 && !is_expired_entry_ao(tti, va, entry, now)
213 && !i.is_invalidated_entry(k, entry)
214 })
215 .unwrap_or_default() }
217
218 pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
219 where
220 K: Borrow<Q>,
221 Q: Hash + Eq + ?Sized,
222 {
223 let record = |op, now| {
225 self.record_read_op(op, now)
226 .expect("Failed to record a get op");
227 };
228 let ignore_if = None as Option<&mut fn(&V) -> bool>;
229 self.do_get_with_hash(key, hash, record, ignore_if, need_key)
230 }
231
232 pub(crate) fn get_with_hash_and_ignore_if<Q, I>(
233 &self,
234 key: &Q,
235 hash: u64,
236 ignore_if: Option<&mut I>,
237 need_key: bool,
238 ) -> Option<Entry<K, V>>
239 where
240 K: Borrow<Q>,
241 Q: Hash + Eq + ?Sized,
242 I: FnMut(&V) -> bool,
243 {
244 let record = |op, now| {
246 self.record_read_op(op, now)
247 .expect("Failed to record a get op");
248 };
249 self.do_get_with_hash(key, hash, record, ignore_if, need_key)
250 }
251
252 pub(crate) fn get_with_hash_without_recording<Q, I>(
253 &self,
254 key: &Q,
255 hash: u64,
256 ignore_if: Option<&mut I>,
257 ) -> Option<V>
258 where
259 K: Borrow<Q>,
260 Q: Hash + Eq + ?Sized,
261 I: FnMut(&V) -> bool,
262 {
263 let record = |_op, _now| {};
265 self.do_get_with_hash(key, hash, record, ignore_if, false)
266 .map(Entry::into_value)
267 }
268
269 fn do_get_with_hash<Q, R, I>(
270 &self,
271 key: &Q,
272 hash: u64,
273 read_recorder: R,
274 mut ignore_if: Option<&mut I>,
275 need_key: bool,
276 ) -> Option<Entry<K, V>>
277 where
278 K: Borrow<Q>,
279 Q: Hash + Eq + ?Sized,
280 R: Fn(ReadOp<K, V>, Instant),
281 I: FnMut(&V) -> bool,
282 {
283 if self.is_map_disabled() {
284 return None;
285 }
286
287 let mut now = self.current_time();
288
289 let maybe_entry = self
290 .inner
291 .get_key_value_and_then(key, hash, move |k, entry| {
292 if let Some(ignore_if) = &mut ignore_if {
293 if ignore_if(&entry.value) {
294 return None;
296 }
297 }
298
299 let i = &self.inner;
300 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
301
302 if is_expired_by_per_entry_ttl(entry.entry_info(), now)
303 || is_expired_entry_wo(ttl, va, entry, now)
304 || is_expired_entry_ao(tti, va, entry, now)
305 || i.is_invalidated_entry(k, entry)
306 {
307 None
309 } else {
310 let maybe_key = if need_key { Some(Arc::clone(k)) } else { None };
312 Some((maybe_key, MiniArc::clone(entry)))
313 }
314 });
315
316 if let Some((maybe_key, entry)) = maybe_entry {
317 let mut is_expiry_modified = false;
318
319 if let Some(expiry) = &self.inner.expiration_policy.expiry() {
321 let lm = entry.last_modified().expect("Last modified is not set");
322 now = now.max(lm);
327
328 let lm = self.inner.clock().to_std_instant(lm);
331
332 is_expiry_modified = Self::expire_after_read_or_update(
352 |k, v, t, d| expiry.expire_after_read(k, v, t, d, lm),
353 &entry.entry_info().key_hash().key,
354 &entry,
355 self.inner.expiration_policy.time_to_live(),
356 self.inner.expiration_policy.time_to_idle(),
357 now,
358 self.inner.clock(),
359 );
360 }
361
362 entry.set_last_accessed(now);
363
364 let v = entry.value.clone();
365 let op = ReadOp::Hit {
366 value_entry: entry,
367 is_expiry_modified,
368 };
369 read_recorder(op, now);
370 Some(Entry::new(maybe_key, v, false, false))
371 } else {
372 read_recorder(ReadOp::Miss(hash), now);
373 None
374 }
375 }
376
377 pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
378 where
379 K: Borrow<Q>,
380 Q: Hash + Eq + ?Sized,
381 {
382 self.inner
383 .get_key_value_and(key, hash, |k, _entry| Arc::clone(k))
384 }
385
386 #[inline]
387 pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
388 where
389 K: Borrow<Q>,
390 Q: Hash + Eq + ?Sized,
391 {
392 self.inner.remove_entry(key, hash)
393 }
394
395 #[inline]
396 pub(crate) fn apply_reads_writes_if_needed(
397 inner: &impl InnerSync,
398 ch: &Sender<WriteOp<K, V>>,
399 now: Instant,
400 housekeeper: Option<&HouseKeeperArc>,
401 ) {
402 let w_len = ch.len();
403
404 if let Some(hk) = housekeeper {
405 if Self::should_apply_writes(hk, w_len, now) {
406 hk.try_run_pending_tasks(inner);
407 }
408 }
409 }
410
411 pub(crate) fn invalidate_all(&self) {
412 let now = self.current_time();
413 self.inner.set_valid_after(now);
414 }
415
416 pub(crate) fn invalidate_entries_if(
417 &self,
418 predicate: PredicateFun<K, V>,
419 ) -> Result<PredicateId, PredicateError> {
420 let now = self.current_time();
421 self.inner.register_invalidation_predicate(predicate, now)
422 }
423}
424
425impl<K, V, S> ScanningGet<K, V> for BaseCache<K, V, S>
429where
430 K: Hash + Eq + Send + Sync + 'static,
431 V: Clone + Send + Sync + 'static,
432 S: BuildHasher + Clone + Send + Sync + 'static,
433{
434 fn num_cht_segments(&self) -> usize {
435 self.inner.num_cht_segments()
436 }
437
438 fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
439 let hash = self.hash(key);
440 self.inner.get_key_value_and_then(key, hash, |k, entry| {
441 let i = &self.inner;
442 let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
443 let now = self.current_time();
444
445 if is_expired_by_per_entry_ttl(entry.entry_info(), now)
446 || is_expired_entry_wo(ttl, va, entry, now)
447 || is_expired_entry_ao(tti, va, entry, now)
448 || i.is_invalidated_entry(k, entry)
449 {
450 None
452 } else {
453 Some(entry.value.clone())
455 }
456 })
457 }
458
459 fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
460 self.inner.keys(cht_segment)
461 }
462}
463
464impl<K, V, S> BaseCache<K, V, S>
468where
469 K: Hash + Eq + Send + Sync + 'static,
470 V: Clone + Send + Sync + 'static,
471 S: BuildHasher + Clone + Send + Sync + 'static,
472{
473 #[inline]
474 fn record_read_op(
475 &self,
476 op: ReadOp<K, V>,
477 now: Instant,
478 ) -> Result<(), TrySendError<ReadOp<K, V>>> {
479 self.apply_reads_if_needed(&self.inner, now);
480 let ch = &self.read_op_ch;
481 match ch.try_send(op) {
482 Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
484 Err(e @ TrySendError::Disconnected(_)) => Err(e),
485 }
486 }
487
488 #[inline]
489 pub(crate) fn do_insert_with_hash(
490 &self,
491 key: Arc<K>,
492 hash: u64,
493 value: V,
494 ) -> (WriteOp<K, V>, Instant) {
495 let weight = self.inner.weigh(&key, &value);
496 let op_cnt1 = Rc::new(AtomicU8::new(0));
497 let op_cnt2 = Rc::clone(&op_cnt1);
498 let mut op1 = None;
499 let mut op2 = None;
500
501 let kl = self.maybe_key_lock(&key);
503 let _klg = &kl.as_ref().map(|kl| kl.lock());
504
505 let ts = self.current_time();
506
507 self.inner.cache.insert_with_or_modify(
520 Arc::clone(&key),
521 hash,
522 || {
524 let (entry, gen) = self.new_value_entry(&key, hash, value.clone(), ts, weight);
525 let ins_op = WriteOp::new_upsert(&key, hash, &entry, gen, 0, weight);
526 let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
527 op1 = Some((cnt, ins_op));
528 entry
529 },
530 |_k, old_entry| {
532 let old_weight = old_entry.policy_weight();
533
534 let old_info = OldEntryInfo::new(old_entry);
538 let (entry, gen) = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
539 let upd_op = WriteOp::new_upsert(&key, hash, &entry, gen, old_weight, weight);
540 let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
541 op2 = Some((cnt, old_info, upd_op));
542 entry
543 },
544 );
545
546 match (op1, op2) {
547 (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op),
548 (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
549 self.do_post_insert_steps(ts, &key, ins_op)
550 }
551 (_, Some((_cnt, old_info, upd_op))) => {
552 self.do_post_update_steps(ts, key, old_info, upd_op)
553 }
554 (None, None) => unreachable!(),
555 }
556 }
557
558 fn do_post_insert_steps(
559 &self,
560 ts: Instant,
561 key: &Arc<K>,
562 ins_op: WriteOp<K, V>,
563 ) -> (WriteOp<K, V>, Instant) {
564 if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
565 (&self.inner.expiration_policy.expiry(), &ins_op)
566 {
567 Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock());
568 }
569 (ins_op, ts)
570 }
571
572 fn do_post_update_steps(
573 &self,
574 ts: Instant,
575 key: Arc<K>,
576 old_info: OldEntryInfo<K, V>,
577 upd_op: WriteOp<K, V>,
578 ) -> (WriteOp<K, V>, Instant) {
579 if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
580 (&self.inner.expiration_policy.expiry(), &upd_op)
581 {
582 Self::expire_after_read_or_update(
583 |k, v, t, d| expiry.expire_after_update(k, v, t, d),
584 &key,
585 value_entry,
586 self.inner.expiration_policy.time_to_live(),
587 self.inner.expiration_policy.time_to_idle(),
588 ts,
589 self.inner.clock(),
590 );
591 }
592
593 if self.is_removal_notifier_enabled() {
594 self.inner.notify_upsert(
595 key,
596 &old_info.entry,
597 old_info.last_accessed,
598 old_info.last_modified,
599 );
600 }
601 crossbeam_epoch::pin().flush();
602 (upd_op, ts)
603 }
604
605 #[inline]
606 fn apply_reads_if_needed(&self, inner: &Inner<K, V, S>, now: Instant) {
607 let len = self.read_op_ch.len();
608
609 if let Some(hk) = &self.housekeeper {
610 if Self::should_apply_reads(hk, len, now) {
611 hk.try_run_pending_tasks(inner);
612 }
613 }
614 }
615
616 #[inline]
617 fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
618 hk.should_apply_reads(ch_len, now)
619 }
620
621 #[inline]
622 fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
623 hk.should_apply_writes(ch_len, now)
624 }
625}
626
627impl<K, V, S> BaseCache<K, V, S> {
628 #[inline]
629 fn new_value_entry(
630 &self,
631 key: &Arc<K>,
632 hash: u64,
633 value: V,
634 timestamp: Instant,
635 policy_weight: u32,
636 ) -> (MiniArc<ValueEntry<K, V>>, u16) {
637 let key_hash = KeyHash::new(Arc::clone(key), hash);
638 let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight));
639 let gen: u16 = info.entry_gen();
640 (MiniArc::new(ValueEntry::new(value, info)), gen)
641 }
642
643 #[inline]
644 fn new_value_entry_from(
645 &self,
646 value: V,
647 timestamp: Instant,
648 policy_weight: u32,
649 other: &ValueEntry<K, V>,
650 ) -> (MiniArc<ValueEntry<K, V>>, u16) {
651 let info = MiniArc::clone(other.entry_info());
652 let gen = info.incr_entry_gen();
655 info.set_last_accessed(timestamp);
656 info.set_last_modified(timestamp);
657 info.set_policy_weight(policy_weight);
658 (MiniArc::new(ValueEntry::new_from(value, info, other)), gen)
659 }
660
661 fn expire_after_create(
662 expiry: &Arc<dyn Expiry<K, V> + Send + Sync + 'static>,
663 key: &K,
664 value_entry: &ValueEntry<K, V>,
665 ts: Instant,
666 clock: &Clock,
667 ) {
668 let duration =
669 expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts));
670 let expiration_time = duration.map(|duration| ts.saturating_add(duration));
671 value_entry
672 .entry_info()
673 .set_expiration_time(expiration_time);
674 }
675
676 fn expire_after_read_or_update(
677 expiry: impl FnOnce(&K, &V, StdInstant, Option<Duration>) -> Option<Duration>,
678 key: &K,
679 value_entry: &ValueEntry<K, V>,
680 ttl: Option<Duration>,
681 tti: Option<Duration>,
682 ts: Instant,
683 clock: &Clock,
684 ) -> bool {
685 let current_time = clock.to_std_instant(ts);
686 let ei = &value_entry.entry_info();
687
688 let exp_time = IntoIterator::into_iter([
689 ei.expiration_time(),
690 ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))),
691 tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))),
692 ])
693 .flatten()
694 .min();
695
696 let current_duration = exp_time.and_then(|time| {
697 let std_time = clock.to_std_instant(time);
698 std_time.checked_duration_since(current_time)
699 });
700
701 let duration = expiry(key, &value_entry.value, current_time, current_duration);
702
703 if duration != current_duration {
704 let expiration_time = duration.map(|duration| ts.saturating_add(duration));
705 value_entry
706 .entry_info()
707 .set_expiration_time(expiration_time);
708 true
710 } else {
711 false
712 }
713 }
714}
715
716#[cfg(test)]
720impl<K, V, S> BaseCache<K, V, S>
721where
722 K: Hash + Eq + Send + Sync + 'static,
723 V: Clone + Send + Sync + 'static,
724 S: BuildHasher + Clone + Send + Sync + 'static,
725{
726 pub(crate) fn invalidation_predicate_count(&self) -> usize {
727 self.inner.invalidation_predicate_count()
728 }
729
730 pub(crate) fn reconfigure_for_testing(&mut self) {
731 self.inner.enable_frequency_sketch_for_testing();
733 if let Some(hk) = &self.housekeeper {
735 hk.disable_auto_run();
736 }
737 }
738
739 pub(crate) fn key_locks_map_is_empty(&self) -> bool {
740 self.inner.key_locks_map_is_empty()
741 }
742}
743
744struct EvictionState<'a, K, V> {
745 counters: EvictionCounters,
746 notifier: Option<&'a RemovalNotifier<K, V>>,
747 more_entries_to_evict: bool,
748}
749
750impl<'a, K, V> EvictionState<'a, K, V> {
751 fn new(
752 entry_count: u64,
753 weighted_size: u64,
754 notifier: Option<&'a RemovalNotifier<K, V>>,
755 ) -> Self {
756 Self {
757 counters: EvictionCounters::new(entry_count, weighted_size),
758 notifier,
759 more_entries_to_evict: false,
760 }
761 }
762
763 fn is_notifier_enabled(&self) -> bool {
764 self.notifier.is_some()
765 }
766
767 fn notify_entry_removal(
768 &mut self,
769 key: Arc<K>,
770 entry: &MiniArc<ValueEntry<K, V>>,
771 cause: RemovalCause,
772 ) where
773 K: Send + Sync + 'static,
774 V: Clone + Send + Sync + 'static,
775 {
776 if let Some(notifier) = self.notifier {
777 notifier.notify(key, entry.value.clone(), cause);
778 } else {
779 panic!("notify_entry_removal is called when the notification is disabled");
780 }
781 }
782}
783
784struct EvictionCounters {
785 entry_count: u64,
786 weighted_size: u64,
787 eviction_count: u64,
788}
789
790impl EvictionCounters {
791 #[inline]
792 fn new(entry_count: u64, weighted_size: u64) -> Self {
793 Self {
794 entry_count,
795 weighted_size,
796 eviction_count: 0,
797 }
798 }
799
800 #[inline]
801 fn saturating_add(&mut self, entry_count: u64, weight: u32) {
802 self.entry_count += entry_count;
803 let total = &mut self.weighted_size;
804 *total = total.saturating_add(weight as u64);
805 }
806
807 #[inline]
808 fn saturating_sub(&mut self, entry_count: u64, weight: u32) {
809 self.entry_count -= entry_count;
810 let total = &mut self.weighted_size;
811 *total = total.saturating_sub(weight as u64);
812 }
813
814 #[inline]
815 fn incr_eviction_count(&mut self) {
816 let count = &mut self.eviction_count;
817 *count = count.saturating_add(1);
818 }
819}
820
821#[derive(Default)]
822struct EntrySizeAndFrequency {
823 policy_weight: u64,
824 freq: u32,
825}
826
827impl EntrySizeAndFrequency {
828 fn new(policy_weight: u32) -> Self {
829 Self {
830 policy_weight: policy_weight as u64,
831 ..Default::default()
832 }
833 }
834
835 fn add_policy_weight(&mut self, weight: u32) {
836 self.policy_weight += weight as u64;
837 }
838
839 fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) {
840 self.freq += freq.frequency(hash) as u32;
841 }
842}
843
844#[allow(clippy::large_enum_variant)]
852enum AdmissionResult<K> {
853 Admitted {
854 victim_keys: SmallVec<[(KeyHash<K>, Option<Instant>); 8]>,
856 },
857 Rejected,
858}
859
860type CacheStore<K, V, S> = crate::cht::SegmentedHashMap<Arc<K>, MiniArc<ValueEntry<K, V>>, S>;
861
862pub(crate) struct Inner<K, V, S> {
863 name: Option<String>,
864 max_capacity: Option<u64>,
865 entry_count: AtomicCell<u64>,
866 weighted_size: AtomicCell<u64>,
867 pub(crate) cache: CacheStore<K, V, S>,
868 build_hasher: S,
869 deques: Mutex<Deques<K>>,
870 timer_wheel: Mutex<TimerWheel<K>>,
871 frequency_sketch: RwLock<FrequencySketch>,
872 frequency_sketch_enabled: AtomicBool,
873 read_op_ch: Receiver<ReadOp<K, V>>,
874 write_op_ch: Receiver<WriteOp<K, V>>,
875 eviction_policy: EvictionPolicyConfig,
876 expiration_policy: ExpirationPolicy<K, V>,
877 valid_after: AtomicInstant,
878 weigher: Option<Weigher<K, V>>,
879 removal_notifier: Option<RemovalNotifier<K, V>>,
880 key_locks: Option<KeyLockMap<K, S>>,
881 invalidator: Option<Invalidator<K, V, S>>,
882 clock: Clock,
883}
884
885impl<K, V, S> Drop for Inner<K, V, S> {
886 fn drop(&mut self) {
887 for _ in 0..128 {
890 crossbeam_epoch::pin().flush();
891 }
892
893 }
898}
899
900impl<K, V, S> Inner<K, V, S> {
905 fn name(&self) -> Option<&str> {
906 self.name.as_deref()
907 }
908
909 fn policy(&self) -> Policy {
910 let exp = &self.expiration_policy;
911 Policy::new(self.max_capacity, 1, exp.time_to_live(), exp.time_to_idle())
912 }
913
914 #[inline]
915 fn entry_count(&self) -> u64 {
916 self.entry_count.load()
917 }
918
919 #[inline]
920 fn weighted_size(&self) -> u64 {
921 self.weighted_size.load()
922 }
923
924 #[inline]
925 pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
926 self.removal_notifier.is_some()
927 }
928
929 pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
930 where
931 K: Hash + Eq,
932 S: BuildHasher,
933 {
934 self.key_locks.as_ref().map(|kls| kls.key_lock(key))
935 }
936
937 #[inline]
938 fn current_time(&self) -> Instant {
939 self.clock.now()
940 }
941
942 fn clock(&self) -> &Clock {
943 &self.clock
944 }
945
946 fn num_cht_segments(&self) -> usize {
947 self.cache.actual_num_segments()
948 }
949
950 #[inline]
951 fn time_to_live(&self) -> Option<Duration> {
952 self.expiration_policy.time_to_live()
953 }
954
955 #[inline]
956 fn time_to_idle(&self) -> Option<Duration> {
957 self.expiration_policy.time_to_idle()
958 }
959
960 #[inline]
961 fn has_expiry(&self) -> bool {
962 let exp = &self.expiration_policy;
963 exp.time_to_live().is_some() || exp.time_to_idle().is_some()
964 }
965
966 #[inline]
967 fn is_write_order_queue_enabled(&self) -> bool {
968 self.expiration_policy.time_to_live().is_some() || self.invalidator.is_some()
969 }
970
971 #[inline]
972 fn valid_after(&self) -> Option<Instant> {
973 self.valid_after.instant()
974 }
975
976 #[inline]
977 fn set_valid_after(&self, timestamp: Instant) {
978 self.valid_after.set_instant(timestamp);
979 }
980
981 #[inline]
982 fn has_valid_after(&self) -> bool {
983 self.valid_after.is_set()
984 }
985}
986
987impl<K, V, S> Inner<K, V, S>
988where
989 K: Hash + Eq + Send + Sync + 'static,
990 V: Send + Sync + 'static,
991 S: BuildHasher + Clone,
992{
993 #[allow(clippy::too_many_arguments)]
996 fn new(
997 name: Option<String>,
998 max_capacity: Option<u64>,
999 initial_capacity: Option<usize>,
1000 build_hasher: S,
1001 weigher: Option<Weigher<K, V>>,
1002 eviction_policy: EvictionPolicy,
1003 eviction_listener: Option<EvictionListener<K, V>>,
1004 read_op_ch: Receiver<ReadOp<K, V>>,
1005 write_op_ch: Receiver<WriteOp<K, V>>,
1006 expiration_policy: ExpirationPolicy<K, V>,
1007 invalidator_enabled: bool,
1008 clock: Clock,
1009 ) -> Self {
1010 let (num_segments, initial_capacity) = if max_capacity == Some(0) {
1013 (1, 0)
1014 } else {
1015 let ic = initial_capacity
1016 .map(|cap| cap + WRITE_LOG_CH_SIZE)
1017 .unwrap_or_default();
1018 (64, ic)
1019 };
1020 let cache = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
1021 num_segments,
1022 initial_capacity,
1023 build_hasher.clone(),
1024 );
1025
1026 let now = clock.now();
1027 let timer_wheel = Mutex::new(TimerWheel::new(now));
1028
1029 let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener {
1030 let rn = RemovalNotifier::new(listener, name.clone());
1031 let kl = KeyLockMap::with_hasher(build_hasher.clone());
1032 (Some(rn), Some(kl))
1033 } else {
1034 (None, None)
1035 };
1036
1037 let invalidator = if invalidator_enabled {
1038 Some(Invalidator::new(build_hasher.clone()))
1039 } else {
1040 None
1041 };
1042
1043 Self {
1044 name,
1045 max_capacity,
1046 entry_count: AtomicCell::default(),
1047 weighted_size: AtomicCell::default(),
1048 cache,
1049 build_hasher,
1050 deques: Mutex::default(),
1051 timer_wheel,
1052 frequency_sketch: RwLock::new(FrequencySketch::default()),
1053 frequency_sketch_enabled: AtomicBool::default(),
1054 read_op_ch,
1055 write_op_ch,
1056 eviction_policy: eviction_policy.config,
1057 expiration_policy,
1058 valid_after: AtomicInstant::default(),
1059 weigher,
1060 removal_notifier,
1061 key_locks,
1062 invalidator,
1063 clock,
1064 }
1065 }
1066
1067 #[inline]
1068 fn hash<Q>(&self, key: &Q) -> u64
1069 where
1070 K: Borrow<Q>,
1071 Q: Hash + Eq + ?Sized,
1072 {
1073 let mut hasher = self.build_hasher.build_hasher();
1074 key.hash(&mut hasher);
1075 hasher.finish()
1076 }
1077
1078 #[inline]
1079 fn get_key_value_and<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
1080 where
1081 K: Borrow<Q>,
1082 Q: Hash + Eq + ?Sized,
1083 F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> T,
1084 {
1085 self.cache
1086 .get_key_value_and(hash, |k| (k as &K).borrow() == key, with_entry)
1087 }
1088
1089 #[inline]
1090 fn get_key_value_and_then<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
1091 where
1092 K: Borrow<Q>,
1093 Q: Hash + Eq + ?Sized,
1094 F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> Option<T>,
1095 {
1096 self.cache
1097 .get_key_value_and_then(hash, |k| (k as &K).borrow() == key, with_entry)
1098 }
1099
1100 #[inline]
1101 fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
1102 where
1103 K: Borrow<Q>,
1104 Q: Hash + Eq + ?Sized,
1105 {
1106 self.cache
1107 .remove_entry(hash, |k| (k as &K).borrow() == key)
1108 .map(|(key, entry)| KvEntry::new(key, entry))
1109 }
1110
1111 fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
1112 self.cache.keys(cht_segment, Arc::clone)
1118 }
1119
1120 #[inline]
1121 fn register_invalidation_predicate(
1122 &self,
1123 predicate: PredicateFun<K, V>,
1124 registered_at: Instant,
1125 ) -> Result<PredicateId, PredicateError> {
1126 if let Some(inv) = &self.invalidator {
1127 inv.register_predicate(predicate, registered_at)
1128 } else {
1129 Err(PredicateError::InvalidationClosuresDisabled)
1130 }
1131 }
1132
1133 #[inline]
1135 fn is_invalidated_entry(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
1136 where
1137 V: Clone,
1138 {
1139 if let Some(inv) = &self.invalidator {
1140 return inv.apply_predicates(key, entry);
1141 }
1142 false
1143 }
1144
1145 #[inline]
1146 fn weigh(&self, key: &K, value: &V) -> u32 {
1147 self.weigher.as_ref().map_or(1, |w| w(key, value))
1148 }
1149}
1150
1151impl<K, V, S> InnerSync for Inner<K, V, S>
1152where
1153 K: Hash + Eq + Send + Sync + 'static,
1154 V: Clone + Send + Sync + 'static,
1155 S: BuildHasher + Clone + Send + Sync + 'static,
1156{
1157 fn run_pending_tasks(
1158 &self,
1159 timeout: Option<Duration>,
1160 max_log_sync_repeats: u32,
1161 eviction_batch_size: u32,
1162 ) -> bool {
1163 self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size)
1164 }
1165
1166 fn now(&self) -> Instant {
1167 self.current_time()
1168 }
1169}
1170
1171impl<K, V, S> Inner<K, V, S>
1172where
1173 K: Hash + Eq + Send + Sync + 'static,
1174 V: Clone + Send + Sync + 'static,
1175 S: BuildHasher + Clone + Send + Sync + 'static,
1176{
1177 fn do_run_pending_tasks(
1178 &self,
1179 timeout: Option<Duration>,
1180 max_log_sync_repeats: u32,
1181 eviction_batch_size: u32,
1182 ) -> bool {
1183 if self.max_capacity == Some(0) {
1184 return false;
1185 }
1186
1187 let mut deqs = self.deques.lock();
1189 let mut timer_wheel = self.timer_wheel.lock();
1190
1191 let started_at = if timeout.is_some() {
1192 Some(self.current_time())
1193 } else {
1194 None
1195 };
1196 let mut should_process_logs = true;
1197 let mut calls = 0u32;
1198 let current_ec = self.entry_count.load();
1199 let current_ws = self.weighted_size.load();
1200 let mut eviction_state =
1201 EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());
1202
1203 loop {
1204 if should_process_logs {
1205 let r_len = self.read_op_ch.len();
1206 if r_len > 0 {
1207 self.apply_reads(&mut deqs, &mut timer_wheel, r_len);
1208 }
1209
1210 let w_len = self.write_op_ch.len();
1211 if w_len > 0 {
1212 self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state);
1213 }
1214
1215 if self.eviction_policy == EvictionPolicyConfig::TinyLfu
1216 && self.should_enable_frequency_sketch(&eviction_state.counters)
1217 {
1218 self.enable_frequency_sketch(&eviction_state.counters);
1219 }
1220
1221 calls += 1;
1222 }
1223
1224 eviction_state.more_entries_to_evict = false;
1228 let last_eviction_count = eviction_state.counters.eviction_count;
1229
1230 if timer_wheel.is_enabled() {
1233 self.evict_expired_entries_using_timers(
1234 &mut timer_wheel,
1235 &mut deqs,
1236 &mut eviction_state,
1237 );
1238 }
1239
1240 if self.has_expiry() || self.has_valid_after() {
1243 self.evict_expired_entries_using_deqs(
1244 &mut deqs,
1245 &mut timer_wheel,
1246 eviction_batch_size,
1247 &mut eviction_state,
1248 );
1249 }
1250
1251 if let Some(invalidator) = &self.invalidator {
1254 if !invalidator.is_empty() {
1255 self.invalidate_entries(
1256 invalidator,
1257 &mut deqs,
1258 &mut timer_wheel,
1259 eviction_batch_size,
1260 &mut eviction_state,
1261 );
1262 }
1263 }
1264
1265 let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
1267 if weights_to_evict > 0 {
1268 self.evict_lru_entries(
1269 &mut deqs,
1270 &mut timer_wheel,
1271 eviction_batch_size,
1272 weights_to_evict,
1273 &mut eviction_state,
1274 );
1275 }
1276
1277 should_process_logs = calls <= max_log_sync_repeats
1280 && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
1281 || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);
1282
1283 let should_evict_more_entries = eviction_state.more_entries_to_evict
1284 && (eviction_state.counters.eviction_count - last_eviction_count) > 0;
1286
1287 if !should_process_logs && !should_evict_more_entries {
1289 break;
1290 }
1291
1292 if let (Some(to), Some(started)) = (timeout, started_at) {
1295 let elapsed = self.current_time().saturating_duration_since(started);
1296 if elapsed >= to {
1297 break;
1298 }
1299 }
1300 }
1301
1302 debug_assert_eq!(self.entry_count.load(), current_ec);
1303 debug_assert_eq!(self.weighted_size.load(), current_ws);
1304 self.entry_count.store(eviction_state.counters.entry_count);
1305 self.weighted_size
1306 .store(eviction_state.counters.weighted_size);
1307
1308 crossbeam_epoch::pin().flush();
1309
1310 drop(deqs);
1312
1313 eviction_state.more_entries_to_evict
1314 }
1315}
1316
1317impl<K, V, S> Inner<K, V, S>
1321where
1322 K: Hash + Eq + Send + Sync + 'static,
1323 V: Send + Sync + 'static,
1324 S: BuildHasher + Clone + Send + Sync + 'static,
1325{
1326 fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool {
1327 self.max_capacity.map_or(true, |limit| {
1328 counters.weighted_size + candidate_weight as u64 <= limit
1329 })
1330 }
1331
1332 fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 {
1333 self.max_capacity
1334 .map(|limit| counters.weighted_size.saturating_sub(limit))
1335 .unwrap_or_default()
1336 }
1337
1338 #[inline]
1339 fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
1340 match self.max_capacity {
1341 None | Some(0) => false,
1342 Some(max_cap) => {
1343 if self.frequency_sketch_enabled.load(Ordering::Acquire) {
1344 false } else {
1346 counters.weighted_size >= max_cap / 2
1347 }
1348 }
1349 }
1350 }
1351
1352 #[inline]
1353 fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
1354 if let Some(max_cap) = self.max_capacity {
1355 let c = counters;
1356 let cap = if self.weigher.is_none() {
1357 max_cap
1358 } else {
1359 (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
1360 };
1361 self.do_enable_frequency_sketch(cap);
1362 }
1363 }
1364
1365 #[cfg(test)]
1366 fn enable_frequency_sketch_for_testing(&self) {
1367 if let Some(max_cap) = self.max_capacity {
1368 self.do_enable_frequency_sketch(max_cap);
1369 }
1370 }
1371
1372 #[inline]
1373 fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
1374 let skt_capacity = common::sketch_capacity(cache_capacity);
1375 self.frequency_sketch.write().ensure_capacity(skt_capacity);
1376 self.frequency_sketch_enabled.store(true, Ordering::Release);
1377 }
1378
1379 fn apply_reads(&self, deqs: &mut Deques<K>, timer_wheel: &mut TimerWheel<K>, count: usize) {
1380 use ReadOp::{Hit, Miss};
1381 let mut freq = self.frequency_sketch.write();
1382 let ch = &self.read_op_ch;
1383 for _ in 0..count {
1384 match ch.try_recv() {
1385 Ok(Hit {
1386 value_entry,
1387 is_expiry_modified,
1388 }) => {
1389 let kh = value_entry.entry_info().key_hash();
1390 freq.increment(kh.hash);
1391 if is_expiry_modified {
1392 self.update_timer_wheel(&value_entry, timer_wheel);
1393 }
1394 deqs.move_to_back_ao(&value_entry);
1395 }
1396 Ok(Miss(hash)) => freq.increment(hash),
1397 Err(_) => break,
1398 }
1399 }
1400 }
1401
1402 fn apply_writes(
1403 &self,
1404 deqs: &mut Deques<K>,
1405 timer_wheel: &mut TimerWheel<K>,
1406 count: usize,
1407 eviction_state: &mut EvictionState<'_, K, V>,
1408 ) where
1409 V: Clone,
1410 {
1411 use WriteOp::{Remove, Upsert};
1412 let freq = self.frequency_sketch.read();
1413 let ch = &self.write_op_ch;
1414
1415 for _ in 0..count {
1416 match ch.try_recv() {
1417 Ok(Upsert {
1418 key_hash: kh,
1419 value_entry: entry,
1420 entry_gen: gen,
1421 old_weight,
1422 new_weight,
1423 }) => self.handle_upsert(
1424 kh,
1425 entry,
1426 gen,
1427 old_weight,
1428 new_weight,
1429 deqs,
1430 timer_wheel,
1431 &freq,
1432 eviction_state,
1433 ),
1434 Ok(Remove {
1435 kv_entry: KvEntry { key: _key, entry },
1436 entry_gen: gen,
1437 }) => {
1438 Self::handle_remove(
1439 deqs,
1440 timer_wheel,
1441 entry,
1442 Some(gen),
1443 &mut eviction_state.counters,
1444 );
1445 }
1446 Err(_) => break,
1447 };
1448 }
1449 }
1450
1451 #[allow(clippy::too_many_arguments)]
1452 fn handle_upsert(
1453 &self,
1454 kh: KeyHash<K>,
1455 entry: MiniArc<ValueEntry<K, V>>,
1456 gen: u16,
1457 old_weight: u32,
1458 new_weight: u32,
1459 deqs: &mut Deques<K>,
1460 timer_wheel: &mut TimerWheel<K>,
1461 freq: &FrequencySketch,
1462 eviction_state: &mut EvictionState<'_, K, V>,
1463 ) where
1464 V: Clone,
1465 {
1466 {
1467 let counters = &mut eviction_state.counters;
1468
1469 if entry.is_admitted() {
1470 counters.saturating_sub(0, old_weight);
1472 counters.saturating_add(0, new_weight);
1473 self.update_timer_wheel(&entry, timer_wheel);
1474 deqs.move_to_back_ao(&entry);
1475 deqs.move_to_back_wo(&entry);
1476 entry.entry_info().set_policy_gen(gen);
1477 return;
1478 }
1479
1480 if self.has_enough_capacity(new_weight, counters) {
1481 self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters);
1484 entry.entry_info().set_policy_gen(gen);
1485 return;
1486 }
1487 }
1488
1489 if let Some(max) = self.max_capacity {
1490 if new_weight as u64 > max {
1491 let kl = self.maybe_key_lock(&kh.key);
1495 let _klg = &kl.as_ref().map(|kl| kl.lock());
1496
1497 let removed = self.cache.remove_if(
1498 kh.hash,
1499 |k| k == &kh.key,
1500 |_, current_entry| {
1501 MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
1502 && current_entry.entry_info().entry_gen() == gen
1503 },
1504 );
1505 if let Some(entry) = removed {
1506 if eviction_state.is_notifier_enabled() {
1507 let key = Arc::clone(&kh.key);
1508 eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
1509 }
1510 eviction_state.counters.incr_eviction_count();
1511 }
1512 entry.entry_info().set_policy_gen(gen);
1513 return;
1514 }
1515 }
1516
1517 let admission_result = match &self.eviction_policy {
1522 EvictionPolicyConfig::TinyLfu => {
1523 let mut candidate = EntrySizeAndFrequency::new(new_weight);
1524 candidate.add_frequency(freq, kh.hash);
1525 Self::admit(&candidate, &self.cache, deqs, freq)
1526 }
1527 EvictionPolicyConfig::Lru => AdmissionResult::Admitted {
1528 victim_keys: SmallVec::default(),
1529 },
1530 };
1531
1532 match admission_result {
1533 AdmissionResult::Admitted { victim_keys } => {
1534 for (vic_kh, vic_la) in victim_keys {
1536 let vic_key = vic_kh.key;
1537 let vic_hash = vic_kh.hash;
1538
1539 let kl = self.maybe_key_lock(&vic_key);
1541 let _klg = &kl.as_ref().map(|kl| kl.lock());
1542
1543 if let Some((vic_key, vic_entry)) = self.cache.remove_entry_if_and(
1544 vic_hash,
1545 |k| k == &vic_key,
1546 |_, entry| entry.entry_info().last_accessed() == vic_la,
1547 |k, v| (k.clone(), v.clone()),
1548 ) {
1549 if eviction_state.is_notifier_enabled() {
1550 eviction_state.notify_entry_removal(
1551 vic_key,
1552 &vic_entry,
1553 RemovalCause::Size,
1554 );
1555 }
1556 eviction_state.counters.incr_eviction_count();
1557 Self::handle_remove(
1559 deqs,
1560 timer_wheel,
1561 vic_entry,
1562 None,
1563 &mut eviction_state.counters,
1564 );
1565 } else {
1566 if let Some(node) = deqs.probation.peek_front() {
1569 if node.element.key() == &vic_key && node.element.hash() == vic_hash {
1570 deqs.probation.move_front_to_back();
1571 }
1572 }
1573 }
1574 }
1575
1576 self.handle_admit(
1578 &entry,
1579 new_weight,
1580 deqs,
1581 timer_wheel,
1582 &mut eviction_state.counters,
1583 );
1584 entry.entry_info().set_policy_gen(gen);
1585 }
1586 AdmissionResult::Rejected => {
1587 let kl = self.maybe_key_lock(&kh.key);
1589 let _klg = &kl.as_ref().map(|kl| kl.lock());
1590
1591 let key = Arc::clone(&kh.key);
1594 let removed = self.cache.remove_if(
1595 kh.hash,
1596 |k| k == &key,
1597 |_, current_entry| {
1598 MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
1599 && current_entry.entry_info().entry_gen() == gen
1600 },
1601 );
1602
1603 if let Some(entry) = removed {
1604 entry.entry_info().set_policy_gen(gen);
1605 if eviction_state.is_notifier_enabled() {
1606 eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
1607 }
1608 eviction_state.counters.incr_eviction_count();
1609 }
1610 }
1611 };
1612 }
1613
1614 #[inline]
1632 fn admit(
1633 candidate: &EntrySizeAndFrequency,
1634 cache: &CacheStore<K, V, S>,
1635 deqs: &mut Deques<K>,
1636 freq: &FrequencySketch,
1637 ) -> AdmissionResult<K> {
1638 const MAX_CONSECUTIVE_RETRIES: usize = 5;
1639 let mut retries = 0;
1640
1641 let mut victims = EntrySizeAndFrequency::default();
1642 let mut victim_keys = SmallVec::default();
1643
1644 let deq = &mut deqs.probation;
1645
1646 let mut next_victim = deq.peek_front_ptr();
1648
1649 while victims.policy_weight < candidate.policy_weight
1651 && victims.freq <= candidate.freq
1652 && retries <= MAX_CONSECUTIVE_RETRIES
1653 {
1654 let Some(victim) = next_victim.take() else {
1655 break;
1657 };
1658 next_victim = DeqNode::next_node_ptr(victim);
1659
1660 let vic_elem = &unsafe { victim.as_ref() }.element;
1661 if vic_elem.is_dirty() {
1662 unsafe { deq.move_to_back(victim) };
1664 retries += 1;
1665 continue;
1666 }
1667
1668 let key = vic_elem.key();
1669 let hash = vic_elem.hash();
1670 let last_accessed = vic_elem.entry_info().last_accessed();
1671
1672 if let Some(vic_entry) = cache.get(hash, |k| k == key) {
1673 victims.add_policy_weight(vic_entry.policy_weight());
1674 victims.add_frequency(freq, hash);
1675 victim_keys.push((KeyHash::new(Arc::clone(key), hash), last_accessed));
1676 retries = 0;
1677 } else {
1678 unsafe { deq.move_to_back(victim) };
1682 retries += 1;
1683 }
1684 }
1685
1686 if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
1692 AdmissionResult::Admitted { victim_keys }
1693 } else {
1694 AdmissionResult::Rejected
1695 }
1696 }
1697
1698 fn handle_admit(
1699 &self,
1700 entry: &MiniArc<ValueEntry<K, V>>,
1701 policy_weight: u32,
1702 deqs: &mut Deques<K>,
1703 timer_wheel: &mut TimerWheel<K>,
1704 counters: &mut EvictionCounters,
1705 ) {
1706 counters.saturating_add(1, policy_weight);
1707
1708 self.update_timer_wheel(entry, timer_wheel);
1709
1710 deqs.push_back_ao(
1712 CacheRegion::MainProbation,
1713 KeyHashDate::new(entry.entry_info()),
1714 entry,
1715 );
1716 if self.is_write_order_queue_enabled() {
1717 deqs.push_back_wo(KeyHashDate::new(entry.entry_info()), entry);
1718 }
1719 entry.set_admitted(true);
1720 }
1721
1722 fn update_timer_wheel(
1724 &self,
1725 entry: &MiniArc<ValueEntry<K, V>>,
1726 timer_wheel: &mut TimerWheel<K>,
1727 ) {
1728 if entry.entry_info().expiration_time().is_some() && !timer_wheel.is_enabled() {
1730 timer_wheel.enable();
1731 }
1732
1733 match (
1735 entry.entry_info().expiration_time().is_some(),
1736 entry.timer_node(),
1737 ) {
1738 (false, None) => (),
1741 (true, None) => {
1744 let timer = timer_wheel.schedule(
1745 MiniArc::clone(entry.entry_info()),
1746 MiniArc::clone(entry.deq_nodes()),
1747 );
1748 entry.set_timer_node(timer);
1749 }
1750 (true, Some(tn)) => {
1753 let result = timer_wheel.reschedule(tn);
1754 if let ReschedulingResult::Removed(removed_tn) = result {
1755 entry.set_timer_node(None);
1759 drop(removed_tn);
1760 }
1761 }
1762 (false, Some(tn)) => {
1765 entry.set_timer_node(None);
1766 timer_wheel.deschedule(tn);
1767 }
1768 }
1769 }
1770
1771 fn handle_remove(
1772 deqs: &mut Deques<K>,
1773 timer_wheel: &mut TimerWheel<K>,
1774 entry: MiniArc<ValueEntry<K, V>>,
1775 gen: Option<u16>,
1776 counters: &mut EvictionCounters,
1777 ) {
1778 if let Some(timer_node) = entry.take_timer_node() {
1779 timer_wheel.deschedule(timer_node);
1780 }
1781 Self::handle_remove_without_timer_wheel(deqs, entry, gen, counters);
1782 }
1783
1784 fn handle_remove_without_timer_wheel(
1785 deqs: &mut Deques<K>,
1786 entry: MiniArc<ValueEntry<K, V>>,
1787 gen: Option<u16>,
1788 counters: &mut EvictionCounters,
1789 ) {
1790 if entry.is_admitted() {
1791 entry.set_admitted(false);
1792 counters.saturating_sub(1, entry.policy_weight());
1793 deqs.unlink_ao(&entry);
1795 Deques::unlink_wo(&mut deqs.write_order, &entry);
1796 } else {
1797 entry.unset_q_nodes();
1798 }
1799 if let Some(g) = gen {
1800 entry.entry_info().set_policy_gen(g);
1801 }
1802 }
1803
1804 fn handle_remove_with_deques(
1805 ao_deq_name: &str,
1806 ao_deq: &mut Deque<KeyHashDate<K>>,
1807 wo_deq: &mut Deque<KeyHashDate<K>>,
1808 timer_wheel: &mut TimerWheel<K>,
1809 entry: MiniArc<ValueEntry<K, V>>,
1810 counters: &mut EvictionCounters,
1811 ) {
1812 if let Some(timer) = entry.take_timer_node() {
1813 timer_wheel.deschedule(timer);
1814 }
1815 if entry.is_admitted() {
1816 entry.set_admitted(false);
1817 counters.saturating_sub(1, entry.policy_weight());
1818 Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
1820 Deques::unlink_wo(wo_deq, &entry);
1821 } else {
1822 entry.unset_q_nodes();
1823 }
1824 }
1825
1826 fn evict_expired_entries_using_timers(
1827 &self,
1828 timer_wheel: &mut TimerWheel<K>,
1829 deqs: &mut Deques<K>,
1830 eviction_state: &mut EvictionState<'_, K, V>,
1831 ) where
1832 V: Clone,
1833 {
1834 use crate::common::timer_wheel::TimerEvent;
1835
1836 let now = self.current_time();
1837
1838 for event in timer_wheel.advance(now) {
1850 if let TimerEvent::Expired(node) = event {
1853 let entry_info = node.element.entry_info();
1854
1855 if entry_info.is_dirty() {
1856 continue;
1859 }
1860
1861 let kh = entry_info.key_hash();
1862 let key = &kh.key;
1863 let hash = kh.hash;
1864
1865 let kl = self.maybe_key_lock(key);
1868 let _klg = &kl.as_ref().map(|kl| kl.lock());
1869
1870 let maybe_entry = self.cache.remove_if(
1872 hash,
1873 |k| k == key,
1874 |_, v| is_expired_by_per_entry_ttl(v.entry_info(), now),
1875 );
1876
1877 if let Some(entry) = maybe_entry {
1878 if eviction_state.is_notifier_enabled() {
1879 let key = Arc::clone(key);
1880 eviction_state.notify_entry_removal(key, &entry, RemovalCause::Expired);
1881 }
1882 eviction_state.counters.incr_eviction_count();
1883 Self::handle_remove_without_timer_wheel(
1884 deqs,
1885 entry,
1886 None,
1887 &mut eviction_state.counters,
1888 );
1889 } else {
1890 }
1893 }
1894 }
1895 }
1896
1897 fn evict_expired_entries_using_deqs(
1898 &self,
1899 deqs: &mut Deques<K>,
1900 timer_wheel: &mut TimerWheel<K>,
1901 batch_size: u32,
1902 state: &mut EvictionState<'_, K, V>,
1903 ) where
1904 V: Clone,
1905 {
1906 use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window};
1907
1908 let now = self.current_time();
1909
1910 if self.is_write_order_queue_enabled() {
1911 self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state);
1912 }
1913
1914 if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() {
1915 self.remove_expired_ao(Window, deqs, timer_wheel, batch_size, now, state);
1916 self.remove_expired_ao(Probation, deqs, timer_wheel, batch_size, now, state);
1917 self.remove_expired_ao(Protected, deqs, timer_wheel, batch_size, now, state);
1918 }
1919 }
1920
1921 #[allow(clippy::too_many_arguments)]
1922 #[inline]
1923 fn remove_expired_ao(
1924 &self,
1925 cache_region: CacheRegion,
1926 deqs: &mut Deques<K>,
1927 timer_wheel: &mut TimerWheel<K>,
1928 batch_size: u32,
1929 now: Instant,
1930 eviction_state: &mut EvictionState<'_, K, V>,
1931 ) where
1932 V: Clone,
1933 {
1934 let tti = &self.expiration_policy.time_to_idle();
1935 let va = &self.valid_after();
1936 let deq_name = cache_region.name();
1937 let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
1938 let mut more_to_evict = true;
1939
1940 for _ in 0..batch_size {
1941 let maybe_key_hash_ts = ao_deq.peek_front().map(|node| {
1942 let elem = &node.element;
1943 (
1944 Arc::clone(elem.key()),
1945 elem.hash(),
1946 elem.is_dirty(),
1947 elem.last_accessed(),
1948 )
1949 });
1950
1951 let (key, hash, cause) = match maybe_key_hash_ts {
1952 Some((key, hash, false, Some(ts))) => {
1953 let cause = match is_entry_expired_ao_or_invalid(tti, va, ts, now) {
1954 (true, _) => RemovalCause::Expired,
1955 (false, true) => RemovalCause::Explicit,
1956 (false, false) => {
1957 more_to_evict = false;
1958 break;
1959 }
1960 };
1961 (key, hash, cause)
1962 }
1963 Some((key, hash, true, _) | (key, hash, false, None)) => {
1967 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
1971 more_to_evict = false;
1975 continue;
1976 }
1977 None => {
1978 more_to_evict = false;
1979 break;
1980 }
1981 };
1982
1983 let kl = self.maybe_key_lock(&key);
1985 let _klg = &kl.as_ref().map(|kl| kl.lock());
1986
1987 let maybe_entry = self.cache.remove_if(
1992 hash,
1993 |k| k == &key,
1994 |_, v| is_expired_entry_ao(tti, va, v, now),
1995 );
1996
1997 if let Some(entry) = maybe_entry {
1998 if eviction_state.is_notifier_enabled() {
1999 eviction_state.notify_entry_removal(key, &entry, cause);
2000 }
2001 eviction_state.counters.incr_eviction_count();
2002 Self::handle_remove_with_deques(
2003 deq_name,
2004 ao_deq,
2005 wo_deq,
2006 timer_wheel,
2007 entry,
2008 &mut eviction_state.counters,
2009 );
2010 } else {
2011 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2012 more_to_evict = false;
2013 }
2014 }
2015
2016 if more_to_evict {
2017 eviction_state.more_entries_to_evict = true;
2018 }
2019 }
2020
2021 #[inline]
2022 fn skip_updated_entry_ao(
2023 &self,
2024 key: &K,
2025 hash: u64,
2026 deq_name: &str,
2027 deq: &mut Deque<KeyHashDate<K>>,
2028 write_order_deq: &mut Deque<KeyHashDate<K>>,
2029 ) {
2030 if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2031 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
2034 if entry.is_dirty() {
2035 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
2036 }
2037 } else {
2038 deq.move_front_to_back();
2043 }
2044 }
2045
2046 #[inline]
2047 fn skip_updated_entry_wo(&self, key: &K, hash: u64, deqs: &mut Deques<K>) {
2048 if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2049 deqs.move_to_back_ao(&entry);
2052 deqs.move_to_back_wo(&entry);
2053 } else {
2054 deqs.write_order.move_front_to_back();
2059 }
2060 }
2061
2062 #[inline]
2063 fn remove_expired_wo(
2064 &self,
2065 deqs: &mut Deques<K>,
2066 timer_wheel: &mut TimerWheel<K>,
2067 batch_size: u32,
2068 now: Instant,
2069 eviction_state: &mut EvictionState<'_, K, V>,
2070 ) where
2071 V: Clone,
2072 {
2073 let ttl = &self.expiration_policy.time_to_live();
2074 let va = &self.valid_after();
2075 let mut more_to_evict = true;
2076
2077 for _ in 0..batch_size {
2078 let maybe_key_hash_ts = deqs.write_order.peek_front().map(|node| {
2079 let elem = &node.element;
2080 (
2081 Arc::clone(elem.key()),
2082 elem.hash(),
2083 elem.is_dirty(),
2084 elem.last_modified(),
2085 )
2086 });
2087
2088 let (key, hash, cause) = match maybe_key_hash_ts {
2089 Some((key, hash, false, Some(ts))) => {
2090 let cause = match is_entry_expired_wo_or_invalid(ttl, va, ts, now) {
2091 (true, _) => RemovalCause::Expired,
2092 (false, true) => RemovalCause::Explicit,
2093 (false, false) => {
2094 more_to_evict = false;
2095 break;
2096 }
2097 };
2098 (key, hash, cause)
2099 }
2100 Some((key, hash, true, _) | (key, hash, false, None)) => {
2104 self.skip_updated_entry_wo(&key, hash, deqs);
2105 more_to_evict = false;
2106 continue;
2107 }
2108 None => {
2109 more_to_evict = false;
2110 break;
2111 }
2112 };
2113
2114 let kl = self.maybe_key_lock(&key);
2116 let _klg = &kl.as_ref().map(|kl| kl.lock());
2117
2118 let maybe_entry = self.cache.remove_if(
2119 hash,
2120 |k| k == &key,
2121 |_, v| is_expired_entry_wo(ttl, va, v, now),
2122 );
2123
2124 if let Some(entry) = maybe_entry {
2125 if eviction_state.is_notifier_enabled() {
2126 eviction_state.notify_entry_removal(key, &entry, cause);
2127 }
2128 eviction_state.counters.incr_eviction_count();
2129 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
2130 } else {
2131 self.skip_updated_entry_wo(&key, hash, deqs);
2132 more_to_evict = false;
2133 }
2134 }
2135
2136 if more_to_evict {
2137 eviction_state.more_entries_to_evict = true;
2138 }
2139 }
2140
2141 fn invalidate_entries(
2142 &self,
2143 invalidator: &Invalidator<K, V, S>,
2144 deqs: &mut Deques<K>,
2145 timer_wheel: &mut TimerWheel<K>,
2146 batch_size: u32,
2147 eviction_state: &mut EvictionState<'_, K, V>,
2148 ) where
2149 V: Clone,
2150 {
2151 let now = self.current_time();
2152
2153 if deqs.write_order.len() == 0 {
2156 invalidator.remove_predicates_registered_before(now);
2157 return;
2158 }
2159
2160 let mut candidates = Vec::new();
2161 let mut len = 0;
2162 let has_next;
2163 {
2164 let iter = &mut deqs.write_order.peekable();
2165
2166 while len < batch_size {
2167 if let Some(kd) = iter.next() {
2168 if !kd.is_dirty() {
2169 if let Some(ts) = kd.last_modified() {
2170 let key = kd.key();
2171 let hash = self.hash(key);
2172 candidates.push(KeyDateLite::new(key, hash, ts));
2173 len += 1;
2174 }
2175 }
2176 } else {
2177 break;
2178 }
2179 }
2180
2181 has_next = iter.peek().is_some();
2182 }
2183
2184 if len == 0 {
2185 return;
2186 }
2187
2188 let is_truncated = len == batch_size && has_next;
2189 let (invalidated, is_done) =
2190 invalidator.scan_and_invalidate(self, candidates, is_truncated);
2191
2192 for KvEntry { key: _key, entry } in invalidated {
2193 Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
2194 }
2195 if is_done {
2196 deqs.write_order.reset_cursor();
2197 }
2198 if !invalidator.is_empty() {
2199 eviction_state.more_entries_to_evict = true;
2200 }
2201 }
2202
2203 fn evict_lru_entries(
2204 &self,
2205 deqs: &mut Deques<K>,
2206 timer_wheel: &mut TimerWheel<K>,
2207 batch_size: u32,
2208 weights_to_evict: u64,
2209 eviction_state: &mut EvictionState<'_, K, V>,
2210 ) where
2211 V: Clone,
2212 {
2213 const CACHE_REGION: CacheRegion = CacheRegion::MainProbation;
2214 let deq_name = CACHE_REGION.name();
2215 let (ao_deq, wo_deq) = deqs.select_mut(CACHE_REGION);
2216 let mut evicted = 0u64;
2217 let mut more_to_evict = true;
2218
2219 for _ in 0..batch_size {
2220 if evicted >= weights_to_evict {
2221 more_to_evict = false;
2222 break;
2223 }
2224
2225 let maybe_key_hash_ts = ao_deq.peek_front().map(|node| {
2226 let entry_info = node.element.entry_info();
2227 (
2228 Arc::clone(node.element.key()),
2229 node.element.hash(),
2230 entry_info.is_dirty(),
2231 entry_info.last_accessed(),
2232 )
2233 });
2234
2235 let (key, hash, ts) = match maybe_key_hash_ts {
2236 Some((key, hash, false, Some(ts))) => (key, hash, ts),
2237 Some((key, hash, true, _) | (key, hash, false, None)) => {
2241 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2245 more_to_evict = false;
2249 continue;
2250 }
2251 None => {
2252 more_to_evict = false;
2253 break;
2254 }
2255 };
2256
2257 let kl = self.maybe_key_lock(&key);
2259 let _klg = &kl.as_ref().map(|kl| kl.lock());
2260
2261 let maybe_entry = self.cache.remove_if(
2262 hash,
2263 |k| k == &key,
2264 |_, v| {
2265 if let Some(la) = v.last_accessed() {
2266 la == ts
2267 } else {
2268 false
2269 }
2270 },
2271 );
2272
2273 if let Some(entry) = maybe_entry {
2274 if eviction_state.is_notifier_enabled() {
2275 eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
2276 }
2277 eviction_state.counters.incr_eviction_count();
2278 let weight = entry.policy_weight();
2279 Self::handle_remove_with_deques(
2280 deq_name,
2281 ao_deq,
2282 wo_deq,
2283 timer_wheel,
2284 entry,
2285 &mut eviction_state.counters,
2286 );
2287 evicted = evicted.saturating_add(weight as u64);
2288 } else {
2289 self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
2290 more_to_evict = false;
2291 }
2292 }
2293
2294 if more_to_evict {
2295 eviction_state.more_entries_to_evict = true;
2296 }
2297 }
2298}
2299
2300impl<K, V, S> Inner<K, V, S>
2301where
2302 K: Send + Sync + 'static,
2303 V: Clone + Send + Sync + 'static,
2304{
2305 pub(crate) fn notify_single_removal(
2306 &self,
2307 key: Arc<K>,
2308 entry: &MiniArc<ValueEntry<K, V>>,
2309 cause: RemovalCause,
2310 ) {
2311 if let Some(notifier) = &self.removal_notifier {
2312 notifier.notify(key, entry.value.clone(), cause);
2313 }
2314 }
2315
2316 #[inline]
2317 fn notify_upsert(
2318 &self,
2319 key: Arc<K>,
2320 entry: &MiniArc<ValueEntry<K, V>>,
2321 last_accessed: Option<Instant>,
2322 last_modified: Option<Instant>,
2323 ) {
2324 let now = self.current_time();
2325 let exp = &self.expiration_policy;
2326
2327 let mut cause = RemovalCause::Replaced;
2328
2329 if let Some(last_accessed) = last_accessed {
2330 if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2331 cause = RemovalCause::Expired;
2332 }
2333 }
2334
2335 if let Some(last_modified) = last_modified {
2336 if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2337 cause = RemovalCause::Expired;
2338 } else if is_invalid_entry(&self.valid_after(), last_modified) {
2339 cause = RemovalCause::Explicit;
2340 }
2341 }
2342
2343 self.notify_single_removal(key, entry, cause);
2344 }
2345
2346 #[inline]
2347 fn notify_invalidate(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) {
2348 let now = self.current_time();
2349 let exp = &self.expiration_policy;
2350
2351 let mut cause = RemovalCause::Explicit;
2352
2353 if let Some(last_accessed) = entry.last_accessed() {
2354 if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2355 cause = RemovalCause::Expired;
2356 }
2357 }
2358
2359 if let Some(last_modified) = entry.last_modified() {
2360 if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2361 cause = RemovalCause::Expired;
2362 }
2363 }
2364
2365 self.notify_single_removal(Arc::clone(key), entry, cause);
2366 }
2367}
2368
2369#[cfg(test)]
2373impl<K, V, S> Inner<K, V, S>
2374where
2375 K: Hash + Eq,
2376 S: BuildHasher + Clone,
2377{
2378 fn invalidation_predicate_count(&self) -> usize {
2379 if let Some(inv) = &self.invalidator {
2380 inv.predicate_count()
2381 } else {
2382 0
2383 }
2384 }
2385
2386 fn key_locks_map_is_empty(&self) -> bool {
2387 self.key_locks
2388 .as_ref()
2389 .map(|m| m.is_empty())
2390 .unwrap_or(true)
2392 }
2393}
2394
2395#[inline]
2401fn is_expired_by_per_entry_ttl<K>(entry_info: &MiniArc<EntryInfo<K>>, now: Instant) -> bool {
2402 if let Some(ts) = entry_info.expiration_time() {
2403 ts <= now
2404 } else {
2405 false
2406 }
2407}
2408
2409#[inline]
2414fn is_expired_entry_ao(
2415 time_to_idle: &Option<Duration>,
2416 valid_after: &Option<Instant>,
2417 entry: &impl AccessTime,
2418 now: Instant,
2419) -> bool {
2420 if let Some(ts) = entry.last_accessed() {
2421 is_invalid_entry(valid_after, ts) || is_expired_by_tti(time_to_idle, ts, now)
2422 } else {
2423 false
2424 }
2425}
2426
2427#[inline]
2432fn is_expired_entry_wo(
2433 time_to_live: &Option<Duration>,
2434 valid_after: &Option<Instant>,
2435 entry: &impl AccessTime,
2436 now: Instant,
2437) -> bool {
2438 if let Some(ts) = entry.last_modified() {
2439 is_invalid_entry(valid_after, ts) || is_expired_by_ttl(time_to_live, ts, now)
2440 } else {
2441 false
2442 }
2443}
2444
2445#[inline]
2446fn is_entry_expired_ao_or_invalid(
2447 time_to_idle: &Option<Duration>,
2448 valid_after: &Option<Instant>,
2449 entry_last_accessed: Instant,
2450 now: Instant,
2451) -> (bool, bool) {
2452 let ts = entry_last_accessed;
2453 let expired = is_expired_by_tti(time_to_idle, ts, now);
2454 let invalid = is_invalid_entry(valid_after, ts);
2455 (expired, invalid)
2456}
2457
2458#[inline]
2459fn is_entry_expired_wo_or_invalid(
2460 time_to_live: &Option<Duration>,
2461 valid_after: &Option<Instant>,
2462 entry_last_modified: Instant,
2463 now: Instant,
2464) -> (bool, bool) {
2465 let ts = entry_last_modified;
2466 let expired = is_expired_by_ttl(time_to_live, ts, now);
2467 let invalid = is_invalid_entry(valid_after, ts);
2468 (expired, invalid)
2469}
2470
2471#[inline]
2472fn is_invalid_entry(valid_after: &Option<Instant>, entry_ts: Instant) -> bool {
2473 if let Some(va) = valid_after {
2474 entry_ts < *va
2475 } else {
2476 false
2477 }
2478}
2479
2480#[inline]
2481fn is_expired_by_tti(
2482 time_to_idle: &Option<Duration>,
2483 entry_last_accessed: Instant,
2484 now: Instant,
2485) -> bool {
2486 if let Some(tti) = time_to_idle {
2487 let expiration = entry_last_accessed.saturating_add(*tti);
2488 expiration <= now
2489 } else {
2490 false
2491 }
2492}
2493
2494#[inline]
2495fn is_expired_by_ttl(
2496 time_to_live: &Option<Duration>,
2497 entry_last_modified: Instant,
2498 now: Instant,
2499) -> bool {
2500 if let Some(ttl) = time_to_live {
2501 let expiration = entry_last_modified.saturating_add(*ttl);
2502 expiration <= now
2503 } else {
2504 false
2505 }
2506}
2507
2508#[cfg(test)]
2509mod tests {
2510 use crate::{
2511 common::{time::Clock, HousekeeperConfig},
2512 policy::{EvictionPolicy, ExpirationPolicy},
2513 };
2514
2515 use super::BaseCache;
2516
2517 #[cfg_attr(target_pointer_width = "16", ignore)]
2518 #[test]
2519 fn test_skt_capacity_will_not_overflow() {
2520 use std::collections::hash_map::RandomState;
2521
2522 let pot = |exp| 2u64.pow(exp);
2524
2525 let ensure_sketch_len = |max_capacity, len, name| {
2526 let cache = BaseCache::<u8, u8>::new(
2527 None,
2528 Some(max_capacity),
2529 None,
2530 RandomState::default(),
2531 None,
2532 EvictionPolicy::default(),
2533 None,
2534 ExpirationPolicy::default(),
2535 HousekeeperConfig::default(),
2536 false,
2537 Clock::default(),
2538 );
2539 cache.inner.enable_frequency_sketch_for_testing();
2540 assert_eq!(
2541 cache.inner.frequency_sketch.read().table_len(),
2542 len as usize,
2543 "{name}"
2544 );
2545 };
2546
2547 if cfg!(target_pointer_width = "32") {
2548 let pot24 = pot(24);
2549 let pot16 = pot(16);
2550 ensure_sketch_len(0, 128, "0");
2551 ensure_sketch_len(128, 128, "128");
2552 ensure_sketch_len(pot16, pot16, "pot16");
2553 ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
2555 ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1");
2557 ensure_sketch_len(pot24, pot24, "pot24");
2558 ensure_sketch_len(pot(27), pot24, "pot(27)");
2559 ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX");
2560 } else {
2561 let pot30 = pot(30);
2563 let pot16 = pot(16);
2564 ensure_sketch_len(0, 128, "0");
2565 ensure_sketch_len(128, 128, "128");
2566 ensure_sketch_len(pot16, pot16, "pot16");
2567 ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
2569
2570 if !cfg!(skip_large_mem_tests) {
2572 ensure_sketch_len(pot30 - 1, pot30, "pot30- 1");
2574 ensure_sketch_len(pot30, pot30, "pot30");
2575 ensure_sketch_len(u64::MAX, pot30, "u64::MAX");
2576 }
2577 };
2578 }
2579
2580 #[test]
2581 fn test_per_entry_expiration() {
2582 use super::InnerSync;
2583 use crate::{common::time::Clock, Entry, Expiry};
2584
2585 use std::{
2586 collections::hash_map::RandomState,
2587 sync::{Arc, Mutex},
2588 time::{Duration, Instant as StdInstant},
2589 };
2590
2591 type Key = u32;
2592 type Value = char;
2593
2594 fn current_time(cache: &BaseCache<Key, Value>) -> StdInstant {
2595 cache.inner.clock().to_std_instant(cache.current_time())
2596 }
2597
2598 fn insert(cache: &BaseCache<Key, Value>, key: Key, hash: u64, value: Value) {
2599 let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, value);
2600 cache.write_op_ch.send(op).expect("Failed to send");
2601 }
2602
2603 macro_rules! assert_params_eq {
2604 ($left:expr, $right:expr, $param_name:expr, $line:expr) => {
2605 assert_eq!(
2606 $left, $right,
2607 "Mismatched `{}`s. line: {}",
2608 $param_name, $line
2609 );
2610 };
2611 }
2612
2613 macro_rules! assert_expiry {
2614 ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => {
2615 $mock.increment(Duration::from_millis($duration_secs * 1000 - 1));
2617 $cache.inner.run_pending_tasks(None, 1, 10);
2618 assert!($cache.contains_key_with_hash(&$key, $hash));
2619 assert_eq!($cache.entry_count(), 1);
2620
2621 $mock.increment(Duration::from_millis(1));
2623 $cache.inner.run_pending_tasks(None, 1, 10);
2624 assert!(!$cache.contains_key_with_hash(&$key, $hash));
2625
2626 $mock.increment(Duration::from_secs(1));
2629 $cache.inner.run_pending_tasks(None, 1, 10);
2630 assert_eq!($cache.entry_count(), 0);
2631 };
2632 }
2633
2634 #[derive(Debug)]
2636 enum ExpiryExpectation {
2637 NoCall,
2638 AfterCreate {
2639 caller_line: u32,
2640 key: Key,
2641 value: Value,
2642 current_time: StdInstant,
2643 new_duration_secs: Option<u64>,
2644 },
2645 AfterRead {
2646 caller_line: u32,
2647 key: Key,
2648 value: Value,
2649 current_time: StdInstant,
2650 current_duration_secs: Option<u64>,
2651 last_modified_at: StdInstant,
2652 new_duration_secs: Option<u64>,
2653 },
2654 AfterUpdate {
2655 caller_line: u32,
2656 key: Key,
2657 value: Value,
2658 current_time: StdInstant,
2659 current_duration_secs: Option<u64>,
2660 new_duration_secs: Option<u64>,
2661 },
2662 }
2663
2664 impl ExpiryExpectation {
2665 fn after_create(
2666 caller_line: u32,
2667 key: Key,
2668 value: Value,
2669 current_time: StdInstant,
2670 new_duration_secs: Option<u64>,
2671 ) -> Self {
2672 Self::AfterCreate {
2673 caller_line,
2674 key,
2675 value,
2676 current_time,
2677 new_duration_secs,
2678 }
2679 }
2680
2681 fn after_read(
2682 caller_line: u32,
2683 key: Key,
2684 value: Value,
2685 current_time: StdInstant,
2686 current_duration_secs: Option<u64>,
2687 last_modified_at: StdInstant,
2688 new_duration_secs: Option<u64>,
2689 ) -> Self {
2690 Self::AfterRead {
2691 caller_line,
2692 key,
2693 value,
2694 current_time,
2695 current_duration_secs,
2696 last_modified_at,
2697 new_duration_secs,
2698 }
2699 }
2700
2701 fn after_update(
2702 caller_line: u32,
2703 key: Key,
2704 value: Value,
2705 current_time: StdInstant,
2706 current_duration_secs: Option<u64>,
2707 new_duration_secs: Option<u64>,
2708 ) -> Self {
2709 Self::AfterUpdate {
2710 caller_line,
2711 key,
2712 value,
2713 current_time,
2714 current_duration_secs,
2715 new_duration_secs,
2716 }
2717 }
2718 }
2719
2720 let expectation = Arc::new(Mutex::new(ExpiryExpectation::NoCall));
2721
2722 struct MyExpiry {
2723 expectation: Arc<Mutex<ExpiryExpectation>>,
2724 }
2725
2726 impl Expiry<u32, char> for MyExpiry {
2727 fn expire_after_create(
2728 &self,
2729 actual_key: &u32,
2730 actual_value: &char,
2731 actual_current_time: StdInstant,
2732 ) -> Option<Duration> {
2733 use ExpiryExpectation::*;
2734
2735 let lock = &mut *self.expectation.lock().unwrap();
2736 let expected = std::mem::replace(lock, NoCall);
2737 match expected {
2738 AfterCreate {
2739 caller_line,
2740 key,
2741 value,
2742 current_time,
2743 new_duration_secs: new_duration,
2744 } => {
2745 assert_params_eq!(*actual_key, key, "key", caller_line);
2746 assert_params_eq!(*actual_value, value, "value", caller_line);
2747 assert_params_eq!(
2748 actual_current_time,
2749 current_time,
2750 "current_time",
2751 caller_line
2752 );
2753 new_duration.map(Duration::from_secs)
2754 }
2755 expected => {
2756 panic!(
2757 "Unexpected call to expire_after_create: caller_line {}, expected: {expected:?}",
2758 line!()
2759 );
2760 }
2761 }
2762 }
2763
2764 fn expire_after_read(
2765 &self,
2766 actual_key: &u32,
2767 actual_value: &char,
2768 actual_current_time: StdInstant,
2769 actual_current_duration: Option<Duration>,
2770 actual_last_modified_at: StdInstant,
2771 ) -> Option<Duration> {
2772 use ExpiryExpectation::*;
2773
2774 let lock = &mut *self.expectation.lock().unwrap();
2775 let expected = std::mem::replace(lock, NoCall);
2776 match expected {
2777 AfterRead {
2778 caller_line,
2779 key,
2780 value,
2781 current_time,
2782 current_duration_secs,
2783 last_modified_at,
2784 new_duration_secs,
2785 } => {
2786 assert_params_eq!(*actual_key, key, "key", caller_line);
2787 assert_params_eq!(*actual_value, value, "value", caller_line);
2788 assert_params_eq!(
2789 actual_current_time,
2790 current_time,
2791 "current_time",
2792 caller_line
2793 );
2794 assert_params_eq!(
2795 actual_current_duration,
2796 current_duration_secs.map(Duration::from_secs),
2797 "current_duration",
2798 caller_line
2799 );
2800 assert_params_eq!(
2801 actual_last_modified_at,
2802 last_modified_at,
2803 "last_modified_at",
2804 caller_line
2805 );
2806 new_duration_secs.map(Duration::from_secs)
2807 }
2808 expected => {
2809 panic!(
2810 "Unexpected call to expire_after_read: caller_line {}, expected: {expected:?}",
2811 line!()
2812 );
2813 }
2814 }
2815 }
2816
2817 fn expire_after_update(
2818 &self,
2819 actual_key: &u32,
2820 actual_value: &char,
2821 actual_current_time: StdInstant,
2822 actual_current_duration: Option<Duration>,
2823 ) -> Option<Duration> {
2824 use ExpiryExpectation::*;
2825
2826 let lock = &mut *self.expectation.lock().unwrap();
2827 let expected = std::mem::replace(lock, NoCall);
2828 match expected {
2829 AfterUpdate {
2830 caller_line,
2831 key,
2832 value,
2833 current_time,
2834 current_duration_secs,
2835 new_duration_secs,
2836 } => {
2837 assert_params_eq!(*actual_key, key, "key", caller_line);
2838 assert_params_eq!(*actual_value, value, "value", caller_line);
2839 assert_params_eq!(
2840 actual_current_time,
2841 current_time,
2842 "current_time",
2843 caller_line
2844 );
2845 assert_params_eq!(
2846 actual_current_duration,
2847 current_duration_secs.map(Duration::from_secs),
2848 "current_duration",
2849 caller_line
2850 );
2851 new_duration_secs.map(Duration::from_secs)
2852 }
2853 expected => {
2854 panic!(
2855 "Unexpected call to expire_after_update: caller_line {}, expected: {expected:?}",
2856 line!()
2857 );
2858 }
2859 }
2860 }
2861 }
2862
2863 const TTL: u64 = 16;
2864 const TTI: u64 = 7;
2865 let expiry: Option<Arc<dyn Expiry<_, _> + Send + Sync + 'static>> =
2866 Some(Arc::new(MyExpiry {
2867 expectation: Arc::clone(&expectation),
2868 }));
2869
2870 let (clock, mock) = Clock::mock();
2871
2872 let mut cache = BaseCache::<Key, Value>::new(
2873 None,
2874 None,
2875 None,
2876 RandomState::default(),
2877 None,
2878 EvictionPolicy::default(),
2879 None,
2880 ExpirationPolicy::new(
2881 Some(Duration::from_secs(TTL)),
2882 Some(Duration::from_secs(TTI)),
2883 expiry,
2884 ),
2885 HousekeeperConfig::default(),
2886 false,
2887 clock,
2888 );
2889 cache.reconfigure_for_testing();
2890
2891 let cache = cache;
2893
2894 mock.increment(Duration::from_millis(10));
2895
2896 let key = 1;
2905 let hash = cache.hash(&key);
2906 let value = 'a';
2907
2908 *expectation.lock().unwrap() =
2909 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(1));
2910
2911 insert(&cache, key, hash, value);
2912 cache.inner.run_pending_tasks(None, 1, 10);
2915 assert_eq!(cache.entry_count(), 1);
2916
2917 assert_expiry!(cache, key, hash, mock, 1);
2918
2919 let key = 2;
2929 let hash = cache.hash(&key);
2930 let value = 'b';
2931
2932 *expectation.lock().unwrap() =
2933 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
2934 let inserted_at = current_time(&cache);
2935 insert(&cache, key, hash, value);
2936 cache.inner.run_pending_tasks(None, 1, 10);
2937 assert_eq!(cache.entry_count(), 1);
2938
2939 mock.increment(Duration::from_secs(1));
2941 cache.inner.run_pending_tasks(None, 1, 10);
2942 assert!(cache.contains_key_with_hash(&key, hash));
2943
2944 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
2946 line!(),
2947 key,
2948 value,
2949 current_time(&cache),
2950 Some(TTI - 1),
2951 inserted_at,
2952 Some(3),
2953 );
2954 assert_eq!(
2955 cache
2956 .get_with_hash(&key, hash, false)
2957 .map(Entry::into_value),
2958 Some(value)
2959 );
2960 cache.inner.run_pending_tasks(None, 1, 10);
2961
2962 assert_expiry!(cache, key, hash, mock, 3);
2963
2964 let key = 3;
2975 let hash = cache.hash(&key);
2976 let value = 'c';
2977
2978 *expectation.lock().unwrap() =
2979 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
2980 let inserted_at = current_time(&cache);
2981 insert(&cache, key, hash, value);
2982 cache.inner.run_pending_tasks(None, 1, 10);
2983 assert_eq!(cache.entry_count(), 1);
2984
2985 mock.increment(Duration::from_secs(1));
2987 cache.inner.run_pending_tasks(None, 1, 10);
2988 assert!(cache.contains_key_with_hash(&key, hash));
2989
2990 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
2992 line!(),
2993 key,
2994 value,
2995 current_time(&cache),
2996 Some(TTI - 1),
2997 inserted_at,
2998 None,
2999 );
3000 assert_eq!(
3001 cache
3002 .get_with_hash(&key, hash, false)
3003 .map(Entry::into_value),
3004 Some(value)
3005 );
3006 cache.inner.run_pending_tasks(None, 1, 10);
3007
3008 mock.increment(Duration::from_secs(2));
3010 cache.inner.run_pending_tasks(None, 1, 10);
3011 assert!(cache.contains_key_with_hash(&key, hash));
3012 assert_eq!(cache.entry_count(), 1);
3013
3014 *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3016 line!(),
3017 key,
3018 value,
3019 current_time(&cache),
3020 Some(TTI),
3022 Some(3),
3023 );
3024 insert(&cache, key, hash, value);
3025 cache.inner.run_pending_tasks(None, 1, 10);
3026 assert_eq!(cache.entry_count(), 1);
3027
3028 assert_expiry!(cache, key, hash, mock, 3);
3029
3030 let key = 4;
3041 let hash = cache.hash(&key);
3042 let value = 'd';
3043
3044 *expectation.lock().unwrap() =
3045 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
3046 let inserted_at = current_time(&cache);
3047 insert(&cache, key, hash, value);
3048 cache.inner.run_pending_tasks(None, 1, 10);
3049 assert_eq!(cache.entry_count(), 1);
3050
3051 mock.increment(Duration::from_secs(1));
3053 cache.inner.run_pending_tasks(None, 1, 10);
3054 assert!(cache.contains_key_with_hash(&key, hash));
3055 assert_eq!(cache.entry_count(), 1);
3056
3057 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3059 line!(),
3060 key,
3061 value,
3062 current_time(&cache),
3063 Some(TTI - 1),
3064 inserted_at,
3065 None,
3066 );
3067 assert_eq!(
3068 cache
3069 .get_with_hash(&key, hash, false)
3070 .map(Entry::into_value),
3071 Some(value)
3072 );
3073 cache.inner.run_pending_tasks(None, 1, 10);
3074
3075 mock.increment(Duration::from_secs(2));
3077 cache.inner.run_pending_tasks(None, 1, 10);
3078 assert!(cache.contains_key_with_hash(&key, hash));
3079 assert_eq!(cache.entry_count(), 1);
3080
3081 *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3083 line!(),
3084 key,
3085 value,
3086 current_time(&cache),
3087 Some(TTI),
3089 None,
3090 );
3091 insert(&cache, key, hash, value);
3092 cache.inner.run_pending_tasks(None, 1, 10);
3093 assert_eq!(cache.entry_count(), 1);
3094
3095 assert_expiry!(cache, key, hash, mock, 7);
3096
3097 let key = 5;
3107 let hash = cache.hash(&key);
3108 let value = 'e';
3109
3110 *expectation.lock().unwrap() =
3111 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3112 let inserted_at = current_time(&cache);
3113 insert(&cache, key, hash, value);
3114 cache.inner.run_pending_tasks(None, 1, 10);
3115 assert_eq!(cache.entry_count(), 1);
3116
3117 mock.increment(Duration::from_secs(5));
3119 cache.inner.run_pending_tasks(None, 1, 10);
3120 assert!(cache.contains_key_with_hash(&key, hash));
3121 assert_eq!(cache.entry_count(), 1);
3122
3123 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3125 line!(),
3126 key,
3127 value,
3128 current_time(&cache),
3129 Some(TTI - 5),
3130 inserted_at,
3131 Some(8),
3132 );
3133 assert_eq!(
3134 cache
3135 .get_with_hash(&key, hash, false)
3136 .map(Entry::into_value),
3137 Some(value)
3138 );
3139 cache.inner.run_pending_tasks(None, 1, 10);
3140
3141 assert_expiry!(cache, key, hash, mock, 7);
3142
3143 let key = 6;
3154 let hash = cache.hash(&key);
3155 let value = 'f';
3156
3157 *expectation.lock().unwrap() =
3158 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3159 let inserted_at = current_time(&cache);
3160 insert(&cache, key, hash, value);
3161 cache.inner.run_pending_tasks(None, 1, 10);
3162 assert_eq!(cache.entry_count(), 1);
3163
3164 mock.increment(Duration::from_secs(5));
3166 cache.inner.run_pending_tasks(None, 1, 10);
3167 assert!(cache.contains_key_with_hash(&key, hash));
3168 assert_eq!(cache.entry_count(), 1);
3169
3170 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3172 line!(),
3173 key,
3174 value,
3175 current_time(&cache),
3176 Some(TTI - 5),
3177 inserted_at,
3178 Some(9),
3179 );
3180 assert_eq!(
3181 cache
3182 .get_with_hash(&key, hash, false)
3183 .map(Entry::into_value),
3184 Some(value)
3185 );
3186 cache.inner.run_pending_tasks(None, 1, 10);
3187
3188 mock.increment(Duration::from_secs(6));
3190 cache.inner.run_pending_tasks(None, 1, 10);
3191 assert!(cache.contains_key_with_hash(&key, hash));
3192 assert_eq!(cache.entry_count(), 1);
3193
3194 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3196 line!(),
3197 key,
3198 value,
3199 current_time(&cache),
3200 Some(TTI - 6),
3201 inserted_at,
3202 Some(10),
3203 );
3204 assert_eq!(
3205 cache
3206 .get_with_hash(&key, hash, false)
3207 .map(Entry::into_value),
3208 Some(value)
3209 );
3210 cache.inner.run_pending_tasks(None, 1, 10);
3211
3212 assert_expiry!(cache, key, hash, mock, 5);
3213
3214 let key = 7;
3225 let hash = cache.hash(&key);
3226 let value = 'g';
3227
3228 *expectation.lock().unwrap() =
3229 ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9));
3230 insert(&cache, key, hash, value);
3231 cache.inner.run_pending_tasks(None, 1, 10);
3232 assert_eq!(cache.entry_count(), 1);
3233
3234 mock.increment(Duration::from_secs(6));
3236 cache.inner.run_pending_tasks(None, 1, 10);
3237 assert!(cache.contains_key_with_hash(&key, hash));
3238 assert_eq!(cache.entry_count(), 1);
3239
3240 *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3242 line!(),
3243 key,
3244 value,
3245 current_time(&cache),
3246 Some(9 - 6),
3248 Some(8),
3249 );
3250 let updated_at = current_time(&cache);
3251 insert(&cache, key, hash, value);
3252 cache.inner.run_pending_tasks(None, 1, 10);
3253 assert_eq!(cache.entry_count(), 1);
3254
3255 mock.increment(Duration::from_secs(6));
3257 cache.inner.run_pending_tasks(None, 1, 10);
3258 assert!(cache.contains_key_with_hash(&key, hash));
3259 assert_eq!(cache.entry_count(), 1);
3260
3261 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3263 line!(),
3264 key,
3265 value,
3266 current_time(&cache),
3267 Some(TTI - 6),
3268 updated_at,
3269 Some(9),
3270 );
3271 assert_eq!(
3272 cache
3273 .get_with_hash(&key, hash, false)
3274 .map(Entry::into_value),
3275 Some(value)
3276 );
3277 cache.inner.run_pending_tasks(None, 1, 10);
3278
3279 mock.increment(Duration::from_secs(6));
3281 cache.inner.run_pending_tasks(None, 1, 10);
3282 assert!(cache.contains_key_with_hash(&key, hash));
3283 assert_eq!(cache.entry_count(), 1);
3284
3285 *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3287 line!(),
3288 key,
3289 value,
3290 current_time(&cache),
3291 Some(TTI - 6),
3292 updated_at,
3293 Some(5),
3294 );
3295 assert_eq!(
3296 cache
3297 .get_with_hash(&key, hash, false)
3298 .map(Entry::into_value),
3299 Some(value)
3300 );
3301 cache.inner.run_pending_tasks(None, 1, 10);
3302
3303 assert_expiry!(cache, key, hash, mock, 4);
3304 }
3305}