moka/common/concurrent/
housekeeper.rs
1use super::constants::LOG_SYNC_INTERVAL_MILLIS;
2
3use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};
4use crate::common::time::{AtomicInstant, Instant};
5use crate::common::HousekeeperConfig;
6
7use parking_lot::{Mutex, MutexGuard};
8use std::{
9 sync::atomic::{AtomicBool, Ordering},
10 time::Duration,
11};
12
13pub(crate) trait InnerSync {
14 fn run_pending_tasks(
17 &self,
18 timeout: Option<Duration>,
19 max_log_sync_repeats: u32,
20 eviction_batch_size: u32,
21 ) -> bool;
22
23 fn now(&self) -> Instant;
24}
25
26pub(crate) struct Housekeeper {
27 run_lock: Mutex<()>,
28 run_after: AtomicInstant,
29 more_entries_to_evict: Option<AtomicBool>,
35 maintenance_task_timeout: Option<Duration>,
42 max_log_sync_repeats: u32,
45 eviction_batch_size: u32,
48 auto_run_enabled: AtomicBool,
49}
50
51impl Housekeeper {
52 pub(crate) fn new(
53 is_eviction_listener_enabled: bool,
54 config: HousekeeperConfig,
55 now: Instant,
56 ) -> Self {
57 let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled {
58 (
59 Some(AtomicBool::new(false)),
60 Some(config.maintenance_task_timeout),
61 )
62 } else {
63 (None, None)
64 };
65
66 Self {
67 run_lock: Mutex::default(),
68 run_after: AtomicInstant::new(Self::sync_after(now)),
69 more_entries_to_evict,
70 maintenance_task_timeout,
71 max_log_sync_repeats: config.max_log_sync_repeats,
72 eviction_batch_size: config.eviction_batch_size,
73 auto_run_enabled: AtomicBool::new(true),
74 }
75 }
76
77 pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
78 self.more_entries_to_evict() || self.should_apply(ch_len, READ_LOG_FLUSH_POINT, now)
79 }
80
81 pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
82 self.more_entries_to_evict() || self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT, now)
83 }
84
85 #[inline]
86 fn more_entries_to_evict(&self) -> bool {
87 self.more_entries_to_evict
88 .as_ref()
89 .map(|v| v.load(Ordering::Acquire))
90 .unwrap_or(false)
91 }
92
93 fn set_more_entries_to_evict(&self, v: bool) {
94 if let Some(flag) = &self.more_entries_to_evict {
95 flag.store(v, Ordering::Release);
96 }
97 }
98
99 #[inline]
100 fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
101 self.auto_run_enabled.load(Ordering::Relaxed)
102 && (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap())
103 }
104
105 pub(crate) fn run_pending_tasks<T: InnerSync>(&self, cache: &T) {
106 let lock = self.run_lock.lock();
107 self.do_run_pending_tasks(cache, lock);
108 }
109
110 pub(crate) fn try_run_pending_tasks<T: InnerSync>(&self, cache: &T) -> bool {
111 if let Some(lock) = self.run_lock.try_lock() {
112 self.do_run_pending_tasks(cache, lock);
113 true
114 } else {
115 false
116 }
117 }
118
119 fn do_run_pending_tasks<T: InnerSync>(&self, cache: &T, _lock: MutexGuard<'_, ()>) {
120 let now = cache.now();
121 self.run_after.set_instant(Self::sync_after(now));
122 let timeout = self.maintenance_task_timeout;
123 let repeats = self.max_log_sync_repeats;
124 let batch_size = self.eviction_batch_size;
125 let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size);
126 self.set_more_entries_to_evict(more_to_evict);
127 }
128
129 fn sync_after(now: Instant) -> Instant {
130 let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS);
131 now.saturating_add(dur)
132 }
133}
134
135#[cfg(test)]
136impl Housekeeper {
137 pub(crate) fn disable_auto_run(&self) {
138 self.auto_run_enabled.store(false, Ordering::Relaxed);
139 }
140}