1use super::{base_cache::Inner, PredicateId, PredicateIdStr};
2use crate::{
3 common::{
4 concurrent::{arc::MiniArc, AccessTime, KvEntry, ValueEntry},
5 time::Instant,
6 },
7 notification::RemovalCause,
8 PredicateError,
9};
10
11use parking_lot::{Mutex, MutexGuard};
12use std::{
13 hash::{BuildHasher, Hash},
14 sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17 },
18};
19use uuid::Uuid;
20
21pub(crate) type PredicateFun<K, V> = Arc<dyn Fn(&K, &V) -> bool + Send + Sync + 'static>;
22
23const PREDICATE_MAP_NUM_SEGMENTS: usize = 16;
24
25pub(crate) struct KeyDateLite<K> {
26 key: Arc<K>,
27 hash: u64,
28 timestamp: Instant,
29}
30
31impl<K> Clone for KeyDateLite<K> {
32 fn clone(&self) -> Self {
33 Self {
34 key: Arc::clone(&self.key),
35 hash: self.hash,
36 timestamp: self.timestamp,
37 }
38 }
39}
40
41impl<K> KeyDateLite<K> {
42 pub(crate) fn new(key: &Arc<K>, hash: u64, timestamp: Instant) -> Self {
43 Self {
44 key: Arc::clone(key),
45 hash,
46 timestamp,
47 }
48 }
49}
50
51pub(crate) struct Invalidator<K, V, S> {
52 predicates: crate::cht::SegmentedHashMap<PredicateId, Predicate<K, V>, S>,
53 is_empty: AtomicBool,
54 scan_context: Arc<ScanContext<K, V>>,
55}
56
57impl<K, V, S> Invalidator<K, V, S> {
61 pub(crate) fn new(hasher: S) -> Self
62 where
63 S: BuildHasher,
64 {
65 const CAPACITY: usize = 0;
66 let predicates = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
67 PREDICATE_MAP_NUM_SEGMENTS,
68 CAPACITY,
69 hasher,
70 );
71 Self {
72 predicates,
73 is_empty: AtomicBool::new(true),
74 scan_context: Arc::new(ScanContext::default()),
75 }
76 }
77
78 pub(crate) fn is_empty(&self) -> bool {
79 self.is_empty.load(Ordering::Acquire)
80 }
81
82 pub(crate) fn remove_predicates_registered_before(&self, ts: Instant)
83 where
84 K: Hash + Eq + Send + Sync + 'static,
85 V: Clone + Send + Sync + 'static,
86 S: BuildHasher,
87 {
88 let pred_map = &self.predicates;
89
90 let removing_ids = pred_map
91 .iter()
92 .filter(|(_, pred)| pred.registered_at <= ts)
93 .map(|(id, _)| id)
94 .collect::<Vec<_>>();
95
96 for id in removing_ids {
97 let hash = pred_map.hash(&id);
98 pred_map.remove(hash, |k| k == &id);
99 }
100
101 if pred_map.is_empty() {
102 self.is_empty.store(true, Ordering::Release);
103 }
104 }
105
106 pub(crate) fn register_predicate(
107 &self,
108 predicate: PredicateFun<K, V>,
109 registered_at: Instant,
110 ) -> Result<PredicateId, PredicateError>
111 where
112 K: Hash + Eq,
113 S: BuildHasher,
114 {
115 const MAX_RETRY: usize = 1_000;
116 let mut tries = 0;
117 let preds = &self.predicates;
118
119 while tries < MAX_RETRY {
120 let id = Uuid::new_v4().as_hyphenated().to_string();
121
122 let hash = preds.hash(&id);
123 if preds.contains_key(hash, |k| k == &id) {
124 tries += 1;
125
126 continue; }
128 let pred = Predicate::new(&id, predicate, registered_at);
129 preds.insert_entry_and(id.clone(), hash, pred, |_, _| ());
130 self.is_empty.store(false, Ordering::Release);
131
132 return Ok(id);
133 }
134
135 panic!("Cannot assign a new PredicateId to a predicate");
139 }
140
141 #[inline]
143 pub(crate) fn apply_predicates(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
144 where
145 K: Hash + Eq + Send + Sync + 'static,
146 V: Clone + Send + Sync + 'static,
147 S: BuildHasher,
148 {
149 if self.is_empty() {
150 false
151 } else if let Some(ts) = entry.last_modified() {
152 Self::do_apply_predicates(
153 self.predicates.iter().map(|(_, v)| v),
154 key,
155 &entry.value,
156 ts,
157 )
158 } else {
159 false
160 }
161 }
162
163 pub(crate) fn scan_and_invalidate(
164 &self,
165 cache: &Inner<K, V, S>,
166 candidates: Vec<KeyDateLite<K>>,
167 is_truncated: bool,
168 ) -> (Vec<KvEntry<K, V>>, bool)
169 where
170 K: Hash + Eq + Send + Sync + 'static,
171 V: Clone + Send + Sync + 'static,
172 S: BuildHasher,
173 {
174 let mut predicates = self.scan_context.predicates.lock();
175 if predicates.is_empty() {
176 *predicates = self.predicates.iter().map(|(_k, v)| v).collect();
177 }
178
179 let mut invalidated = Vec::default();
180 let mut newest_timestamp = None;
181
182 for candidate in &candidates {
183 let key = &candidate.key;
184 let hash = candidate.hash;
185 let ts = candidate.timestamp;
186 if self.apply(&predicates, cache, key, hash, ts) {
187 if let Some(entry) = Self::invalidate(cache, key, hash, ts) {
188 invalidated.push(KvEntry {
189 key: Arc::clone(key),
190 entry,
191 });
192 }
193 }
194 newest_timestamp = Some(ts);
195 }
196
197 self.remove_finished_predicates(predicates, is_truncated, newest_timestamp);
198
199 (invalidated, self.predicates.is_empty())
200 }
201}
202
203impl<K, V, S> Invalidator<K, V, S>
207where
208 K: Hash + Eq,
209 S: BuildHasher,
210{
211 #[inline]
212 fn do_apply_predicates<I>(predicates: I, key: &K, value: &V, ts: Instant) -> bool
213 where
214 I: Iterator<Item = Predicate<K, V>>,
215 {
216 for predicate in predicates {
217 if predicate.is_applicable(ts) && predicate.apply(key, value) {
218 return true;
219 }
220 }
221 false
222 }
223
224 fn remove_finished_predicates(
225 &self,
226 mut predicates: MutexGuard<'_, Vec<Predicate<K, V>>>,
227 is_truncated: bool,
228 newest_timestamp: Option<Instant>,
229 ) where
230 K: Hash + Eq,
231 S: BuildHasher,
232 {
233 let predicates = &mut *predicates;
234 if is_truncated {
235 if let Some(ts) = newest_timestamp {
236 let (active, finished): (Vec<_>, Vec<_>) =
237 predicates.drain(..).partition(|p| p.is_applicable(ts));
238
239 self.remove_predicates(&finished);
241 *predicates = active;
243 } else {
244 unreachable!();
245 }
246 } else {
247 self.remove_predicates(predicates);
249 predicates.clear();
250 }
251 }
252
253 fn remove_predicates(&self, predicates: &[Predicate<K, V>])
254 where
255 K: Hash + Eq,
256 S: BuildHasher,
257 {
258 let pred_map = &self.predicates;
259 for p in predicates.iter() {
260 let hash = pred_map.hash(p.id());
261 pred_map.remove(hash, |k| k == p.id());
262 }
263
264 if pred_map.is_empty() {
265 self.is_empty.store(true, Ordering::Release);
266 }
267 }
268
269 fn apply(
270 &self,
271 predicates: &[Predicate<K, V>],
272 cache: &Inner<K, V, S>,
273 key: &Arc<K>,
274 hash: u64,
275 ts: Instant,
276 ) -> bool {
277 if let Some(entry) = cache.cache.get(hash, |k| k == key) {
278 if let Some(lm) = entry.last_modified() {
279 if lm == ts {
280 return Invalidator::<_, _, S>::do_apply_predicates(
281 predicates.iter().cloned(),
282 key,
283 &entry.value,
284 lm,
285 );
286 }
287 }
288 }
289
290 false
291 }
292
293 fn invalidate(
294 cache: &Inner<K, V, S>,
295 key: &Arc<K>,
296 hash: u64,
297 ts: Instant,
298 ) -> Option<MiniArc<ValueEntry<K, V>>>
299 where
300 K: Send + Sync + 'static,
301 V: Clone + Send + Sync + 'static,
302 {
303 let kl = cache.maybe_key_lock(key);
305 let _klg = &kl.as_ref().map(|kl| kl.lock());
306
307 let maybe_entry = cache.cache.remove_if(
308 hash,
309 |k| k == key,
310 |_, v| {
311 if let Some(lm) = v.last_modified() {
312 lm == ts
313 } else {
314 false
315 }
316 },
317 );
318 if let Some(entry) = &maybe_entry {
319 if cache.is_removal_notifier_enabled() {
320 cache.notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit);
321 }
322 }
323 maybe_entry
324 }
325}
326
327#[cfg(test)]
331impl<K, V, S> Invalidator<K, V, S> {
332 pub(crate) fn predicate_count(&self) -> usize {
333 self.predicates.len()
334 }
335}
336
337struct ScanContext<K, V> {
338 predicates: Mutex<Vec<Predicate<K, V>>>,
339}
340
341impl<K, V> Default for ScanContext<K, V> {
342 fn default() -> Self {
343 Self {
344 predicates: Mutex::new(Vec::default()),
345 }
346 }
347}
348
349struct Predicate<K, V> {
350 id: PredicateId,
351 f: PredicateFun<K, V>,
352 registered_at: Instant,
353}
354
355impl<K, V> Clone for Predicate<K, V> {
356 fn clone(&self) -> Self {
357 Self {
358 id: self.id.clone(),
359 f: Arc::clone(&self.f),
360 registered_at: self.registered_at,
361 }
362 }
363}
364
365impl<K, V> Predicate<K, V> {
366 fn new(id: PredicateIdStr<'_>, f: PredicateFun<K, V>, registered_at: Instant) -> Self {
367 Self {
368 id: id.to_string(),
369 f,
370 registered_at,
371 }
372 }
373
374 fn id(&self) -> PredicateIdStr<'_> {
375 &self.id
376 }
377
378 fn is_applicable(&self, last_modified: Instant) -> bool {
379 last_modified <= self.registered_at
380 }
381
382 fn apply(&self, key: &K, value: &V) -> bool {
383 (self.f)(key, value)
384 }
385}