moka/common/concurrent/
housekeeper.rs

1use super::constants::LOG_SYNC_INTERVAL_MILLIS;
2
3use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};
4use crate::common::time::{AtomicInstant, Instant};
5use crate::common::HousekeeperConfig;
6
7use parking_lot::{Mutex, MutexGuard};
8use std::{
9    sync::atomic::{AtomicBool, Ordering},
10    time::Duration,
11};
12
13pub(crate) trait InnerSync {
14    /// Runs the pending tasks. Returns `true` if there are more entries to evict in
15    /// next run.
16    fn run_pending_tasks(
17        &self,
18        timeout: Option<Duration>,
19        max_log_sync_repeats: u32,
20        eviction_batch_size: u32,
21    ) -> bool;
22
23    fn now(&self) -> Instant;
24}
25
26pub(crate) struct Housekeeper {
27    run_lock: Mutex<()>,
28    run_after: AtomicInstant,
29    /// A flag to indicate if the last call on `run_pending_tasks` method left some
30    /// entries to evict.
31    ///
32    /// Used only when the eviction listener closure is set for this cache instance
33    /// because, if not, `run_pending_tasks` will never leave entries to evict.
34    more_entries_to_evict: Option<AtomicBool>,
35    /// The timeout duration for the `run_pending_tasks` method. This is a safe-guard
36    /// to prevent cache read/write operations (that may call `run_pending_tasks`
37    /// internally) from being blocked for a long time when the user wrote a slow
38    /// eviction listener closure.
39    ///
40    /// Used only when the eviction listener closure is set for this cache instance.
41    maintenance_task_timeout: Option<Duration>,
42    /// The maximum repeat count for receiving operation logs from the read and write
43    /// log channels. Default: `MAX_LOG_SYNC_REPEATS`.
44    max_log_sync_repeats: u32,
45    /// The batch size of entries to be processed by each internal eviction method.
46    /// Default: `EVICTION_BATCH_SIZE`.
47    eviction_batch_size: u32,
48    auto_run_enabled: AtomicBool,
49}
50
51impl Housekeeper {
52    pub(crate) fn new(
53        is_eviction_listener_enabled: bool,
54        config: HousekeeperConfig,
55        now: Instant,
56    ) -> Self {
57        let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled {
58            (
59                Some(AtomicBool::new(false)),
60                Some(config.maintenance_task_timeout),
61            )
62        } else {
63            (None, None)
64        };
65
66        Self {
67            run_lock: Mutex::default(),
68            run_after: AtomicInstant::new(Self::sync_after(now)),
69            more_entries_to_evict,
70            maintenance_task_timeout,
71            max_log_sync_repeats: config.max_log_sync_repeats,
72            eviction_batch_size: config.eviction_batch_size,
73            auto_run_enabled: AtomicBool::new(true),
74        }
75    }
76
77    pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
78        self.more_entries_to_evict() || self.should_apply(ch_len, READ_LOG_FLUSH_POINT, now)
79    }
80
81    pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
82        self.more_entries_to_evict() || self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT, now)
83    }
84
85    #[inline]
86    fn more_entries_to_evict(&self) -> bool {
87        self.more_entries_to_evict
88            .as_ref()
89            .map(|v| v.load(Ordering::Acquire))
90            .unwrap_or(false)
91    }
92
93    fn set_more_entries_to_evict(&self, v: bool) {
94        if let Some(flag) = &self.more_entries_to_evict {
95            flag.store(v, Ordering::Release);
96        }
97    }
98
99    #[inline]
100    fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
101        self.auto_run_enabled.load(Ordering::Relaxed)
102            && (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap())
103    }
104
105    pub(crate) fn run_pending_tasks<T: InnerSync>(&self, cache: &T) {
106        let lock = self.run_lock.lock();
107        self.do_run_pending_tasks(cache, lock);
108    }
109
110    pub(crate) fn try_run_pending_tasks<T: InnerSync>(&self, cache: &T) -> bool {
111        if let Some(lock) = self.run_lock.try_lock() {
112            self.do_run_pending_tasks(cache, lock);
113            true
114        } else {
115            false
116        }
117    }
118
119    fn do_run_pending_tasks<T: InnerSync>(&self, cache: &T, _lock: MutexGuard<'_, ()>) {
120        let now = cache.now();
121        self.run_after.set_instant(Self::sync_after(now));
122        let timeout = self.maintenance_task_timeout;
123        let repeats = self.max_log_sync_repeats;
124        let batch_size = self.eviction_batch_size;
125        let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size);
126        self.set_more_entries_to_evict(more_to_evict);
127    }
128
129    fn sync_after(now: Instant) -> Instant {
130        let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS);
131        now.saturating_add(dur)
132    }
133}
134
#[cfg(test)]
impl Housekeeper {
    /// Test-only helper that clears the `auto_run_enabled` flag, which turns
    /// off the channel-length and elapsed-time triggers checked by
    /// `should_apply`.
    pub(crate) fn disable_auto_run(&self) {
        let auto_run = &self.auto_run_enabled;
        auto_run.store(false, Ordering::Relaxed);
    }
}