moka/sync_base/
base_cache.rs

1use super::{
2    invalidator::{Invalidator, KeyDateLite, PredicateFun},
3    iter::ScanningGet,
4    key_lock::{KeyLock, KeyLockMap},
5    PredicateId,
6};
7
8use crate::{
9    common::{
10        self,
11        concurrent::{
12            arc::MiniArc,
13            constants::{
14                READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT,
15            },
16            deques::Deques,
17            entry_info::EntryInfo,
18            housekeeper::{Housekeeper, InnerSync},
19            AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher,
20            WriteOp,
21        },
22        deque::{DeqNode, Deque},
23        frequency_sketch::FrequencySketch,
24        time::{AtomicInstant, Clock, Instant},
25        timer_wheel::{ReschedulingResult, TimerWheel},
26        CacheRegion, HousekeeperConfig,
27    },
28    notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause},
29    policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy},
30    Entry, Expiry, Policy, PredicateError,
31};
32
33use crossbeam_channel::{Receiver, Sender, TrySendError};
34use crossbeam_utils::atomic::AtomicCell;
35use parking_lot::{Mutex, RwLock};
36use smallvec::SmallVec;
37use std::{
38    borrow::Borrow,
39    collections::hash_map::RandomState,
40    hash::{BuildHasher, Hash, Hasher},
41    rc::Rc,
42    sync::{
43        atomic::{AtomicBool, AtomicU8, Ordering},
44        Arc,
45    },
46    time::{Duration, Instant as StdInstant},
47};
48
/// Reference-counted handle to a [`Housekeeper`].
pub(crate) type HouseKeeperArc = Arc<Housekeeper>;
50
/// The core of a cache handle.
///
/// Bundles the reference-counted internal state with the sending ends of the
/// read-op and write-op channels and an optional housekeeper.
pub(crate) struct BaseCache<K, V, S = RandomState> {
    /// Internal state; cloning the cache clones this `Arc`, not the state.
    pub(crate) inner: Arc<Inner<K, V, S>>,
    /// Sending end of the channel carrying `ReadOp`s.
    read_op_ch: Sender<ReadOp<K, V>>,
    /// Sending end of the channel carrying `WriteOp`s.
    pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
    /// Housekeeper handle; taken (left as `None`) when this value is dropped.
    pub(crate) housekeeper: Option<HouseKeeperArc>,
}
57
58impl<K, V, S> Clone for BaseCache<K, V, S> {
59    /// Makes a clone of this shared cache.
60    ///
61    /// This operation is cheap as it only creates thread-safe reference counted
62    /// pointers to the shared internal data structures.
63    fn clone(&self) -> Self {
64        Self {
65            inner: Arc::clone(&self.inner),
66            read_op_ch: self.read_op_ch.clone(),
67            write_op_ch: self.write_op_ch.clone(),
68            housekeeper: self.housekeeper.clone(),
69        }
70    }
71}
72
73impl<K, V, S> Drop for BaseCache<K, V, S> {
74    fn drop(&mut self) {
75        // The housekeeper needs to be dropped before the inner is dropped.
76        std::mem::drop(self.housekeeper.take());
77    }
78}
79
80impl<K, V, S> BaseCache<K, V, S> {
81    pub(crate) fn name(&self) -> Option<&str> {
82        self.inner.name()
83    }
84
85    pub(crate) fn policy(&self) -> Policy {
86        self.inner.policy()
87    }
88
89    pub(crate) fn entry_count(&self) -> u64 {
90        self.inner.entry_count()
91    }
92
93    pub(crate) fn weighted_size(&self) -> u64 {
94        self.inner.weighted_size()
95    }
96
97    pub(crate) fn is_map_disabled(&self) -> bool {
98        self.inner.max_capacity == Some(0)
99    }
100
101    #[inline]
102    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
103        self.inner.is_removal_notifier_enabled()
104    }
105
106    #[inline]
107    pub(crate) fn current_time(&self) -> Instant {
108        self.inner.current_time()
109    }
110
111    pub(crate) fn notify_invalidate(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>)
112    where
113        K: Send + Sync + 'static,
114        V: Clone + Send + Sync + 'static,
115    {
116        self.inner.notify_invalidate(key, entry);
117    }
118}
119
120impl<K, V, S> BaseCache<K, V, S>
121where
122    K: Hash + Eq,
123    S: BuildHasher,
124{
125    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>> {
126        self.inner.maybe_key_lock(key)
127    }
128}
129
130impl<K, V, S> BaseCache<K, V, S>
131where
132    K: Hash + Eq + Send + Sync + 'static,
133    V: Clone + Send + Sync + 'static,
134    S: BuildHasher + Clone + Send + Sync + 'static,
135{
136    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
137    #[allow(clippy::too_many_arguments)]
138    pub(crate) fn new(
139        name: Option<String>,
140        max_capacity: Option<u64>,
141        initial_capacity: Option<usize>,
142        build_hasher: S,
143        weigher: Option<Weigher<K, V>>,
144        eviction_policy: EvictionPolicy,
145        eviction_listener: Option<EvictionListener<K, V>>,
146        expiration_policy: ExpirationPolicy<K, V>,
147        housekeeper_config: HousekeeperConfig,
148        invalidator_enabled: bool,
149        clock: Clock,
150    ) -> Self {
151        let (r_size, w_size) = if max_capacity == Some(0) {
152            (0, 0)
153        } else {
154            (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE)
155        };
156        let is_eviction_listener_enabled = eviction_listener.is_some();
157        let fast_now = clock.fast_now();
158
159        let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
160        let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
161
162        let inner = Arc::new(Inner::new(
163            name,
164            max_capacity,
165            initial_capacity,
166            build_hasher,
167            weigher,
168            eviction_policy,
169            eviction_listener,
170            r_rcv,
171            w_rcv,
172            expiration_policy,
173            invalidator_enabled,
174            clock,
175        ));
176
177        Self {
178            inner,
179            read_op_ch: r_snd,
180            write_op_ch: w_snd,
181            housekeeper: Some(Arc::new(Housekeeper::new(
182                is_eviction_listener_enabled,
183                housekeeper_config,
184                fast_now,
185            ))),
186        }
187    }
188
189    #[inline]
190    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
191    where
192        K: Borrow<Q>,
193        Q: Hash + Eq + ?Sized,
194    {
195        self.inner.hash(key)
196    }
197
198    pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
199    where
200        K: Borrow<Q>,
201        Q: Hash + Eq + ?Sized,
202    {
203        // TODO: Maybe we can just call ScanningGet::scanning_get.
204        self.inner
205            .get_key_value_and(key, hash, |k, entry| {
206                let i = &self.inner;
207                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
208                let now = self.current_time();
209
210                !is_expired_by_per_entry_ttl(entry.entry_info(), now)
211                    && !is_expired_entry_wo(ttl, va, entry, now)
212                    && !is_expired_entry_ao(tti, va, entry, now)
213                    && !i.is_invalidated_entry(k, entry)
214            })
215            .unwrap_or_default() // `false` is the default for `bool` type.
216    }
217
218    pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
219    where
220        K: Borrow<Q>,
221        Q: Hash + Eq + ?Sized,
222    {
223        // Define a closure to record a read op.
224        let record = |op, now| {
225            self.record_read_op(op, now)
226                .expect("Failed to record a get op");
227        };
228        let ignore_if = None as Option<&mut fn(&V) -> bool>;
229        self.do_get_with_hash(key, hash, record, ignore_if, need_key)
230    }
231
232    pub(crate) fn get_with_hash_and_ignore_if<Q, I>(
233        &self,
234        key: &Q,
235        hash: u64,
236        ignore_if: Option<&mut I>,
237        need_key: bool,
238    ) -> Option<Entry<K, V>>
239    where
240        K: Borrow<Q>,
241        Q: Hash + Eq + ?Sized,
242        I: FnMut(&V) -> bool,
243    {
244        // Define a closure to record a read op.
245        let record = |op, now| {
246            self.record_read_op(op, now)
247                .expect("Failed to record a get op");
248        };
249        self.do_get_with_hash(key, hash, record, ignore_if, need_key)
250    }
251
252    pub(crate) fn get_with_hash_without_recording<Q, I>(
253        &self,
254        key: &Q,
255        hash: u64,
256        ignore_if: Option<&mut I>,
257    ) -> Option<V>
258    where
259        K: Borrow<Q>,
260        Q: Hash + Eq + ?Sized,
261        I: FnMut(&V) -> bool,
262    {
263        // Define a closure that skips to record a read op.
264        let record = |_op, _now| {};
265        self.do_get_with_hash(key, hash, record, ignore_if, false)
266            .map(Entry::into_value)
267    }
268
269    fn do_get_with_hash<Q, R, I>(
270        &self,
271        key: &Q,
272        hash: u64,
273        read_recorder: R,
274        mut ignore_if: Option<&mut I>,
275        need_key: bool,
276    ) -> Option<Entry<K, V>>
277    where
278        K: Borrow<Q>,
279        Q: Hash + Eq + ?Sized,
280        R: Fn(ReadOp<K, V>, Instant),
281        I: FnMut(&V) -> bool,
282    {
283        if self.is_map_disabled() {
284            return None;
285        }
286
287        let mut now = self.current_time();
288
289        let maybe_entry = self
290            .inner
291            .get_key_value_and_then(key, hash, move |k, entry| {
292                if let Some(ignore_if) = &mut ignore_if {
293                    if ignore_if(&entry.value) {
294                        // Ignore the entry.
295                        return None;
296                    }
297                }
298
299                let i = &self.inner;
300                let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
301
302                if is_expired_by_per_entry_ttl(entry.entry_info(), now)
303                    || is_expired_entry_wo(ttl, va, entry, now)
304                    || is_expired_entry_ao(tti, va, entry, now)
305                    || i.is_invalidated_entry(k, entry)
306                {
307                    // Expired or invalidated entry.
308                    None
309                } else {
310                    // Valid entry.
311                    let maybe_key = if need_key { Some(Arc::clone(k)) } else { None };
312                    Some((maybe_key, MiniArc::clone(entry)))
313                }
314            });
315
316        if let Some((maybe_key, entry)) = maybe_entry {
317            let mut is_expiry_modified = false;
318
319            // Call the user supplied `expire_after_read` method if any.
320            if let Some(expiry) = &self.inner.expiration_policy.expiry() {
321                let lm = entry.last_modified().expect("Last modified is not set");
322                // Check if the `last_modified` of entry is earlier than or equals to
323                // `now`. If not, update the `now` to `last_modified`. This is needed
324                // because there is a small chance that other threads have inserted
325                // the entry _after_ we obtained `now`.
326                now = now.max(lm);
327
328                // Convert `last_modified` from `moka::common::time::Instant` to
329                // `std::time::Instant`.
330                let lm = self.inner.clock().to_std_instant(lm);
331
332                // Call the user supplied `expire_after_read` method.
333                //
334                // We will put the return value (`is_expiry_modified: bool`) to a
335                // `ReadOp` so that `apply_reads` method can determine whether or not
336                // to reschedule the timer for the entry.
337                //
338                // NOTE: It is not guaranteed that the `ReadOp` is passed to
339                // `apply_reads`. Here are the corner cases that the `ReadOp` will
340                // not be passed to `apply_reads`:
341                //
342                // - If the bounded `read_op_ch` channel is full, the `ReadOp` will
343                //   be discarded.
344                // - If we were called by `get_with_hash_without_recording` method,
345                //   the `ReadOp` will not be recorded at all.
346                //
347                // These cases are okay because when the timer wheel tries to expire
348                // the entry, it will check if the entry is actually expired. If not,
349                // the timer wheel will reschedule the expiration timer for the
350                // entry.
351                is_expiry_modified = Self::expire_after_read_or_update(
352                    |k, v, t, d| expiry.expire_after_read(k, v, t, d, lm),
353                    &entry.entry_info().key_hash().key,
354                    &entry,
355                    self.inner.expiration_policy.time_to_live(),
356                    self.inner.expiration_policy.time_to_idle(),
357                    now,
358                    self.inner.clock(),
359                );
360            }
361
362            entry.set_last_accessed(now);
363
364            let v = entry.value.clone();
365            let op = ReadOp::Hit {
366                value_entry: entry,
367                is_expiry_modified,
368            };
369            read_recorder(op, now);
370            Some(Entry::new(maybe_key, v, false, false))
371        } else {
372            read_recorder(ReadOp::Miss(hash), now);
373            None
374        }
375    }
376
377    pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
378    where
379        K: Borrow<Q>,
380        Q: Hash + Eq + ?Sized,
381    {
382        self.inner
383            .get_key_value_and(key, hash, |k, _entry| Arc::clone(k))
384    }
385
386    #[inline]
387    pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
388    where
389        K: Borrow<Q>,
390        Q: Hash + Eq + ?Sized,
391    {
392        self.inner.remove_entry(key, hash)
393    }
394
395    #[inline]
396    pub(crate) fn apply_reads_writes_if_needed(
397        inner: &impl InnerSync,
398        ch: &Sender<WriteOp<K, V>>,
399        now: Instant,
400        housekeeper: Option<&HouseKeeperArc>,
401    ) {
402        let w_len = ch.len();
403
404        if let Some(hk) = housekeeper {
405            if Self::should_apply_writes(hk, w_len, now) {
406                hk.try_run_pending_tasks(inner);
407            }
408        }
409    }
410
411    pub(crate) fn invalidate_all(&self) {
412        let now = self.current_time();
413        self.inner.set_valid_after(now);
414    }
415
416    pub(crate) fn invalidate_entries_if(
417        &self,
418        predicate: PredicateFun<K, V>,
419    ) -> Result<PredicateId, PredicateError> {
420        let now = self.current_time();
421        self.inner.register_invalidation_predicate(predicate, now)
422    }
423}
424
425//
426// Iterator support
427//
428impl<K, V, S> ScanningGet<K, V> for BaseCache<K, V, S>
429where
430    K: Hash + Eq + Send + Sync + 'static,
431    V: Clone + Send + Sync + 'static,
432    S: BuildHasher + Clone + Send + Sync + 'static,
433{
434    fn num_cht_segments(&self) -> usize {
435        self.inner.num_cht_segments()
436    }
437
438    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
439        let hash = self.hash(key);
440        self.inner.get_key_value_and_then(key, hash, |k, entry| {
441            let i = &self.inner;
442            let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
443            let now = self.current_time();
444
445            if is_expired_by_per_entry_ttl(entry.entry_info(), now)
446                || is_expired_entry_wo(ttl, va, entry, now)
447                || is_expired_entry_ao(tti, va, entry, now)
448                || i.is_invalidated_entry(k, entry)
449            {
450                // Expired or invalidated entry.
451                None
452            } else {
453                // Valid entry.
454                Some(entry.value.clone())
455            }
456        })
457    }
458
459    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
460        self.inner.keys(cht_segment)
461    }
462}
463
464//
465// private methods
466//
467impl<K, V, S> BaseCache<K, V, S>
468where
469    K: Hash + Eq + Send + Sync + 'static,
470    V: Clone + Send + Sync + 'static,
471    S: BuildHasher + Clone + Send + Sync + 'static,
472{
473    #[inline]
474    fn record_read_op(
475        &self,
476        op: ReadOp<K, V>,
477        now: Instant,
478    ) -> Result<(), TrySendError<ReadOp<K, V>>> {
479        self.apply_reads_if_needed(&self.inner, now);
480        let ch = &self.read_op_ch;
481        match ch.try_send(op) {
482            // Discard the ReadOp when the channel is full.
483            Ok(()) | Err(TrySendError::Full(_)) => Ok(()),
484            Err(e @ TrySendError::Disconnected(_)) => Err(e),
485        }
486    }
487
488    #[inline]
489    pub(crate) fn do_insert_with_hash(
490        &self,
491        key: Arc<K>,
492        hash: u64,
493        value: V,
494    ) -> (WriteOp<K, V>, Instant) {
495        let weight = self.inner.weigh(&key, &value);
496        let op_cnt1 = Rc::new(AtomicU8::new(0));
497        let op_cnt2 = Rc::clone(&op_cnt1);
498        let mut op1 = None;
499        let mut op2 = None;
500
501        // Lock the key for update if blocking removal notification is enabled.
502        let kl = self.maybe_key_lock(&key);
503        let _klg = &kl.as_ref().map(|kl| kl.lock());
504
505        let ts = self.current_time();
506
507        // TODO: Instead using Arc<AtomicU8> to check if the actual operation was
508        // insert or update, check the return value of insert_with_or_modify. If it
509        // is_some, the value was updated, otherwise the value was inserted.
510
511        // Since the cache (cht::SegmentedHashMap) employs optimistic locking
512        // strategy, insert_with_or_modify() may get an insert/modify operation
513        // conflicted with other concurrent hash table operations. In that case, it
514        // has to retry the insertion or modification, so on_insert and/or on_modify
515        // closures can be executed more than once. In order to identify the last
516        // call of these closures, we use a shared counter (op_cnt{1,2}) here to
517        // record a serial number on a WriteOp, and consider the WriteOp with the
518        // largest serial number is the one made by the last call of the closures.
519        self.inner.cache.insert_with_or_modify(
520            Arc::clone(&key),
521            hash,
522            // on_insert
523            || {
524                let (entry, gen) = self.new_value_entry(&key, hash, value.clone(), ts, weight);
525                let ins_op = WriteOp::new_upsert(&key, hash, &entry, gen, 0, weight);
526                let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
527                op1 = Some((cnt, ins_op));
528                entry
529            },
530            // on_modify
531            |_k, old_entry| {
532                let old_weight = old_entry.policy_weight();
533
534                // Create this OldEntryInfo _before_ creating a new ValueEntry, so
535                // that the OldEntryInfo can preserve the old EntryInfo's
536                // last_accessed and last_modified timestamps.
537                let old_info = OldEntryInfo::new(old_entry);
538                let (entry, gen) = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
539                let upd_op = WriteOp::new_upsert(&key, hash, &entry, gen, old_weight, weight);
540                let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
541                op2 = Some((cnt, old_info, upd_op));
542                entry
543            },
544        );
545
546        match (op1, op2) {
547            (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op),
548            (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
549                self.do_post_insert_steps(ts, &key, ins_op)
550            }
551            (_, Some((_cnt, old_info, upd_op))) => {
552                self.do_post_update_steps(ts, key, old_info, upd_op)
553            }
554            (None, None) => unreachable!(),
555        }
556    }
557
558    fn do_post_insert_steps(
559        &self,
560        ts: Instant,
561        key: &Arc<K>,
562        ins_op: WriteOp<K, V>,
563    ) -> (WriteOp<K, V>, Instant) {
564        if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
565            (&self.inner.expiration_policy.expiry(), &ins_op)
566        {
567            Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clock());
568        }
569        (ins_op, ts)
570    }
571
572    fn do_post_update_steps(
573        &self,
574        ts: Instant,
575        key: Arc<K>,
576        old_info: OldEntryInfo<K, V>,
577        upd_op: WriteOp<K, V>,
578    ) -> (WriteOp<K, V>, Instant) {
579        if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
580            (&self.inner.expiration_policy.expiry(), &upd_op)
581        {
582            Self::expire_after_read_or_update(
583                |k, v, t, d| expiry.expire_after_update(k, v, t, d),
584                &key,
585                value_entry,
586                self.inner.expiration_policy.time_to_live(),
587                self.inner.expiration_policy.time_to_idle(),
588                ts,
589                self.inner.clock(),
590            );
591        }
592
593        if self.is_removal_notifier_enabled() {
594            self.inner.notify_upsert(
595                key,
596                &old_info.entry,
597                old_info.last_accessed,
598                old_info.last_modified,
599            );
600        }
601        crossbeam_epoch::pin().flush();
602        (upd_op, ts)
603    }
604
605    #[inline]
606    fn apply_reads_if_needed(&self, inner: &Inner<K, V, S>, now: Instant) {
607        let len = self.read_op_ch.len();
608
609        if let Some(hk) = &self.housekeeper {
610            if Self::should_apply_reads(hk, len, now) {
611                hk.try_run_pending_tasks(inner);
612            }
613        }
614    }
615
616    #[inline]
617    fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
618        hk.should_apply_reads(ch_len, now)
619    }
620
621    #[inline]
622    fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool {
623        hk.should_apply_writes(ch_len, now)
624    }
625}
626
627impl<K, V, S> BaseCache<K, V, S> {
628    #[inline]
629    fn new_value_entry(
630        &self,
631        key: &Arc<K>,
632        hash: u64,
633        value: V,
634        timestamp: Instant,
635        policy_weight: u32,
636    ) -> (MiniArc<ValueEntry<K, V>>, u16) {
637        let key_hash = KeyHash::new(Arc::clone(key), hash);
638        let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight));
639        let gen: u16 = info.entry_gen();
640        (MiniArc::new(ValueEntry::new(value, info)), gen)
641    }
642
643    #[inline]
644    fn new_value_entry_from(
645        &self,
646        value: V,
647        timestamp: Instant,
648        policy_weight: u32,
649        other: &ValueEntry<K, V>,
650    ) -> (MiniArc<ValueEntry<K, V>>, u16) {
651        let info = MiniArc::clone(other.entry_info());
652        // To prevent this updated ValueEntry from being evicted by an expiration
653        // policy, increment the entry generation.
654        let gen = info.incr_entry_gen();
655        info.set_last_accessed(timestamp);
656        info.set_last_modified(timestamp);
657        info.set_policy_weight(policy_weight);
658        (MiniArc::new(ValueEntry::new_from(value, info, other)), gen)
659    }
660
661    fn expire_after_create(
662        expiry: &Arc<dyn Expiry<K, V> + Send + Sync + 'static>,
663        key: &K,
664        value_entry: &ValueEntry<K, V>,
665        ts: Instant,
666        clock: &Clock,
667    ) {
668        let duration =
669            expiry.expire_after_create(key, &value_entry.value, clock.to_std_instant(ts));
670        let expiration_time = duration.map(|duration| ts.saturating_add(duration));
671        value_entry
672            .entry_info()
673            .set_expiration_time(expiration_time);
674    }
675
676    fn expire_after_read_or_update(
677        expiry: impl FnOnce(&K, &V, StdInstant, Option<Duration>) -> Option<Duration>,
678        key: &K,
679        value_entry: &ValueEntry<K, V>,
680        ttl: Option<Duration>,
681        tti: Option<Duration>,
682        ts: Instant,
683        clock: &Clock,
684    ) -> bool {
685        let current_time = clock.to_std_instant(ts);
686        let ei = &value_entry.entry_info();
687
688        let exp_time = IntoIterator::into_iter([
689            ei.expiration_time(),
690            ttl.and_then(|dur| ei.last_modified().map(|ts| ts.saturating_add(dur))),
691            tti.and_then(|dur| ei.last_accessed().map(|ts| ts.saturating_add(dur))),
692        ])
693        .flatten()
694        .min();
695
696        let current_duration = exp_time.and_then(|time| {
697            let std_time = clock.to_std_instant(time);
698            std_time.checked_duration_since(current_time)
699        });
700
701        let duration = expiry(key, &value_entry.value, current_time, current_duration);
702
703        if duration != current_duration {
704            let expiration_time = duration.map(|duration| ts.saturating_add(duration));
705            value_entry
706                .entry_info()
707                .set_expiration_time(expiration_time);
708            // The `expiration_time` has changed from `None` to `Some` or vice versa.
709            true
710        } else {
711            false
712        }
713    }
714}
715
716//
717// for testing
718//
#[cfg(test)]
impl<K, V, S> BaseCache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the invalidation predicate count reported by the inner state.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.inner.invalidation_predicate_count()
    }

    /// Prepares the cache for tests: enables the frequency sketch and turns off
    /// the housekeeper's automatic runs.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        self.inner.enable_frequency_sketch_for_testing();

        // Pending tasks must not be cleaned up automatically during tests.
        match &self.housekeeper {
            Some(hk) => hk.disable_auto_run(),
            None => {}
        }
    }

    /// Returns whether the inner state reports its key-locks map as empty.
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.inner.key_locks_map_is_empty()
    }
}
743
744struct EvictionState<'a, K, V> {
745    counters: EvictionCounters,
746    notifier: Option<&'a RemovalNotifier<K, V>>,
747    more_entries_to_evict: bool,
748}
749
750impl<'a, K, V> EvictionState<'a, K, V> {
751    fn new(
752        entry_count: u64,
753        weighted_size: u64,
754        notifier: Option<&'a RemovalNotifier<K, V>>,
755    ) -> Self {
756        Self {
757            counters: EvictionCounters::new(entry_count, weighted_size),
758            notifier,
759            more_entries_to_evict: false,
760        }
761    }
762
763    fn is_notifier_enabled(&self) -> bool {
764        self.notifier.is_some()
765    }
766
767    fn notify_entry_removal(
768        &mut self,
769        key: Arc<K>,
770        entry: &MiniArc<ValueEntry<K, V>>,
771        cause: RemovalCause,
772    ) where
773        K: Send + Sync + 'static,
774        V: Clone + Send + Sync + 'static,
775    {
776        if let Some(notifier) = self.notifier {
777            notifier.notify(key, entry.value.clone(), cause);
778        } else {
779            panic!("notify_entry_removal is called when the notification is disabled");
780        }
781    }
782}
783
/// Running totals tracked while entries are admitted and evicted.
struct EvictionCounters {
    /// Number of entries.
    entry_count: u64,
    /// Sum of the policy weights of the entries.
    weighted_size: u64,
    /// Number of evictions counted via `incr_eviction_count`.
    eviction_count: u64,
}

impl EvictionCounters {
    /// Creates counters starting from the given entry count and weighted size,
    /// with an eviction count of zero.
    #[inline]
    fn new(entry_count: u64, weighted_size: u64) -> Self {
        Self {
            entry_count,
            weighted_size,
            eviction_count: 0,
        }
    }

    /// Adds `entry_count` entries and `weight` to the totals.
    ///
    /// Both totals saturate at `u64::MAX` instead of overflowing.
    #[inline]
    fn saturating_add(&mut self, entry_count: u64, weight: u32) {
        self.entry_count = self.entry_count.saturating_add(entry_count);
        self.weighted_size = self.weighted_size.saturating_add(u64::from(weight));
    }

    /// Subtracts `entry_count` entries and `weight` from the totals.
    ///
    /// Both totals saturate at zero instead of underflowing, so an accounting
    /// mismatch can never wrap the entry count around to a huge value.
    #[inline]
    fn saturating_sub(&mut self, entry_count: u64, weight: u32) {
        self.entry_count = self.entry_count.saturating_sub(entry_count);
        self.weighted_size = self.weighted_size.saturating_sub(u64::from(weight));
    }

    /// Increments the eviction count by one, saturating at `u64::MAX`.
    #[inline]
    fn incr_eviction_count(&mut self) {
        self.eviction_count = self.eviction_count.saturating_add(1);
    }
}
820
821#[derive(Default)]
822struct EntrySizeAndFrequency {
823    policy_weight: u64,
824    freq: u32,
825}
826
827impl EntrySizeAndFrequency {
828    fn new(policy_weight: u32) -> Self {
829        Self {
830            policy_weight: policy_weight as u64,
831            ..Default::default()
832        }
833    }
834
835    fn add_policy_weight(&mut self, weight: u32) {
836        self.policy_weight += weight as u64;
837    }
838
839    fn add_frequency(&mut self, freq: &FrequencySketch, hash: u64) {
840        self.freq += freq.frequency(hash) as u32;
841    }
842}
843
// NOTE: Clippy found that the `Admitted` variant contains at least a few hundred
// bytes of data and the `Rejected` variant contains no data at all. It suggested
// boxing the `SmallVec`.
//
// We ignore the suggestion because (1) the `SmallVec` is used to avoid heap
// allocation as it will be used in a performance hot spot, and (2) this enum has a
// very short lifetime and there will be only one instance at a time.
#[allow(clippy::large_enum_variant)]
enum AdmissionResult<K> {
    Admitted {
        /// A vec of pairs of `KeyHash` and `last_accessed`.
        victim_keys: SmallVec<[(KeyHash<K>, Option<Instant>); 8]>,
    },
    Rejected,
}
859
/// The segmented hash map backing the cache: maps `Arc<K>` keys to
/// reference-counted `ValueEntry`s.
type CacheStore<K, V, S> = crate::cht::SegmentedHashMap<Arc<K>, MiniArc<ValueEntry<K, V>>, S>;
861
/// The internal state of a cache, held behind an `Arc` by `BaseCache`.
pub(crate) struct Inner<K, V, S> {
    /// Optional cache name.
    name: Option<String>,
    /// Optional max capacity; `Some(0)` makes `BaseCache` treat the map as
    /// disabled.
    max_capacity: Option<u64>,
    /// Entry count, readable via `entry_count()`.
    entry_count: AtomicCell<u64>,
    /// Weighted size, readable via `weighted_size()`.
    weighted_size: AtomicCell<u64>,
    /// The hash map holding the entries.
    pub(crate) cache: CacheStore<K, V, S>,
    build_hasher: S,
    deques: Mutex<Deques<K>>,
    timer_wheel: Mutex<TimerWheel<K>>,
    frequency_sketch: RwLock<FrequencySketch>,
    frequency_sketch_enabled: AtomicBool,
    /// Receiving end of the read-op channel.
    read_op_ch: Receiver<ReadOp<K, V>>,
    /// Receiving end of the write-op channel.
    write_op_ch: Receiver<WriteOp<K, V>>,
    eviction_policy: EvictionPolicyConfig,
    /// Source of the time-to-live, time-to-idle and expiry settings.
    expiration_policy: ExpirationPolicy<K, V>,
    /// Timestamp set by `set_valid_after` (used by `invalidate_all`).
    valid_after: AtomicInstant,
    weigher: Option<Weigher<K, V>>,
    /// Present when the removal notifier is enabled.
    removal_notifier: Option<RemovalNotifier<K, V>>,
    /// Present when per-key locks are in use; see `maybe_key_lock`.
    key_locks: Option<KeyLockMap<K, S>>,
    invalidator: Option<Invalidator<K, V, S>>,
    /// Clock used for `current_time()`.
    clock: Clock,
}
884
885impl<K, V, S> Drop for Inner<K, V, S> {
886    fn drop(&mut self) {
887        // Ensure crossbeam-epoch to collect garbages (`deferred_fn`s) in the
888        // global bag so that previously cached values will be dropped.
889        for _ in 0..128 {
890            crossbeam_epoch::pin().flush();
891        }
892
893        // NOTE: The `CacheStore` (`cht`) will be dropped after returning from this
894        // `drop` method. It uses crossbeam-epoch internally, but we do not have to
895        // call `flush` for it because its `drop` methods do not create
896        // `deferred_fn`s, and drop its values in place.
897    }
898}
899
900//
901// functions/methods used by BaseCache
902//
903
904impl<K, V, S> Inner<K, V, S> {
905    fn name(&self) -> Option<&str> {
906        self.name.as_deref()
907    }
908
909    fn policy(&self) -> Policy {
910        let exp = &self.expiration_policy;
911        Policy::new(self.max_capacity, 1, exp.time_to_live(), exp.time_to_idle())
912    }
913
914    #[inline]
915    fn entry_count(&self) -> u64 {
916        self.entry_count.load()
917    }
918
919    #[inline]
920    fn weighted_size(&self) -> u64 {
921        self.weighted_size.load()
922    }
923
924    #[inline]
925    pub(crate) fn is_removal_notifier_enabled(&self) -> bool {
926        self.removal_notifier.is_some()
927    }
928
929    pub(crate) fn maybe_key_lock(&self, key: &Arc<K>) -> Option<KeyLock<'_, K, S>>
930    where
931        K: Hash + Eq,
932        S: BuildHasher,
933    {
934        self.key_locks.as_ref().map(|kls| kls.key_lock(key))
935    }
936
937    #[inline]
938    fn current_time(&self) -> Instant {
939        self.clock.now()
940    }
941
942    fn clock(&self) -> &Clock {
943        &self.clock
944    }
945
946    fn num_cht_segments(&self) -> usize {
947        self.cache.actual_num_segments()
948    }
949
950    #[inline]
951    fn time_to_live(&self) -> Option<Duration> {
952        self.expiration_policy.time_to_live()
953    }
954
955    #[inline]
956    fn time_to_idle(&self) -> Option<Duration> {
957        self.expiration_policy.time_to_idle()
958    }
959
960    #[inline]
961    fn has_expiry(&self) -> bool {
962        let exp = &self.expiration_policy;
963        exp.time_to_live().is_some() || exp.time_to_idle().is_some()
964    }
965
966    #[inline]
967    fn is_write_order_queue_enabled(&self) -> bool {
968        self.expiration_policy.time_to_live().is_some() || self.invalidator.is_some()
969    }
970
971    #[inline]
972    fn valid_after(&self) -> Option<Instant> {
973        self.valid_after.instant()
974    }
975
976    #[inline]
977    fn set_valid_after(&self, timestamp: Instant) {
978        self.valid_after.set_instant(timestamp);
979    }
980
981    #[inline]
982    fn has_valid_after(&self) -> bool {
983        self.valid_after.is_set()
984    }
985}
986
987impl<K, V, S> Inner<K, V, S>
988where
989    K: Hash + Eq + Send + Sync + 'static,
990    V: Send + Sync + 'static,
991    S: BuildHasher + Clone,
992{
993    // Disable a Clippy warning for having more than seven arguments.
994    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
995    #[allow(clippy::too_many_arguments)]
996    fn new(
997        name: Option<String>,
998        max_capacity: Option<u64>,
999        initial_capacity: Option<usize>,
1000        build_hasher: S,
1001        weigher: Option<Weigher<K, V>>,
1002        eviction_policy: EvictionPolicy,
1003        eviction_listener: Option<EvictionListener<K, V>>,
1004        read_op_ch: Receiver<ReadOp<K, V>>,
1005        write_op_ch: Receiver<WriteOp<K, V>>,
1006        expiration_policy: ExpirationPolicy<K, V>,
1007        invalidator_enabled: bool,
1008        clock: Clock,
1009    ) -> Self {
1010        // TODO: Calculate the number of segments based on the max capacity and the
1011        // number of CPUs.
1012        let (num_segments, initial_capacity) = if max_capacity == Some(0) {
1013            (1, 0)
1014        } else {
1015            let ic = initial_capacity
1016                .map(|cap| cap + WRITE_LOG_CH_SIZE)
1017                .unwrap_or_default();
1018            (64, ic)
1019        };
1020        let cache = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
1021            num_segments,
1022            initial_capacity,
1023            build_hasher.clone(),
1024        );
1025
1026        let now = clock.now();
1027        let timer_wheel = Mutex::new(TimerWheel::new(now));
1028
1029        let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener {
1030            let rn = RemovalNotifier::new(listener, name.clone());
1031            let kl = KeyLockMap::with_hasher(build_hasher.clone());
1032            (Some(rn), Some(kl))
1033        } else {
1034            (None, None)
1035        };
1036
1037        let invalidator = if invalidator_enabled {
1038            Some(Invalidator::new(build_hasher.clone()))
1039        } else {
1040            None
1041        };
1042
1043        Self {
1044            name,
1045            max_capacity,
1046            entry_count: AtomicCell::default(),
1047            weighted_size: AtomicCell::default(),
1048            cache,
1049            build_hasher,
1050            deques: Mutex::default(),
1051            timer_wheel,
1052            frequency_sketch: RwLock::new(FrequencySketch::default()),
1053            frequency_sketch_enabled: AtomicBool::default(),
1054            read_op_ch,
1055            write_op_ch,
1056            eviction_policy: eviction_policy.config,
1057            expiration_policy,
1058            valid_after: AtomicInstant::default(),
1059            weigher,
1060            removal_notifier,
1061            key_locks,
1062            invalidator,
1063            clock,
1064        }
1065    }
1066
1067    #[inline]
1068    fn hash<Q>(&self, key: &Q) -> u64
1069    where
1070        K: Borrow<Q>,
1071        Q: Hash + Eq + ?Sized,
1072    {
1073        let mut hasher = self.build_hasher.build_hasher();
1074        key.hash(&mut hasher);
1075        hasher.finish()
1076    }
1077
1078    #[inline]
1079    fn get_key_value_and<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
1080    where
1081        K: Borrow<Q>,
1082        Q: Hash + Eq + ?Sized,
1083        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> T,
1084    {
1085        self.cache
1086            .get_key_value_and(hash, |k| (k as &K).borrow() == key, with_entry)
1087    }
1088
1089    #[inline]
1090    fn get_key_value_and_then<Q, F, T>(&self, key: &Q, hash: u64, with_entry: F) -> Option<T>
1091    where
1092        K: Borrow<Q>,
1093        Q: Hash + Eq + ?Sized,
1094        F: FnOnce(&Arc<K>, &MiniArc<ValueEntry<K, V>>) -> Option<T>,
1095    {
1096        self.cache
1097            .get_key_value_and_then(hash, |k| (k as &K).borrow() == key, with_entry)
1098    }
1099
1100    #[inline]
1101    fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
1102    where
1103        K: Borrow<Q>,
1104        Q: Hash + Eq + ?Sized,
1105    {
1106        self.cache
1107            .remove_entry(hash, |k| (k as &K).borrow() == key)
1108            .map(|(key, entry)| KvEntry::new(key, entry))
1109    }
1110
    /// Returns clones of the keys stored in the hash table segment
    /// `cht_segment`, as produced by the hash table's `keys` method.
    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
        // Do `Arc::clone` instead of `Arc::downgrade`. Updating an existing entry
        // in the cht with a new value replaces the key in the cht even though the
        // old and new keys are equal. If we returned `Weak<K>`, it could not be
        // upgraded later to `Arc<K>` as the key may have been replaced with a new
        // key that equals the old key.
        self.cache.keys(cht_segment, Arc::clone)
    }
1119
1120    #[inline]
1121    fn register_invalidation_predicate(
1122        &self,
1123        predicate: PredicateFun<K, V>,
1124        registered_at: Instant,
1125    ) -> Result<PredicateId, PredicateError> {
1126        if let Some(inv) = &self.invalidator {
1127            inv.register_predicate(predicate, registered_at)
1128        } else {
1129            Err(PredicateError::InvalidationClosuresDisabled)
1130        }
1131    }
1132
1133    /// Returns `true` if the entry is invalidated by `invalidate_entries_if` method.
1134    #[inline]
1135    fn is_invalidated_entry(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
1136    where
1137        V: Clone,
1138    {
1139        if let Some(inv) = &self.invalidator {
1140            return inv.apply_predicates(key, entry);
1141        }
1142        false
1143    }
1144
1145    #[inline]
1146    fn weigh(&self, key: &K, value: &V) -> u32 {
1147        self.weigher.as_ref().map_or(1, |w| w(key, value))
1148    }
1149}
1150
/// `InnerSync` implementation for `Inner`; both methods simply delegate to
/// the inherent methods of `Inner`.
impl<K, V, S> InnerSync for Inner<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Forwards all arguments unchanged to `do_run_pending_tasks` and returns
    /// its result.
    fn run_pending_tasks(
        &self,
        timeout: Option<Duration>,
        max_log_sync_repeats: u32,
        eviction_batch_size: u32,
    ) -> bool {
        self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size)
    }

    /// Returns the current time as reported by `current_time`.
    fn now(&self) -> Instant {
        self.current_time()
    }
}
1170
1171impl<K, V, S> Inner<K, V, S>
1172where
1173    K: Hash + Eq + Send + Sync + 'static,
1174    V: Clone + Send + Sync + 'static,
1175    S: BuildHasher + Clone + Send + Sync + 'static,
1176{
    /// Drains the pending read/write operation logs and then runs the eviction
    /// passes (timer wheel, deques, invalidation predicates, capacity) in a
    /// loop while holding the `deques` and `timer_wheel` locks.
    ///
    /// - `timeout`: when `Some`, the loop stops once at least this much time
    ///   has elapsed since it started; checked at the end of each iteration.
    /// - `max_log_sync_repeats`: the logs are drained again in the next
    ///   iteration only while `calls <= max_log_sync_repeats` and one of the
    ///   logs has reached its flush point.
    /// - `eviction_batch_size`: forwarded to the deque-based eviction and
    ///   invalidation helpers.
    ///
    /// Returns the `more_entries_to_evict` flag as left by the last iteration,
    /// or `false` right away when `max_capacity` is `Some(0)`.
    fn do_run_pending_tasks(
        &self,
        timeout: Option<Duration>,
        max_log_sync_repeats: u32,
        eviction_batch_size: u32,
    ) -> bool {
        // A zero-capacity cache has nothing to maintain.
        if self.max_capacity == Some(0) {
            return false;
        }

        // Acquire some locks.
        let mut deqs = self.deques.lock();
        let mut timer_wheel = self.timer_wheel.lock();

        // The start time is only needed when a timeout was requested.
        let started_at = if timeout.is_some() {
            Some(self.current_time())
        } else {
            None
        };
        let mut should_process_logs = true;
        let mut calls = 0u32;
        // Work on a local copy of the counters; they are written back to the
        // atomics after the loop.
        let current_ec = self.entry_count.load();
        let current_ws = self.weighted_size.load();
        let mut eviction_state =
            EvictionState::new(current_ec, current_ws, self.removal_notifier.as_ref());

        loop {
            if should_process_logs {
                let r_len = self.read_op_ch.len();
                if r_len > 0 {
                    self.apply_reads(&mut deqs, &mut timer_wheel, r_len);
                }

                let w_len = self.write_op_ch.len();
                if w_len > 0 {
                    self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state);
                }

                if self.eviction_policy == EvictionPolicyConfig::TinyLfu
                    && self.should_enable_frequency_sketch(&eviction_state.counters)
                {
                    self.enable_frequency_sketch(&eviction_state.counters);
                }

                calls += 1;
            }

            // Set this flag to `false`. The `evict_*` and `invalidate_*` methods
            // below may set it to `true` if there are more entries to evict in next
            // loop.
            eviction_state.more_entries_to_evict = false;
            let last_eviction_count = eviction_state.counters.eviction_count;

            // Evict entries if there are any expired entries in the hierarchical
            // timer wheels.
            if timer_wheel.is_enabled() {
                self.evict_expired_entries_using_timers(
                    &mut timer_wheel,
                    &mut deqs,
                    &mut eviction_state,
                );
            }

            // Evict entries if there are any expired entries in the write order or
            // access order deques.
            if self.has_expiry() || self.has_valid_after() {
                self.evict_expired_entries_using_deqs(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    &mut eviction_state,
                );
            }

            // Evict entries if there are any invalidation predicates set by the
            // `invalidate_entries_if` method.
            if let Some(invalidator) = &self.invalidator {
                if !invalidator.is_empty() {
                    self.invalidate_entries(
                        invalidator,
                        &mut deqs,
                        &mut timer_wheel,
                        eviction_batch_size,
                        &mut eviction_state,
                    );
                }
            }

            // Evict if this cache has more entries than its capacity.
            let weights_to_evict = self.weights_to_evict(&eviction_state.counters);
            if weights_to_evict > 0 {
                self.evict_lru_entries(
                    &mut deqs,
                    &mut timer_wheel,
                    eviction_batch_size,
                    weights_to_evict,
                    &mut eviction_state,
                );
            }

            // Check whether to continue this loop or not.

            should_process_logs = calls <= max_log_sync_repeats
                && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT
                    || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT);

            let should_evict_more_entries = eviction_state.more_entries_to_evict
                // Check if there were any entries evicted in this loop.
                && (eviction_state.counters.eviction_count - last_eviction_count) > 0;

            // Break the loop if there will be nothing to do in next loop.
            if !should_process_logs && !should_evict_more_entries {
                break;
            }

            // Break the loop if the eviction listener is set and timeout has been
            // reached.
            if let (Some(to), Some(started)) = (timeout, started_at) {
                let elapsed = self.current_time().saturating_duration_since(started);
                if elapsed >= to {
                    break;
                }
            }
        }

        // The atomics must not have changed while the locks were held; publish
        // the locally updated counters.
        debug_assert_eq!(self.entry_count.load(), current_ec);
        debug_assert_eq!(self.weighted_size.load(), current_ws);
        self.entry_count.store(eviction_state.counters.entry_count);
        self.weighted_size
            .store(eviction_state.counters.weighted_size);

        crossbeam_epoch::pin().flush();

        // Ensure the deqs lock is held until here.
        drop(deqs);

        eviction_state.more_entries_to_evict
    }
1315}
1316
1317//
1318// private methods
1319//
1320impl<K, V, S> Inner<K, V, S>
1321where
1322    K: Hash + Eq + Send + Sync + 'static,
1323    V: Send + Sync + 'static,
1324    S: BuildHasher + Clone + Send + Sync + 'static,
1325{
1326    fn has_enough_capacity(&self, candidate_weight: u32, counters: &EvictionCounters) -> bool {
1327        self.max_capacity.map_or(true, |limit| {
1328            counters.weighted_size + candidate_weight as u64 <= limit
1329        })
1330    }
1331
1332    fn weights_to_evict(&self, counters: &EvictionCounters) -> u64 {
1333        self.max_capacity
1334            .map(|limit| counters.weighted_size.saturating_sub(limit))
1335            .unwrap_or_default()
1336    }
1337
1338    #[inline]
1339    fn should_enable_frequency_sketch(&self, counters: &EvictionCounters) -> bool {
1340        match self.max_capacity {
1341            None | Some(0) => false,
1342            Some(max_cap) => {
1343                if self.frequency_sketch_enabled.load(Ordering::Acquire) {
1344                    false // The frequency sketch is already enabled.
1345                } else {
1346                    counters.weighted_size >= max_cap / 2
1347                }
1348            }
1349        }
1350    }
1351
1352    #[inline]
1353    fn enable_frequency_sketch(&self, counters: &EvictionCounters) {
1354        if let Some(max_cap) = self.max_capacity {
1355            let c = counters;
1356            let cap = if self.weigher.is_none() {
1357                max_cap
1358            } else {
1359                (c.entry_count as f64 * (c.weighted_size as f64 / max_cap as f64)) as u64
1360            };
1361            self.do_enable_frequency_sketch(cap);
1362        }
1363    }
1364
1365    #[cfg(test)]
1366    fn enable_frequency_sketch_for_testing(&self) {
1367        if let Some(max_cap) = self.max_capacity {
1368            self.do_enable_frequency_sketch(max_cap);
1369        }
1370    }
1371
1372    #[inline]
1373    fn do_enable_frequency_sketch(&self, cache_capacity: u64) {
1374        let skt_capacity = common::sketch_capacity(cache_capacity);
1375        self.frequency_sketch.write().ensure_capacity(skt_capacity);
1376        self.frequency_sketch_enabled.store(true, Ordering::Release);
1377    }
1378
1379    fn apply_reads(&self, deqs: &mut Deques<K>, timer_wheel: &mut TimerWheel<K>, count: usize) {
1380        use ReadOp::{Hit, Miss};
1381        let mut freq = self.frequency_sketch.write();
1382        let ch = &self.read_op_ch;
1383        for _ in 0..count {
1384            match ch.try_recv() {
1385                Ok(Hit {
1386                    value_entry,
1387                    is_expiry_modified,
1388                }) => {
1389                    let kh = value_entry.entry_info().key_hash();
1390                    freq.increment(kh.hash);
1391                    if is_expiry_modified {
1392                        self.update_timer_wheel(&value_entry, timer_wheel);
1393                    }
1394                    deqs.move_to_back_ao(&value_entry);
1395                }
1396                Ok(Miss(hash)) => freq.increment(hash),
1397                Err(_) => break,
1398            }
1399        }
1400    }
1401
1402    fn apply_writes(
1403        &self,
1404        deqs: &mut Deques<K>,
1405        timer_wheel: &mut TimerWheel<K>,
1406        count: usize,
1407        eviction_state: &mut EvictionState<'_, K, V>,
1408    ) where
1409        V: Clone,
1410    {
1411        use WriteOp::{Remove, Upsert};
1412        let freq = self.frequency_sketch.read();
1413        let ch = &self.write_op_ch;
1414
1415        for _ in 0..count {
1416            match ch.try_recv() {
1417                Ok(Upsert {
1418                    key_hash: kh,
1419                    value_entry: entry,
1420                    entry_gen: gen,
1421                    old_weight,
1422                    new_weight,
1423                }) => self.handle_upsert(
1424                    kh,
1425                    entry,
1426                    gen,
1427                    old_weight,
1428                    new_weight,
1429                    deqs,
1430                    timer_wheel,
1431                    &freq,
1432                    eviction_state,
1433                ),
1434                Ok(Remove {
1435                    kv_entry: KvEntry { key: _key, entry },
1436                    entry_gen: gen,
1437                }) => {
1438                    Self::handle_remove(
1439                        deqs,
1440                        timer_wheel,
1441                        entry,
1442                        Some(gen),
1443                        &mut eviction_state.counters,
1444                    );
1445                }
1446                Err(_) => break,
1447            };
1448        }
1449    }
1450
    /// Applies one `Upsert` write operation to the policy data structures.
    ///
    /// The cases are tried in this order:
    /// 1. The entry is already admitted: treat it as an update (swap
    ///    `old_weight` for `new_weight`, refresh the timer wheel and deques).
    /// 2. There is enough capacity (or the cache is unbounded): admit it.
    /// 3. The candidate alone is heavier than `max_capacity`: remove it from
    ///    the hash map, provided the map still holds this very entry with the
    ///    same entry generation, and report the removal with
    ///    `RemovalCause::Size`.
    /// 4. Otherwise ask the eviction policy. `TinyLfu` consults `admit`; `Lru`
    ///    admits the candidate without nominating victims here. On admission
    ///    the victims are removed and the candidate is added to the deques; on
    ///    rejection the candidate is removed from the hash map.
    // Clippy's limit on the number of arguments is deliberately exceeded here.
    #[allow(clippy::too_many_arguments)]
    fn handle_upsert(
        &self,
        kh: KeyHash<K>,
        entry: MiniArc<ValueEntry<K, V>>,
        gen: u16,
        old_weight: u32,
        new_weight: u32,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        freq: &FrequencySketch,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        {
            let counters = &mut eviction_state.counters;

            if entry.is_admitted() {
                // The entry has been already admitted, so treat this as an update.
                // NOTE(review): the first argument is presumably the entry count
                // delta (0 here, 1 in `handle_admit`) — confirm against
                // `EvictionCounters`.
                counters.saturating_sub(0, old_weight);
                counters.saturating_add(0, new_weight);
                self.update_timer_wheel(&entry, timer_wheel);
                deqs.move_to_back_ao(&entry);
                deqs.move_to_back_wo(&entry);
                entry.entry_info().set_policy_gen(gen);
                return;
            }

            if self.has_enough_capacity(new_weight, counters) {
                // There is enough room in the cache (or the cache is unbounded).
                // Add the candidate to the deques.
                self.handle_admit(&entry, new_weight, deqs, timer_wheel, counters);
                entry.entry_info().set_policy_gen(gen);
                return;
            }
        }

        if let Some(max) = self.max_capacity {
            if new_weight as u64 > max {
                // The candidate is too big to fit in the cache. Reject it.

                // Lock the key for removal if blocking removal notification is enabled.
                let kl = self.maybe_key_lock(&kh.key);
                let _klg = &kl.as_ref().map(|kl| kl.lock());

                // Remove only if the map still holds this exact entry with the
                // same entry generation.
                let removed = self.cache.remove_if(
                    kh.hash,
                    |k| k == &kh.key,
                    |_, current_entry| {
                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
                            && current_entry.entry_info().entry_gen() == gen
                    },
                );
                if let Some(entry) = removed {
                    if eviction_state.is_notifier_enabled() {
                        let key = Arc::clone(&kh.key);
                        eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
                    }
                    eviction_state.counters.incr_eviction_count();
                }
                entry.entry_info().set_policy_gen(gen);
                return;
            }
        }

        // TODO: Refactoring the policy implementations.
        // https://github.com/moka-rs/moka/issues/389

        // Try to admit the candidate.
        let admission_result = match &self.eviction_policy {
            EvictionPolicyConfig::TinyLfu => {
                let mut candidate = EntrySizeAndFrequency::new(new_weight);
                candidate.add_frequency(freq, kh.hash);
                Self::admit(&candidate, &self.cache, deqs, freq)
            }
            EvictionPolicyConfig::Lru => AdmissionResult::Admitted {
                victim_keys: SmallVec::default(),
            },
        };

        match admission_result {
            AdmissionResult::Admitted { victim_keys } => {
                // Try to remove the victims from the hash map.
                for (vic_kh, vic_la) in victim_keys {
                    let vic_key = vic_kh.key;
                    let vic_hash = vic_kh.hash;

                    // Lock the key for removal if blocking removal notification is enabled.
                    let kl = self.maybe_key_lock(&vic_key);
                    let _klg = &kl.as_ref().map(|kl| kl.lock());

                    // Remove the victim only if its last-accessed time is still
                    // the one recorded when it was selected.
                    if let Some((vic_key, vic_entry)) = self.cache.remove_entry_if_and(
                        vic_hash,
                        |k| k == &vic_key,
                        |_, entry| entry.entry_info().last_accessed() == vic_la,
                        |k, v| (k.clone(), v.clone()),
                    ) {
                        if eviction_state.is_notifier_enabled() {
                            eviction_state.notify_entry_removal(
                                vic_key,
                                &vic_entry,
                                RemovalCause::Size,
                            );
                        }
                        eviction_state.counters.incr_eviction_count();
                        // And then remove the victim from the deques.
                        Self::handle_remove(
                            deqs,
                            timer_wheel,
                            vic_entry,
                            None,
                            &mut eviction_state.counters,
                        );
                    } else {
                        // Could not remove the victim from the cache. Skip it as its
                        // ValueEntry might have been invalidated.
                        if let Some(node) = deqs.probation.peek_front() {
                            if node.element.key() == &vic_key && node.element.hash() == vic_hash {
                                deqs.probation.move_front_to_back();
                            }
                        }
                    }
                }

                // Add the candidate to the deques.
                self.handle_admit(
                    &entry,
                    new_weight,
                    deqs,
                    timer_wheel,
                    &mut eviction_state.counters,
                );
                entry.entry_info().set_policy_gen(gen);
            }
            AdmissionResult::Rejected => {
                // Lock the key for removal if blocking removal notification is enabled.
                let kl = self.maybe_key_lock(&kh.key);
                let _klg = &kl.as_ref().map(|kl| kl.lock());

                // Remove the candidate from the cache (hash map) if the entry
                // generation matches.
                let key = Arc::clone(&kh.key);
                let removed = self.cache.remove_if(
                    kh.hash,
                    |k| k == &key,
                    |_, current_entry| {
                        MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info())
                            && current_entry.entry_info().entry_gen() == gen
                    },
                );

                if let Some(entry) = removed {
                    entry.entry_info().set_policy_gen(gen);
                    if eviction_state.is_notifier_enabled() {
                        eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
                    }
                    eviction_state.counters.incr_eviction_count();
                }
            }
        };
    }
1613
    /// Performs size-aware admission explained in the paper:
    /// [Lightweight Robust Size Aware Cache Management][size-aware-cache-paper]
    /// by Gil Einziger, Ohad Eytan, Roy Friedman, Ben Manes.
    ///
    /// [size-aware-cache-paper]: https://arxiv.org/abs/2105.08770
    ///
    /// There are some modifications in this implementation:
    /// - To admit to the main space, candidate's frequency must be higher than
    ///   the aggregated frequencies of the potential victims. (In the paper,
    ///   `>=` operator is used rather than `>`)  The `>` operator will do a better
    ///   job to prevent the main space from being polluted.
    /// - When a candidate is rejected, the potential victims will stay at the LRU
    ///   position of the probation access-order queue. (In the paper, they will be
    ///   promoted (to the MRU position?) to force the eviction policy to select a
    ///   different set of victims for the next candidate). We may implement the
    ///   paper's behavior later?
    ///
    /// Victims are collected from the front of `deqs.probation` until their
    /// combined weight reaches the candidate's weight, their combined frequency
    /// exceeds the candidate's, the deque is exhausted, or more than
    /// `MAX_CONSECUTIVE_RETRIES` nodes in a row had to be skipped.
    ///
    /// Returns `AdmissionResult::Admitted` with the keys (and last-accessed
    /// times) of the victims when the victims free enough weight and the
    /// candidate is strictly more frequent; otherwise `AdmissionResult::Rejected`.
    #[inline]
    fn admit(
        candidate: &EntrySizeAndFrequency,
        cache: &CacheStore<K, V, S>,
        deqs: &mut Deques<K>,
        freq: &FrequencySketch,
    ) -> AdmissionResult<K> {
        const MAX_CONSECUTIVE_RETRIES: usize = 5;
        let mut retries = 0;

        let mut victims = EntrySizeAndFrequency::default();
        let mut victim_keys = SmallVec::default();

        let deq = &mut deqs.probation;

        // Get first potential victim at the LRU position.
        let mut next_victim = deq.peek_front_ptr();

        // Aggregate potential victims.
        while victims.policy_weight < candidate.policy_weight
            && victims.freq <= candidate.freq
            && retries <= MAX_CONSECUTIVE_RETRIES
        {
            let Some(victim) = next_victim.take() else {
                // No more potential victims.
                break;
            };
            // Fetch the successor first: the current node may be moved to the
            // back of the deque below.
            next_victim = DeqNode::next_node_ptr(victim);

            // NOTE(review): `victim` came from `peek_front_ptr` /
            // `next_node_ptr` while `deqs` is exclusively borrowed, so the node
            // is presumably still alive and linked — confirm against `Deque`.
            let vic_elem = &unsafe { victim.as_ref() }.element;
            if vic_elem.is_dirty() {
                // Skip this node as its ValueEntry have been updated or invalidated.
                // NOTE(review): relies on `victim` being a node of `deq` — see above.
                unsafe { deq.move_to_back(victim) };
                retries += 1;
                continue;
            }

            let key = vic_elem.key();
            let hash = vic_elem.hash();
            let last_accessed = vic_elem.entry_info().last_accessed();

            if let Some(vic_entry) = cache.get(hash, |k| k == key) {
                victims.add_policy_weight(vic_entry.policy_weight());
                victims.add_frequency(freq, hash);
                victim_keys.push((KeyHash::new(Arc::clone(key), hash), last_accessed));
                // A usable victim was found, so the skip streak is over.
                retries = 0;
            } else {
                // Could not get the victim from the cache (hash map). Skip this node
                // as its ValueEntry might have been invalidated (after we checked
                // `is_dirty` above).
                // NOTE(review): relies on `victim` being a node of `deq` — see above.
                unsafe { deq.move_to_back(victim) };
                retries += 1;
            }
        }

        // Admit or reject the candidate.

        // TODO: Implement some randomness to mitigate hash DoS attack.
        // See Caffeine's implementation.

        if victims.policy_weight >= candidate.policy_weight && candidate.freq > victims.freq {
            AdmissionResult::Admitted { victim_keys }
        } else {
            AdmissionResult::Rejected
        }
    }
1697
1698    fn handle_admit(
1699        &self,
1700        entry: &MiniArc<ValueEntry<K, V>>,
1701        policy_weight: u32,
1702        deqs: &mut Deques<K>,
1703        timer_wheel: &mut TimerWheel<K>,
1704        counters: &mut EvictionCounters,
1705    ) {
1706        counters.saturating_add(1, policy_weight);
1707
1708        self.update_timer_wheel(entry, timer_wheel);
1709
1710        // Update the deques.
1711        deqs.push_back_ao(
1712            CacheRegion::MainProbation,
1713            KeyHashDate::new(entry.entry_info()),
1714            entry,
1715        );
1716        if self.is_write_order_queue_enabled() {
1717            deqs.push_back_wo(KeyHashDate::new(entry.entry_info()), entry);
1718        }
1719        entry.set_admitted(true);
1720    }
1721
    /// Brings the timer wheel registration of `entry` in line with whether the
    /// entry currently has an expiration time: schedules, reschedules or
    /// deschedules its timer node as needed.
    ///
    /// NOTE: This method may enable the timer wheel.
    fn update_timer_wheel(
        &self,
        entry: &MiniArc<ValueEntry<K, V>>,
        timer_wheel: &mut TimerWheel<K>,
    ) {
        // Enable the timer wheel if needed.
        if entry.entry_info().expiration_time().is_some() && !timer_wheel.is_enabled() {
            timer_wheel.enable();
        }

        // Update the timer wheel.
        // The expiration time is read again here rather than reusing the check
        // above.
        match (
            entry.entry_info().expiration_time().is_some(),
            entry.timer_node(),
        ) {
            // Do nothing; the cache entry has no expiration time and not registered
            // to the timer wheel.
            (false, None) => (),
            // Register the cache entry to the timer wheel; the cache entry has an
            // expiration time and not registered to the timer wheel.
            (true, None) => {
                let timer = timer_wheel.schedule(
                    MiniArc::clone(entry.entry_info()),
                    MiniArc::clone(entry.deq_nodes()),
                );
                entry.set_timer_node(timer);
            }
            // Reschedule the cache entry in the timer wheel; the cache entry has an
            // expiration time and already registered to the timer wheel.
            (true, Some(tn)) => {
                let result = timer_wheel.reschedule(tn);
                if let ReschedulingResult::Removed(removed_tn) = result {
                    // The timer node was removed from the timer wheel because the
                    // expiration time has been unset by other thread after we
                    // checked.
                    entry.set_timer_node(None);
                    drop(removed_tn);
                }
            }
            // Unregister the cache entry from the timer wheel; the cache entry has
            // no expiration time but registered to the timer wheel.
            (false, Some(tn)) => {
                entry.set_timer_node(None);
                timer_wheel.deschedule(tn);
            }
        }
    }
1770
1771    fn handle_remove(
1772        deqs: &mut Deques<K>,
1773        timer_wheel: &mut TimerWheel<K>,
1774        entry: MiniArc<ValueEntry<K, V>>,
1775        gen: Option<u16>,
1776        counters: &mut EvictionCounters,
1777    ) {
1778        if let Some(timer_node) = entry.take_timer_node() {
1779            timer_wheel.deschedule(timer_node);
1780        }
1781        Self::handle_remove_without_timer_wheel(deqs, entry, gen, counters);
1782    }
1783
1784    fn handle_remove_without_timer_wheel(
1785        deqs: &mut Deques<K>,
1786        entry: MiniArc<ValueEntry<K, V>>,
1787        gen: Option<u16>,
1788        counters: &mut EvictionCounters,
1789    ) {
1790        if entry.is_admitted() {
1791            entry.set_admitted(false);
1792            counters.saturating_sub(1, entry.policy_weight());
1793            // The following two unlink_* functions will unset the deq nodes.
1794            deqs.unlink_ao(&entry);
1795            Deques::unlink_wo(&mut deqs.write_order, &entry);
1796        } else {
1797            entry.unset_q_nodes();
1798        }
1799        if let Some(g) = gen {
1800            entry.entry_info().set_policy_gen(g);
1801        }
1802    }
1803
1804    fn handle_remove_with_deques(
1805        ao_deq_name: &str,
1806        ao_deq: &mut Deque<KeyHashDate<K>>,
1807        wo_deq: &mut Deque<KeyHashDate<K>>,
1808        timer_wheel: &mut TimerWheel<K>,
1809        entry: MiniArc<ValueEntry<K, V>>,
1810        counters: &mut EvictionCounters,
1811    ) {
1812        if let Some(timer) = entry.take_timer_node() {
1813            timer_wheel.deschedule(timer);
1814        }
1815        if entry.is_admitted() {
1816            entry.set_admitted(false);
1817            counters.saturating_sub(1, entry.policy_weight());
1818            // The following two unlink_* functions will unset the deq nodes.
1819            Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
1820            Deques::unlink_wo(wo_deq, &entry);
1821        } else {
1822            entry.unset_q_nodes();
1823        }
1824    }
1825
    /// Evicts the entries that the timer wheel reports as expired.
    ///
    /// Advances `timer_wheel` to the current time and, for every
    /// `TimerEvent::Expired` event, removes the entry from the cache map if it
    /// is not dirty and is still expired by its per-entry expiration time at
    /// removal time. A removed entry is reported with `RemovalCause::Expired`
    /// (when a notifier is enabled), counted as an eviction and unlinked from
    /// `deqs`.
    fn evict_expired_entries_using_timers(
        &self,
        timer_wheel: &mut TimerWheel<K>,
        deqs: &mut Deques<K>,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        use crate::common::timer_wheel::TimerEvent;

        let now = self.current_time();

        // NOTES:
        //
        // 1. When necessary, the iterator returned from advance() will unset the
        //    timer node pointer in the `ValueEntry`, so we do not have to do it
        //    here.
        // 2. If an entry is dirty or `cache.remove_if` returns `None`, we will skip
        //    it as it may have been read, updated or invalidated by other thread.
        //    - The timer node should have been unset in the current `ValueEntry` as
        //      described above.
        //    - When necessary, a new timer node will be recreated for the current or
        //      new `ValueEntry` when its `WriteOp` or `ReadOp` is processed.
        for event in timer_wheel.advance(now) {
            // We do not have to do anything if event is `TimerEvent::Descheduled(_)`
            // or `TimerEvent::Rescheduled(_)`.
            if let TimerEvent::Expired(node) = event {
                let entry_info = node.element.entry_info();

                if entry_info.is_dirty() {
                    // Skip this entry as it has been updated or invalidated by other
                    // thread.
                    continue;
                }

                let kh = entry_info.key_hash();
                let key = &kh.key;
                let hash = kh.hash;

                // Lock the key for removal if blocking removal notification is
                // enabled. The guard (if any) is kept in `_klg` until the end of
                // this block, so it covers both the removal and the notification.
                let kl = self.maybe_key_lock(key);
                let _klg = &kl.as_ref().map(|kl| kl.lock());

                // Remove the key from the map only when the entry is really expired.
                let maybe_entry = self.cache.remove_if(
                    hash,
                    |k| k == key,
                    |_, v| is_expired_by_per_entry_ttl(v.entry_info(), now),
                );

                if let Some(entry) = maybe_entry {
                    if eviction_state.is_notifier_enabled() {
                        let key = Arc::clone(key);
                        eviction_state.notify_entry_removal(key, &entry, RemovalCause::Expired);
                    }
                    eviction_state.counters.incr_eviction_count();
                    // The timer node has already been handled by the timer wheel
                    // (see NOTES above), so only the deques need to be updated.
                    Self::handle_remove_without_timer_wheel(
                        deqs,
                        entry,
                        None,
                        &mut eviction_state.counters,
                    );
                } else {
                    // Skip this entry as the key may have been read, updated or
                    // invalidated by other thread.
                }
            }
        }
    }
1896
1897    fn evict_expired_entries_using_deqs(
1898        &self,
1899        deqs: &mut Deques<K>,
1900        timer_wheel: &mut TimerWheel<K>,
1901        batch_size: u32,
1902        state: &mut EvictionState<'_, K, V>,
1903    ) where
1904        V: Clone,
1905    {
1906        use CacheRegion::{MainProbation as Probation, MainProtected as Protected, Window};
1907
1908        let now = self.current_time();
1909
1910        if self.is_write_order_queue_enabled() {
1911            self.remove_expired_wo(deqs, timer_wheel, batch_size, now, state);
1912        }
1913
1914        if self.expiration_policy.time_to_idle().is_some() || self.has_valid_after() {
1915            self.remove_expired_ao(Window, deqs, timer_wheel, batch_size, now, state);
1916            self.remove_expired_ao(Probation, deqs, timer_wheel, batch_size, now, state);
1917            self.remove_expired_ao(Protected, deqs, timer_wheel, batch_size, now, state);
1918        }
1919    }
1920
    /// Removes expired or invalidated entries from the front of the access
    /// order deque of `cache_region`.
    ///
    /// Up to `batch_size` front nodes are examined. A node whose entry is
    /// expired by time-to-idle is removed with `RemovalCause::Expired`; one that
    /// is only older than `valid_after` is removed with `RemovalCause::Explicit`.
    /// The scan stops at the first node that is neither, or when the deque is
    /// empty.
    ///
    /// `eviction_state.more_entries_to_evict` is set to `true` only when no
    /// iteration hit a stop or skip condition. It is never reset to `false` here.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    fn remove_expired_ao(
        &self,
        cache_region: CacheRegion,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        now: Instant,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        let tti = &self.expiration_policy.time_to_idle();
        let va = &self.valid_after();
        let deq_name = cache_region.name();
        let (ao_deq, wo_deq) = deqs.select_mut(cache_region);
        let mut more_to_evict = true;

        for _ in 0..batch_size {
            // Snapshot (key, hash, is_dirty, last_accessed) of the front node.
            let maybe_key_hash_ts = ao_deq.peek_front().map(|node| {
                let elem = &node.element;
                (
                    Arc::clone(elem.key()),
                    elem.hash(),
                    elem.is_dirty(),
                    elem.last_accessed(),
                )
            });

            let (key, hash, cause) = match maybe_key_hash_ts {
                Some((key, hash, false, Some(ts))) => {
                    // The tuple is `(expired, invalid)`; expiration takes
                    // precedence when deciding the removal cause.
                    let cause = match is_entry_expired_ao_or_invalid(tti, va, ts, now) {
                        (true, _) => RemovalCause::Expired,
                        (false, true) => RemovalCause::Explicit,
                        (false, false) => {
                            // The front node is still valid; stop scanning.
                            more_to_evict = false;
                            break;
                        }
                    };
                    (key, hash, cause)
                }
                // TODO: Remove the second pattern `(key, hash, false, None)` once
                // we change `last_modified` and `last_accessed` in `EntryInfo` from
                // `Option<Instant>` to `Instant`.
                Some((key, hash, true, _) | (key, hash, false, None)) => {
                    // `is_dirty` is true or `last_accessed` is None. Skip this entry
                    // as it may have been updated by this or other async task but
                    // its `WriteOp` is not processed yet.
                    self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                    // Set `more_to_evict` to `false` to make `run_pending_tasks` to
                    // return early. This will help that `schedule_write_op` to send
                    // the `WriteOp` to the write op channel.
                    more_to_evict = false;
                    continue;
                }
                None => {
                    // The deque is empty.
                    more_to_evict = false;
                    break;
                }
            };

            // Lock the key for removal if blocking removal notification is enabled.
            // The guard (if any) is kept in `_klg` until the end of this iteration.
            let kl = self.maybe_key_lock(&key);
            let _klg = &kl.as_ref().map(|kl| kl.lock());

            // Remove the key from the map only when the entry is really
            // expired. This check is needed because it is possible that the entry in
            // the map has been updated or deleted but its deque node we checked
            // above has not been updated yet.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| is_expired_entry_ao(tti, va, v, now),
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state.notify_entry_removal(key, &entry, cause);
                }
                eviction_state.counters.incr_eviction_count();
                Self::handle_remove_with_deques(
                    deq_name,
                    ao_deq,
                    wo_deq,
                    timer_wheel,
                    entry,
                    &mut eviction_state.counters,
                );
            } else {
                // The entry in the map did not satisfy the removal condition (or
                // is gone); move its node out of the way and stop asking for
                // another round.
                self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                more_to_evict = false;
            }
        }

        if more_to_evict {
            eviction_state.more_entries_to_evict = true;
        }
    }
2020
2021    #[inline]
2022    fn skip_updated_entry_ao(
2023        &self,
2024        key: &K,
2025        hash: u64,
2026        deq_name: &str,
2027        deq: &mut Deque<KeyHashDate<K>>,
2028        write_order_deq: &mut Deque<KeyHashDate<K>>,
2029    ) {
2030        if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2031            // The key exists and the entry may have been read or updated by other
2032            // thread.
2033            Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
2034            if entry.is_dirty() {
2035                Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
2036            }
2037        } else {
2038            // Skip this entry as the key may have been invalidated by other thread.
2039            // Since the invalidated ValueEntry (which should be still in the write
2040            // op queue) has a pointer to this node, move the node to the back of the
2041            // deque instead of popping (dropping) it.
2042            deq.move_front_to_back();
2043        }
2044    }
2045
2046    #[inline]
2047    fn skip_updated_entry_wo(&self, key: &K, hash: u64, deqs: &mut Deques<K>) {
2048        if let Some(entry) = self.cache.get(hash, |k| (k.borrow() as &K) == key) {
2049            // The key exists and the entry may have been read or updated by other
2050            // thread.
2051            deqs.move_to_back_ao(&entry);
2052            deqs.move_to_back_wo(&entry);
2053        } else {
2054            // Skip this entry as the key may have been invalidated by other thread.
2055            // Since the invalidated `ValueEntry` (which should be still in the write
2056            // op queue) has a pointer to this node, move the node to the back of the
2057            // deque instead of popping (dropping) it.
2058            deqs.write_order.move_front_to_back();
2059        }
2060    }
2061
    /// Removes expired or invalidated entries from the front of the write
    /// order deque.
    ///
    /// Up to `batch_size` front nodes are examined. A node whose entry is
    /// expired by time-to-live is removed with `RemovalCause::Expired`; one that
    /// is only older than `valid_after` is removed with `RemovalCause::Explicit`.
    /// The scan stops at the first node that is neither, or when the deque is
    /// empty.
    ///
    /// `eviction_state.more_entries_to_evict` is set to `true` only when no
    /// iteration hit a stop or skip condition. It is never reset to `false` here.
    #[inline]
    fn remove_expired_wo(
        &self,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        now: Instant,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        let ttl = &self.expiration_policy.time_to_live();
        let va = &self.valid_after();
        let mut more_to_evict = true;

        for _ in 0..batch_size {
            // Snapshot (key, hash, is_dirty, last_modified) of the front node.
            let maybe_key_hash_ts = deqs.write_order.peek_front().map(|node| {
                let elem = &node.element;
                (
                    Arc::clone(elem.key()),
                    elem.hash(),
                    elem.is_dirty(),
                    elem.last_modified(),
                )
            });

            let (key, hash, cause) = match maybe_key_hash_ts {
                Some((key, hash, false, Some(ts))) => {
                    // The tuple is `(expired, invalid)`; expiration takes
                    // precedence when deciding the removal cause.
                    let cause = match is_entry_expired_wo_or_invalid(ttl, va, ts, now) {
                        (true, _) => RemovalCause::Expired,
                        (false, true) => RemovalCause::Explicit,
                        (false, false) => {
                            // The front node is still valid; stop scanning.
                            more_to_evict = false;
                            break;
                        }
                    };
                    (key, hash, cause)
                }
                // TODO: Remove the second pattern `(key, hash, false, None)` once
                // we change `last_modified` and `last_accessed` in `EntryInfo` from
                // `Option<Instant>` to `Instant`.
                Some((key, hash, true, _) | (key, hash, false, None)) => {
                    // `is_dirty` is true or `last_modified` is None. Move the
                    // node out of the way and do not ask for another round.
                    self.skip_updated_entry_wo(&key, hash, deqs);
                    more_to_evict = false;
                    continue;
                }
                None => {
                    // The deque is empty.
                    more_to_evict = false;
                    break;
                }
            };

            // Lock the key for removal if blocking removal notification is enabled.
            // The guard (if any) is kept in `_klg` until the end of this iteration.
            let kl = self.maybe_key_lock(&key);
            let _klg = &kl.as_ref().map(|kl| kl.lock());

            // Remove the key from the map only when the entry in the map itself
            // is expired or invalidated; the deque node checked above may be
            // stale.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| is_expired_entry_wo(ttl, va, v, now),
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state.notify_entry_removal(key, &entry, cause);
                }
                eviction_state.counters.incr_eviction_count();
                Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
            } else {
                // The entry in the map did not satisfy the removal condition (or
                // is gone); move its node out of the way and stop asking for
                // another round.
                self.skip_updated_entry_wo(&key, hash, deqs);
                more_to_evict = false;
            }
        }

        if more_to_evict {
            eviction_state.more_entries_to_evict = true;
        }
    }
2140
    /// Runs one batch of predicate-based invalidation over the write order
    /// deque.
    ///
    /// Collects up to `batch_size` candidates (nodes that are not dirty and have
    /// a last-modified time) from the write order deque, hands them to
    /// `invalidator.scan_and_invalidate`, and detaches every invalidated entry
    /// from the deques and the timer wheel.
    ///
    /// `eviction_state.more_entries_to_evict` is set to `true` when the
    /// invalidator is not empty after the scan.
    fn invalidate_entries(
        &self,
        invalidator: &Invalidator<K, V, S>,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        let now = self.current_time();

        // If the write order queue is empty, we are done and can remove the predicates
        // that have been registered by now.
        if deqs.write_order.len() == 0 {
            invalidator.remove_predicates_registered_before(now);
            return;
        }

        let mut candidates = Vec::new();
        let mut len = 0;
        let has_next;
        {
            // NOTE(review): the iteration presumably continues from the deque's
            // internal cursor (see `reset_cursor` below) — confirm in `Deque`.
            let iter = &mut deqs.write_order.peekable();

            // `len` counts accepted candidates only; dirty nodes and nodes
            // without a last-modified time are consumed but not counted.
            while len < batch_size {
                if let Some(kd) = iter.next() {
                    if !kd.is_dirty() {
                        if let Some(ts) = kd.last_modified() {
                            let key = kd.key();
                            let hash = self.hash(key);
                            candidates.push(KeyDateLite::new(key, hash, ts));
                            len += 1;
                        }
                    }
                } else {
                    break;
                }
            }

            // Whether there are nodes left after the ones consumed above.
            has_next = iter.peek().is_some();
        }

        if len == 0 {
            return;
        }

        // The batch is truncated when it is full and more nodes remain.
        let is_truncated = len == batch_size && has_next;
        let (invalidated, is_done) =
            invalidator.scan_and_invalidate(self, candidates, is_truncated);

        for KvEntry { key: _key, entry } in invalidated {
            Self::handle_remove(deqs, timer_wheel, entry, None, &mut eviction_state.counters);
        }
        if is_done {
            deqs.write_order.reset_cursor();
        }
        if !invalidator.is_empty() {
            eviction_state.more_entries_to_evict = true;
        }
    }
2202
    /// Evicts entries from the front of the `MainProbation` access order deque
    /// until at least `weights_to_evict` of policy weight has been evicted.
    ///
    /// Up to `batch_size` front nodes are examined. An entry is removed from the
    /// cache map only if its last-accessed time still equals the one seen on the
    /// deque node; removed entries are reported with `RemovalCause::Size`.
    ///
    /// `eviction_state.more_entries_to_evict` is set to `true` only when the
    /// batch ran out before the target weight was reached and no iteration hit
    /// a stop or skip condition. It is never reset to `false` here.
    fn evict_lru_entries(
        &self,
        deqs: &mut Deques<K>,
        timer_wheel: &mut TimerWheel<K>,
        batch_size: u32,
        weights_to_evict: u64,
        eviction_state: &mut EvictionState<'_, K, V>,
    ) where
        V: Clone,
    {
        const CACHE_REGION: CacheRegion = CacheRegion::MainProbation;
        let deq_name = CACHE_REGION.name();
        let (ao_deq, wo_deq) = deqs.select_mut(CACHE_REGION);
        // Sum of the policy weights of the entries evicted so far.
        let mut evicted = 0u64;
        let mut more_to_evict = true;

        for _ in 0..batch_size {
            if evicted >= weights_to_evict {
                // The target has been reached.
                more_to_evict = false;
                break;
            }

            // Snapshot (key, hash, is_dirty, last_accessed) of the front node.
            let maybe_key_hash_ts = ao_deq.peek_front().map(|node| {
                let entry_info = node.element.entry_info();
                (
                    Arc::clone(node.element.key()),
                    node.element.hash(),
                    entry_info.is_dirty(),
                    entry_info.last_accessed(),
                )
            });

            let (key, hash, ts) = match maybe_key_hash_ts {
                Some((key, hash, false, Some(ts))) => (key, hash, ts),
                // TODO: Remove the second pattern `(key, hash, false, None)` once we change
                // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
                // `Instant`.
                Some((key, hash, true, _) | (key, hash, false, None)) => {
                    // `is_dirty` is true or `last_accessed` is None. Skip this entry
                    // as it may have been updated by this or other async task but
                    // its `WriteOp` is not processed yet.
                    self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                    // Set `more_to_evict` to `false` to make `run_pending_tasks` to
                    // return early. This will help that `schedule_write_op` to send
                    // the `WriteOp` to the write op channel.
                    more_to_evict = false;
                    continue;
                }
                None => {
                    // The deque is empty.
                    more_to_evict = false;
                    break;
                }
            };

            // Lock the key for removal if blocking removal notification is enabled.
            // The guard (if any) is kept in `_klg` until the end of this iteration.
            let kl = self.maybe_key_lock(&key);
            let _klg = &kl.as_ref().map(|kl| kl.lock());

            // Remove the key from the map only when the entry has not been
            // accessed since its deque node was inspected above.
            let maybe_entry = self.cache.remove_if(
                hash,
                |k| k == &key,
                |_, v| {
                    if let Some(la) = v.last_accessed() {
                        la == ts
                    } else {
                        false
                    }
                },
            );

            if let Some(entry) = maybe_entry {
                if eviction_state.is_notifier_enabled() {
                    eviction_state.notify_entry_removal(key, &entry, RemovalCause::Size);
                }
                eviction_state.counters.incr_eviction_count();
                // Read the weight before `entry` is moved into the call below.
                let weight = entry.policy_weight();
                Self::handle_remove_with_deques(
                    deq_name,
                    ao_deq,
                    wo_deq,
                    timer_wheel,
                    entry,
                    &mut eviction_state.counters,
                );
                evicted = evicted.saturating_add(weight as u64);
            } else {
                // The entry was accessed in the meantime (or is gone); move its
                // node out of the way and stop asking for another round.
                self.skip_updated_entry_ao(&key, hash, deq_name, ao_deq, wo_deq);
                more_to_evict = false;
            }
        }

        if more_to_evict {
            eviction_state.more_entries_to_evict = true;
        }
    }
2298}
2299
2300impl<K, V, S> Inner<K, V, S>
2301where
2302    K: Send + Sync + 'static,
2303    V: Clone + Send + Sync + 'static,
2304{
2305    pub(crate) fn notify_single_removal(
2306        &self,
2307        key: Arc<K>,
2308        entry: &MiniArc<ValueEntry<K, V>>,
2309        cause: RemovalCause,
2310    ) {
2311        if let Some(notifier) = &self.removal_notifier {
2312            notifier.notify(key, entry.value.clone(), cause);
2313        }
2314    }
2315
2316    #[inline]
2317    fn notify_upsert(
2318        &self,
2319        key: Arc<K>,
2320        entry: &MiniArc<ValueEntry<K, V>>,
2321        last_accessed: Option<Instant>,
2322        last_modified: Option<Instant>,
2323    ) {
2324        let now = self.current_time();
2325        let exp = &self.expiration_policy;
2326
2327        let mut cause = RemovalCause::Replaced;
2328
2329        if let Some(last_accessed) = last_accessed {
2330            if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2331                cause = RemovalCause::Expired;
2332            }
2333        }
2334
2335        if let Some(last_modified) = last_modified {
2336            if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2337                cause = RemovalCause::Expired;
2338            } else if is_invalid_entry(&self.valid_after(), last_modified) {
2339                cause = RemovalCause::Explicit;
2340            }
2341        }
2342
2343        self.notify_single_removal(key, entry, cause);
2344    }
2345
2346    #[inline]
2347    fn notify_invalidate(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) {
2348        let now = self.current_time();
2349        let exp = &self.expiration_policy;
2350
2351        let mut cause = RemovalCause::Explicit;
2352
2353        if let Some(last_accessed) = entry.last_accessed() {
2354            if is_expired_by_tti(&exp.time_to_idle(), last_accessed, now) {
2355                cause = RemovalCause::Expired;
2356            }
2357        }
2358
2359        if let Some(last_modified) = entry.last_modified() {
2360            if is_expired_by_ttl(&exp.time_to_live(), last_modified, now) {
2361                cause = RemovalCause::Expired;
2362            }
2363        }
2364
2365        self.notify_single_removal(Arc::clone(key), entry, cause);
2366    }
2367}
2368
2369//
2370// for testing
2371//
#[cfg(test)]
impl<K, V, S> Inner<K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher + Clone,
{
    /// Returns the predicate count reported by the invalidator, or `0` when
    /// this cache has no invalidator.
    fn invalidation_predicate_count(&self) -> usize {
        match &self.invalidator {
            Some(inv) => inv.predicate_count(),
            None => 0,
        }
    }

    /// Returns `true` if the key locks map is empty. A cache without a key
    /// locks map is considered to have an empty one.
    fn key_locks_map_is_empty(&self) -> bool {
        match self.key_locks.as_ref() {
            Some(locks) => locks.is_empty(),
            None => true,
        }
    }
}
2394
2395//
2396// private free-standing functions
2397//
2398
2399/// Returns `true` if this entry is expired by its per-entry TTL.
2400#[inline]
2401fn is_expired_by_per_entry_ttl<K>(entry_info: &MiniArc<EntryInfo<K>>, now: Instant) -> bool {
2402    if let Some(ts) = entry_info.expiration_time() {
2403        ts <= now
2404    } else {
2405        false
2406    }
2407}
2408
2409/// Returns `true` when one of the followings conditions is met:
2410///
2411/// - This entry is expired by the time-to-idle config of this cache instance.
2412/// - Or, it is invalidated by the `invalidate_all` method.
2413#[inline]
2414fn is_expired_entry_ao(
2415    time_to_idle: &Option<Duration>,
2416    valid_after: &Option<Instant>,
2417    entry: &impl AccessTime,
2418    now: Instant,
2419) -> bool {
2420    if let Some(ts) = entry.last_accessed() {
2421        is_invalid_entry(valid_after, ts) || is_expired_by_tti(time_to_idle, ts, now)
2422    } else {
2423        false
2424    }
2425}
2426
2427/// Returns `true` when one of the following conditions is met:
2428///
2429/// - This entry is expired by the time-to-live (TTL) config of this cache instance.
2430/// - Or, it is invalidated by the `invalidate_all` method.
2431#[inline]
2432fn is_expired_entry_wo(
2433    time_to_live: &Option<Duration>,
2434    valid_after: &Option<Instant>,
2435    entry: &impl AccessTime,
2436    now: Instant,
2437) -> bool {
2438    if let Some(ts) = entry.last_modified() {
2439        is_invalid_entry(valid_after, ts) || is_expired_by_ttl(time_to_live, ts, now)
2440    } else {
2441        false
2442    }
2443}
2444
2445#[inline]
2446fn is_entry_expired_ao_or_invalid(
2447    time_to_idle: &Option<Duration>,
2448    valid_after: &Option<Instant>,
2449    entry_last_accessed: Instant,
2450    now: Instant,
2451) -> (bool, bool) {
2452    let ts = entry_last_accessed;
2453    let expired = is_expired_by_tti(time_to_idle, ts, now);
2454    let invalid = is_invalid_entry(valid_after, ts);
2455    (expired, invalid)
2456}
2457
2458#[inline]
2459fn is_entry_expired_wo_or_invalid(
2460    time_to_live: &Option<Duration>,
2461    valid_after: &Option<Instant>,
2462    entry_last_modified: Instant,
2463    now: Instant,
2464) -> (bool, bool) {
2465    let ts = entry_last_modified;
2466    let expired = is_expired_by_ttl(time_to_live, ts, now);
2467    let invalid = is_invalid_entry(valid_after, ts);
2468    (expired, invalid)
2469}
2470
/// Returns `true` if `valid_after` is set and `entry_ts` is strictly before
/// it. Returns `false` when `valid_after` is `None`.
#[inline]
fn is_invalid_entry(valid_after: &Option<Instant>, entry_ts: Instant) -> bool {
    matches!(valid_after, Some(threshold) if entry_ts < *threshold)
}
2479
2480#[inline]
2481fn is_expired_by_tti(
2482    time_to_idle: &Option<Duration>,
2483    entry_last_accessed: Instant,
2484    now: Instant,
2485) -> bool {
2486    if let Some(tti) = time_to_idle {
2487        let expiration = entry_last_accessed.saturating_add(*tti);
2488        expiration <= now
2489    } else {
2490        false
2491    }
2492}
2493
2494#[inline]
2495fn is_expired_by_ttl(
2496    time_to_live: &Option<Duration>,
2497    entry_last_modified: Instant,
2498    now: Instant,
2499) -> bool {
2500    if let Some(ttl) = time_to_live {
2501        let expiration = entry_last_modified.saturating_add(*ttl);
2502        expiration <= now
2503    } else {
2504        false
2505    }
2506}
2507
2508#[cfg(test)]
2509mod tests {
2510    use crate::{
2511        common::{time::Clock, HousekeeperConfig},
2512        policy::{EvictionPolicy, ExpirationPolicy},
2513    };
2514
2515    use super::BaseCache;
2516
2517    #[cfg_attr(target_pointer_width = "16", ignore)]
2518    #[test]
2519    fn test_skt_capacity_will_not_overflow() {
2520        use std::collections::hash_map::RandomState;
2521
2522        // power of two
2523        let pot = |exp| 2u64.pow(exp);
2524
2525        let ensure_sketch_len = |max_capacity, len, name| {
2526            let cache = BaseCache::<u8, u8>::new(
2527                None,
2528                Some(max_capacity),
2529                None,
2530                RandomState::default(),
2531                None,
2532                EvictionPolicy::default(),
2533                None,
2534                ExpirationPolicy::default(),
2535                HousekeeperConfig::default(),
2536                false,
2537                Clock::default(),
2538            );
2539            cache.inner.enable_frequency_sketch_for_testing();
2540            assert_eq!(
2541                cache.inner.frequency_sketch.read().table_len(),
2542                len as usize,
2543                "{name}"
2544            );
2545        };
2546
2547        if cfg!(target_pointer_width = "32") {
2548            let pot24 = pot(24);
2549            let pot16 = pot(16);
2550            ensure_sketch_len(0, 128, "0");
2551            ensure_sketch_len(128, 128, "128");
2552            ensure_sketch_len(pot16, pot16, "pot16");
2553            // due to ceiling to next_power_of_two
2554            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
2555            // due to ceiling to next_power_of_two
2556            ensure_sketch_len(pot24 - 1, pot24, "pot24 - 1");
2557            ensure_sketch_len(pot24, pot24, "pot24");
2558            ensure_sketch_len(pot(27), pot24, "pot(27)");
2559            ensure_sketch_len(u32::MAX as u64, pot24, "u32::MAX");
2560        } else {
2561            // target_pointer_width: 64 or larger.
2562            let pot30 = pot(30);
2563            let pot16 = pot(16);
2564            ensure_sketch_len(0, 128, "0");
2565            ensure_sketch_len(128, 128, "128");
2566            ensure_sketch_len(pot16, pot16, "pot16");
2567            // due to ceiling to next_power_of_two
2568            ensure_sketch_len(pot16 + 1, pot(17), "pot16 + 1");
2569
2570            // The following tests will allocate large memory (~8GiB).
2571            if !cfg!(skip_large_mem_tests) {
2572                // due to ceiling to next_power_of_two
2573                ensure_sketch_len(pot30 - 1, pot30, "pot30- 1");
2574                ensure_sketch_len(pot30, pot30, "pot30");
2575                ensure_sketch_len(u64::MAX, pot30, "u64::MAX");
2576            }
2577        };
2578    }
2579
2580    #[test]
2581    fn test_per_entry_expiration() {
2582        use super::InnerSync;
2583        use crate::{common::time::Clock, Entry, Expiry};
2584
2585        use std::{
2586            collections::hash_map::RandomState,
2587            sync::{Arc, Mutex},
2588            time::{Duration, Instant as StdInstant},
2589        };
2590
2591        type Key = u32;
2592        type Value = char;
2593
2594        fn current_time(cache: &BaseCache<Key, Value>) -> StdInstant {
2595            cache.inner.clock().to_std_instant(cache.current_time())
2596        }
2597
2598        fn insert(cache: &BaseCache<Key, Value>, key: Key, hash: u64, value: Value) {
2599            let (op, _now) = cache.do_insert_with_hash(Arc::new(key), hash, value);
2600            cache.write_op_ch.send(op).expect("Failed to send");
2601        }
2602
2603        macro_rules! assert_params_eq {
2604            ($left:expr, $right:expr, $param_name:expr, $line:expr) => {
2605                assert_eq!(
2606                    $left, $right,
2607                    "Mismatched `{}`s. line: {}",
2608                    $param_name, $line
2609                );
2610            };
2611        }
2612
2613        macro_rules! assert_expiry {
2614            ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => {
2615                // Increment the time.
2616                $mock.increment(Duration::from_millis($duration_secs * 1000 - 1));
2617                $cache.inner.run_pending_tasks(None, 1, 10);
2618                assert!($cache.contains_key_with_hash(&$key, $hash));
2619                assert_eq!($cache.entry_count(), 1);
2620
2621                // Increment the time by 1ms (3). The entry should be expired.
2622                $mock.increment(Duration::from_millis(1));
2623                $cache.inner.run_pending_tasks(None, 1, 10);
2624                assert!(!$cache.contains_key_with_hash(&$key, $hash));
2625
2626                // Increment the time again to ensure the entry has been evicted from the
2627                // cache.
2628                $mock.increment(Duration::from_secs(1));
2629                $cache.inner.run_pending_tasks(None, 1, 10);
2630                assert_eq!($cache.entry_count(), 0);
2631            };
2632        }
2633
2634        /// Contains expected call parameters and also a return value.
2635        #[derive(Debug)]
2636        enum ExpiryExpectation {
2637            NoCall,
2638            AfterCreate {
2639                caller_line: u32,
2640                key: Key,
2641                value: Value,
2642                current_time: StdInstant,
2643                new_duration_secs: Option<u64>,
2644            },
2645            AfterRead {
2646                caller_line: u32,
2647                key: Key,
2648                value: Value,
2649                current_time: StdInstant,
2650                current_duration_secs: Option<u64>,
2651                last_modified_at: StdInstant,
2652                new_duration_secs: Option<u64>,
2653            },
2654            AfterUpdate {
2655                caller_line: u32,
2656                key: Key,
2657                value: Value,
2658                current_time: StdInstant,
2659                current_duration_secs: Option<u64>,
2660                new_duration_secs: Option<u64>,
2661            },
2662        }
2663
2664        impl ExpiryExpectation {
2665            fn after_create(
2666                caller_line: u32,
2667                key: Key,
2668                value: Value,
2669                current_time: StdInstant,
2670                new_duration_secs: Option<u64>,
2671            ) -> Self {
2672                Self::AfterCreate {
2673                    caller_line,
2674                    key,
2675                    value,
2676                    current_time,
2677                    new_duration_secs,
2678                }
2679            }
2680
2681            fn after_read(
2682                caller_line: u32,
2683                key: Key,
2684                value: Value,
2685                current_time: StdInstant,
2686                current_duration_secs: Option<u64>,
2687                last_modified_at: StdInstant,
2688                new_duration_secs: Option<u64>,
2689            ) -> Self {
2690                Self::AfterRead {
2691                    caller_line,
2692                    key,
2693                    value,
2694                    current_time,
2695                    current_duration_secs,
2696                    last_modified_at,
2697                    new_duration_secs,
2698                }
2699            }
2700
2701            fn after_update(
2702                caller_line: u32,
2703                key: Key,
2704                value: Value,
2705                current_time: StdInstant,
2706                current_duration_secs: Option<u64>,
2707                new_duration_secs: Option<u64>,
2708            ) -> Self {
2709                Self::AfterUpdate {
2710                    caller_line,
2711                    key,
2712                    value,
2713                    current_time,
2714                    current_duration_secs,
2715                    new_duration_secs,
2716                }
2717            }
2718        }
2719
2720        let expectation = Arc::new(Mutex::new(ExpiryExpectation::NoCall));
2721
2722        struct MyExpiry {
2723            expectation: Arc<Mutex<ExpiryExpectation>>,
2724        }
2725
2726        impl Expiry<u32, char> for MyExpiry {
2727            fn expire_after_create(
2728                &self,
2729                actual_key: &u32,
2730                actual_value: &char,
2731                actual_current_time: StdInstant,
2732            ) -> Option<Duration> {
2733                use ExpiryExpectation::*;
2734
2735                let lock = &mut *self.expectation.lock().unwrap();
2736                let expected = std::mem::replace(lock, NoCall);
2737                match expected {
2738                    AfterCreate {
2739                        caller_line,
2740                        key,
2741                        value,
2742                        current_time,
2743                        new_duration_secs: new_duration,
2744                    } => {
2745                        assert_params_eq!(*actual_key, key, "key", caller_line);
2746                        assert_params_eq!(*actual_value, value, "value", caller_line);
2747                        assert_params_eq!(
2748                            actual_current_time,
2749                            current_time,
2750                            "current_time",
2751                            caller_line
2752                        );
2753                        new_duration.map(Duration::from_secs)
2754                    }
2755                    expected => {
2756                        panic!(
2757                            "Unexpected call to expire_after_create: caller_line {}, expected: {expected:?}",
2758                            line!()
2759                        );
2760                    }
2761                }
2762            }
2763
2764            fn expire_after_read(
2765                &self,
2766                actual_key: &u32,
2767                actual_value: &char,
2768                actual_current_time: StdInstant,
2769                actual_current_duration: Option<Duration>,
2770                actual_last_modified_at: StdInstant,
2771            ) -> Option<Duration> {
2772                use ExpiryExpectation::*;
2773
2774                let lock = &mut *self.expectation.lock().unwrap();
2775                let expected = std::mem::replace(lock, NoCall);
2776                match expected {
2777                    AfterRead {
2778                        caller_line,
2779                        key,
2780                        value,
2781                        current_time,
2782                        current_duration_secs,
2783                        last_modified_at,
2784                        new_duration_secs,
2785                    } => {
2786                        assert_params_eq!(*actual_key, key, "key", caller_line);
2787                        assert_params_eq!(*actual_value, value, "value", caller_line);
2788                        assert_params_eq!(
2789                            actual_current_time,
2790                            current_time,
2791                            "current_time",
2792                            caller_line
2793                        );
2794                        assert_params_eq!(
2795                            actual_current_duration,
2796                            current_duration_secs.map(Duration::from_secs),
2797                            "current_duration",
2798                            caller_line
2799                        );
2800                        assert_params_eq!(
2801                            actual_last_modified_at,
2802                            last_modified_at,
2803                            "last_modified_at",
2804                            caller_line
2805                        );
2806                        new_duration_secs.map(Duration::from_secs)
2807                    }
2808                    expected => {
2809                        panic!(
2810                            "Unexpected call to expire_after_read: caller_line {}, expected: {expected:?}",
2811                            line!()
2812                        );
2813                    }
2814                }
2815            }
2816
2817            fn expire_after_update(
2818                &self,
2819                actual_key: &u32,
2820                actual_value: &char,
2821                actual_current_time: StdInstant,
2822                actual_current_duration: Option<Duration>,
2823            ) -> Option<Duration> {
2824                use ExpiryExpectation::*;
2825
2826                let lock = &mut *self.expectation.lock().unwrap();
2827                let expected = std::mem::replace(lock, NoCall);
2828                match expected {
2829                    AfterUpdate {
2830                        caller_line,
2831                        key,
2832                        value,
2833                        current_time,
2834                        current_duration_secs,
2835                        new_duration_secs,
2836                    } => {
2837                        assert_params_eq!(*actual_key, key, "key", caller_line);
2838                        assert_params_eq!(*actual_value, value, "value", caller_line);
2839                        assert_params_eq!(
2840                            actual_current_time,
2841                            current_time,
2842                            "current_time",
2843                            caller_line
2844                        );
2845                        assert_params_eq!(
2846                            actual_current_duration,
2847                            current_duration_secs.map(Duration::from_secs),
2848                            "current_duration",
2849                            caller_line
2850                        );
2851                        new_duration_secs.map(Duration::from_secs)
2852                    }
2853                    expected => {
2854                        panic!(
2855                            "Unexpected call to expire_after_update: caller_line {}, expected: {expected:?}",
2856                            line!()
2857                        );
2858                    }
2859                }
2860            }
2861        }
2862
2863        const TTL: u64 = 16;
2864        const TTI: u64 = 7;
2865        let expiry: Option<Arc<dyn Expiry<_, _> + Send + Sync + 'static>> =
2866            Some(Arc::new(MyExpiry {
2867                expectation: Arc::clone(&expectation),
2868            }));
2869
2870        let (clock, mock) = Clock::mock();
2871
2872        let mut cache = BaseCache::<Key, Value>::new(
2873            None,
2874            None,
2875            None,
2876            RandomState::default(),
2877            None,
2878            EvictionPolicy::default(),
2879            None,
2880            ExpirationPolicy::new(
2881                Some(Duration::from_secs(TTL)),
2882                Some(Duration::from_secs(TTI)),
2883                expiry,
2884            ),
2885            HousekeeperConfig::default(),
2886            false,
2887            clock,
2888        );
2889        cache.reconfigure_for_testing();
2890
2891        // Make the cache exterior immutable.
2892        let cache = cache;
2893
2894        mock.increment(Duration::from_millis(10));
2895
2896        // ----------------------------------------------------
2897        // Case 1
2898        //
2899        // 1.  0s: Insert with per-entry TTL 1s.
2900        // 2. +1s: Expires.
2901        // ----------------------------------------------------
2902
2903        // Insert an entry (1). It will have a per-entry TTL of 1 second.
2904        let key = 1;
2905        let hash = cache.hash(&key);
2906        let value = 'a';
2907
2908        *expectation.lock().unwrap() =
2909            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(1));
2910
2911        insert(&cache, key, hash, value);
2912        // Run a sync to register the entry to the internal data structures including
2913        // the timer wheel.
2914        cache.inner.run_pending_tasks(None, 1, 10);
2915        assert_eq!(cache.entry_count(), 1);
2916
2917        assert_expiry!(cache, key, hash, mock, 1);
2918
2919        // ----------------------------------------------------
2920        // Case 2
2921        //
2922        // 1.  0s: Insert with no per-entry TTL.
2923        // 2. +1s: Get with per-entry TTL 3s.
2924        // 3. +3s: Expires.
2925        // ----------------------------------------------------
2926
2927        // Insert an entry (1).
2928        let key = 2;
2929        let hash = cache.hash(&key);
2930        let value = 'b';
2931
2932        *expectation.lock().unwrap() =
2933            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
2934        let inserted_at = current_time(&cache);
2935        insert(&cache, key, hash, value);
2936        cache.inner.run_pending_tasks(None, 1, 10);
2937        assert_eq!(cache.entry_count(), 1);
2938
2939        // Increment the time.
2940        mock.increment(Duration::from_secs(1));
2941        cache.inner.run_pending_tasks(None, 1, 10);
2942        assert!(cache.contains_key_with_hash(&key, hash));
2943
2944        // Read the entry (2).
2945        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
2946            line!(),
2947            key,
2948            value,
2949            current_time(&cache),
2950            Some(TTI - 1),
2951            inserted_at,
2952            Some(3),
2953        );
2954        assert_eq!(
2955            cache
2956                .get_with_hash(&key, hash, false)
2957                .map(Entry::into_value),
2958            Some(value)
2959        );
2960        cache.inner.run_pending_tasks(None, 1, 10);
2961
2962        assert_expiry!(cache, key, hash, mock, 3);
2963
2964        // ----------------------------------------------------
2965        // Case 3
2966        //
2967        // 1.  0s: Insert with no per-entry TTL.
2968        // 2. +1s: Get with no per-entry TTL.
2969        // 3. +2s: Update with per-entry TTL 3s.
2970        // 4. +3s: Expires.
2971        // ----------------------------------------------------
2972
2973        // Insert an entry (1).
2974        let key = 3;
2975        let hash = cache.hash(&key);
2976        let value = 'c';
2977
2978        *expectation.lock().unwrap() =
2979            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
2980        let inserted_at = current_time(&cache);
2981        insert(&cache, key, hash, value);
2982        cache.inner.run_pending_tasks(None, 1, 10);
2983        assert_eq!(cache.entry_count(), 1);
2984
2985        // Increment the time.
2986        mock.increment(Duration::from_secs(1));
2987        cache.inner.run_pending_tasks(None, 1, 10);
2988        assert!(cache.contains_key_with_hash(&key, hash));
2989
2990        // Read the entry (2).
2991        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
2992            line!(),
2993            key,
2994            value,
2995            current_time(&cache),
2996            Some(TTI - 1),
2997            inserted_at,
2998            None,
2999        );
3000        assert_eq!(
3001            cache
3002                .get_with_hash(&key, hash, false)
3003                .map(Entry::into_value),
3004            Some(value)
3005        );
3006        cache.inner.run_pending_tasks(None, 1, 10);
3007
3008        // Increment the time.
3009        mock.increment(Duration::from_secs(2));
3010        cache.inner.run_pending_tasks(None, 1, 10);
3011        assert!(cache.contains_key_with_hash(&key, hash));
3012        assert_eq!(cache.entry_count(), 1);
3013
3014        // Update the entry (3).
3015        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3016            line!(),
3017            key,
3018            value,
3019            current_time(&cache),
3020            // TTI should be reset by this update.
3021            Some(TTI),
3022            Some(3),
3023        );
3024        insert(&cache, key, hash, value);
3025        cache.inner.run_pending_tasks(None, 1, 10);
3026        assert_eq!(cache.entry_count(), 1);
3027
3028        assert_expiry!(cache, key, hash, mock, 3);
3029
3030        // ----------------------------------------------------
3031        // Case 4
3032        //
3033        // 1.  0s: Insert with no per-entry TTL.
3034        // 2. +1s: Get with no per-entry TTL.
3035        // 3. +2s: Update with no per-entry TTL.
3036        // 4. +7s: Expires by TTI (7s from step 3).
3037        // ----------------------------------------------------
3038
3039        // Insert an entry (1).
3040        let key = 4;
3041        let hash = cache.hash(&key);
3042        let value = 'd';
3043
3044        *expectation.lock().unwrap() =
3045            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
3046        let inserted_at = current_time(&cache);
3047        insert(&cache, key, hash, value);
3048        cache.inner.run_pending_tasks(None, 1, 10);
3049        assert_eq!(cache.entry_count(), 1);
3050
3051        // Increment the time.
3052        mock.increment(Duration::from_secs(1));
3053        cache.inner.run_pending_tasks(None, 1, 10);
3054        assert!(cache.contains_key_with_hash(&key, hash));
3055        assert_eq!(cache.entry_count(), 1);
3056
3057        // Read the entry (2).
3058        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3059            line!(),
3060            key,
3061            value,
3062            current_time(&cache),
3063            Some(TTI - 1),
3064            inserted_at,
3065            None,
3066        );
3067        assert_eq!(
3068            cache
3069                .get_with_hash(&key, hash, false)
3070                .map(Entry::into_value),
3071            Some(value)
3072        );
3073        cache.inner.run_pending_tasks(None, 1, 10);
3074
3075        // Increment the time.
3076        mock.increment(Duration::from_secs(2));
3077        cache.inner.run_pending_tasks(None, 1, 10);
3078        assert!(cache.contains_key_with_hash(&key, hash));
3079        assert_eq!(cache.entry_count(), 1);
3080
3081        // Update the entry (3).
3082        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3083            line!(),
3084            key,
3085            value,
3086            current_time(&cache),
3087            // TTI should be reset by this update.
3088            Some(TTI),
3089            None,
3090        );
3091        insert(&cache, key, hash, value);
3092        cache.inner.run_pending_tasks(None, 1, 10);
3093        assert_eq!(cache.entry_count(), 1);
3094
3095        assert_expiry!(cache, key, hash, mock, 7);
3096
3097        // ----------------------------------------------------
3098        // Case 5
3099        //
3100        // 1.  0s: Insert with per-entry TTL 8s.
3101        // 2. +5s: Get with per-entry TTL 8s.
3102        // 3. +7s: Expires by TTI (7s).
3103        // ----------------------------------------------------
3104
3105        // Insert an entry.
3106        let key = 5;
3107        let hash = cache.hash(&key);
3108        let value = 'e';
3109
3110        *expectation.lock().unwrap() =
3111            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3112        let inserted_at = current_time(&cache);
3113        insert(&cache, key, hash, value);
3114        cache.inner.run_pending_tasks(None, 1, 10);
3115        assert_eq!(cache.entry_count(), 1);
3116
3117        // Increment the time.
3118        mock.increment(Duration::from_secs(5));
3119        cache.inner.run_pending_tasks(None, 1, 10);
3120        assert!(cache.contains_key_with_hash(&key, hash));
3121        assert_eq!(cache.entry_count(), 1);
3122
3123        // Read the entry.
3124        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3125            line!(),
3126            key,
3127            value,
3128            current_time(&cache),
3129            Some(TTI - 5),
3130            inserted_at,
3131            Some(8),
3132        );
3133        assert_eq!(
3134            cache
3135                .get_with_hash(&key, hash, false)
3136                .map(Entry::into_value),
3137            Some(value)
3138        );
3139        cache.inner.run_pending_tasks(None, 1, 10);
3140
3141        assert_expiry!(cache, key, hash, mock, 7);
3142
3143        // ----------------------------------------------------
3144        // Case 6
3145        //
3146        // 1.  0s: Insert with per-entry TTL 8s.
3147        // 2. +5s: Get with per-entry TTL 9s.
3148        // 3. +6s: Get with per-entry TTL 10s.
3149        // 4. +5s: Expires by TTL (16s).
3150        // ----------------------------------------------------
3151
3152        // Insert an entry.
3153        let key = 6;
3154        let hash = cache.hash(&key);
3155        let value = 'f';
3156
3157        *expectation.lock().unwrap() =
3158            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
3159        let inserted_at = current_time(&cache);
3160        insert(&cache, key, hash, value);
3161        cache.inner.run_pending_tasks(None, 1, 10);
3162        assert_eq!(cache.entry_count(), 1);
3163
3164        // Increment the time.
3165        mock.increment(Duration::from_secs(5));
3166        cache.inner.run_pending_tasks(None, 1, 10);
3167        assert!(cache.contains_key_with_hash(&key, hash));
3168        assert_eq!(cache.entry_count(), 1);
3169
3170        // Read the entry.
3171        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3172            line!(),
3173            key,
3174            value,
3175            current_time(&cache),
3176            Some(TTI - 5),
3177            inserted_at,
3178            Some(9),
3179        );
3180        assert_eq!(
3181            cache
3182                .get_with_hash(&key, hash, false)
3183                .map(Entry::into_value),
3184            Some(value)
3185        );
3186        cache.inner.run_pending_tasks(None, 1, 10);
3187
3188        // Increment the time.
3189        mock.increment(Duration::from_secs(6));
3190        cache.inner.run_pending_tasks(None, 1, 10);
3191        assert!(cache.contains_key_with_hash(&key, hash));
3192        assert_eq!(cache.entry_count(), 1);
3193
3194        // Read the entry.
3195        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3196            line!(),
3197            key,
3198            value,
3199            current_time(&cache),
3200            Some(TTI - 6),
3201            inserted_at,
3202            Some(10),
3203        );
3204        assert_eq!(
3205            cache
3206                .get_with_hash(&key, hash, false)
3207                .map(Entry::into_value),
3208            Some(value)
3209        );
3210        cache.inner.run_pending_tasks(None, 1, 10);
3211
3212        assert_expiry!(cache, key, hash, mock, 5);
3213
3214        // ----------------------------------------------------
3215        // Case 7
3216        //
3217        // 1.   0s: Insert with per-entry TTL 9s.
3218        // 2.  +6s: Update with per-entry TTL 8s.
3219        // 3.  +6s: Get with per-entry TTL 9s
3220        // 4.  +6s: Get with per-entry TTL 5s.
3221        // 5.  +4s: Expires by TTL (16s from step 2).
3222        // ----------------------------------------------------
3223        // Insert an entry.
3224        let key = 7;
3225        let hash = cache.hash(&key);
3226        let value = 'g';
3227
3228        *expectation.lock().unwrap() =
3229            ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9));
3230        insert(&cache, key, hash, value);
3231        cache.inner.run_pending_tasks(None, 1, 10);
3232        assert_eq!(cache.entry_count(), 1);
3233
3234        // Increment the time.
3235        mock.increment(Duration::from_secs(6));
3236        cache.inner.run_pending_tasks(None, 1, 10);
3237        assert!(cache.contains_key_with_hash(&key, hash));
3238        assert_eq!(cache.entry_count(), 1);
3239
3240        // Update the entry (3).
3241        *expectation.lock().unwrap() = ExpiryExpectation::after_update(
3242            line!(),
3243            key,
3244            value,
3245            current_time(&cache),
3246            // From the per-entry TTL.
3247            Some(9 - 6),
3248            Some(8),
3249        );
3250        let updated_at = current_time(&cache);
3251        insert(&cache, key, hash, value);
3252        cache.inner.run_pending_tasks(None, 1, 10);
3253        assert_eq!(cache.entry_count(), 1);
3254
3255        // Increment the time.
3256        mock.increment(Duration::from_secs(6));
3257        cache.inner.run_pending_tasks(None, 1, 10);
3258        assert!(cache.contains_key_with_hash(&key, hash));
3259        assert_eq!(cache.entry_count(), 1);
3260
3261        // Read the entry.
3262        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3263            line!(),
3264            key,
3265            value,
3266            current_time(&cache),
3267            Some(TTI - 6),
3268            updated_at,
3269            Some(9),
3270        );
3271        assert_eq!(
3272            cache
3273                .get_with_hash(&key, hash, false)
3274                .map(Entry::into_value),
3275            Some(value)
3276        );
3277        cache.inner.run_pending_tasks(None, 1, 10);
3278
3279        // Increment the time.
3280        mock.increment(Duration::from_secs(6));
3281        cache.inner.run_pending_tasks(None, 1, 10);
3282        assert!(cache.contains_key_with_hash(&key, hash));
3283        assert_eq!(cache.entry_count(), 1);
3284
3285        // Read the entry.
3286        *expectation.lock().unwrap() = ExpiryExpectation::after_read(
3287            line!(),
3288            key,
3289            value,
3290            current_time(&cache),
3291            Some(TTI - 6),
3292            updated_at,
3293            Some(5),
3294        );
3295        assert_eq!(
3296            cache
3297                .get_with_hash(&key, hash, false)
3298                .map(Entry::into_value),
3299            Some(value)
3300        );
3301        cache.inner.run_pending_tasks(None, 1, 10);
3302
3303        assert_expiry!(cache, key, hash, mock, 4);
3304    }
3305}