moka/sync_base/
invalidator.rs

1use super::{base_cache::Inner, PredicateId, PredicateIdStr};
2use crate::{
3    common::{
4        concurrent::{arc::MiniArc, AccessTime, KvEntry, ValueEntry},
5        time::Instant,
6    },
7    notification::RemovalCause,
8    PredicateError,
9};
10
11use parking_lot::{Mutex, MutexGuard};
12use std::{
13    hash::{BuildHasher, Hash},
14    sync::{
15        atomic::{AtomicBool, Ordering},
16        Arc,
17    },
18};
19use uuid::Uuid;
20
/// A shareable invalidation predicate: a closure that receives a key and a value by
/// reference and returns `true` when that entry should be invalidated.
pub(crate) type PredicateFun<K, V> = Arc<dyn Fn(&K, &V) -> bool + Send + Sync + 'static>;

/// Number of segments requested when the predicate registry map is created.
const PREDICATE_MAP_NUM_SEGMENTS: usize = 16;
24
/// A lightweight description of a scan candidate: the shared key, its hash and a
/// timestamp. During a scan the timestamp is compared against the entry's
/// last-modified time, and the candidate is ignored when they differ.
pub(crate) struct KeyDateLite<K> {
    key: Arc<K>,
    hash: u64,
    timestamp: Instant,
}

// Implemented by hand rather than derived so that cloning only bumps the `Arc`
// reference count and does not require `K: Clone`.
impl<K> Clone for KeyDateLite<K> {
    fn clone(&self) -> Self {
        Self::new(&self.key, self.hash, self.timestamp)
    }
}

impl<K> KeyDateLite<K> {
    /// Builds a record that shares ownership of `key` (the `Arc` is cloned, not the
    /// key itself) and stores `hash` and `timestamp` as given.
    pub(crate) fn new(key: &Arc<K>, hash: u64, timestamp: Instant) -> Self {
        let key = Arc::clone(key);
        Self {
            key,
            hash,
            timestamp,
        }
    }
}
50
/// Keeps the registered invalidation predicates and applies them to cache entries.
pub(crate) struct Invalidator<K, V, S> {
    /// Registry of the predicates, keyed by their `PredicateId`.
    predicates: crate::cht::SegmentedHashMap<PredicateId, Predicate<K, V>, S>,
    /// Flag that starts as `true`, is set to `false` when a predicate is registered
    /// and back to `true` when the registry is observed empty after a removal.
    is_empty: AtomicBool,
    /// Mutex-protected list of the predicates used by `scan_and_invalidate`; it is
    /// kept between calls while a scan is truncated.
    scan_context: Arc<ScanContext<K, V>>,
}
56
57//
58// Crate public methods.
59//
60impl<K, V, S> Invalidator<K, V, S> {
61    pub(crate) fn new(hasher: S) -> Self
62    where
63        S: BuildHasher,
64    {
65        const CAPACITY: usize = 0;
66        let predicates = crate::cht::SegmentedHashMap::with_num_segments_capacity_and_hasher(
67            PREDICATE_MAP_NUM_SEGMENTS,
68            CAPACITY,
69            hasher,
70        );
71        Self {
72            predicates,
73            is_empty: AtomicBool::new(true),
74            scan_context: Arc::new(ScanContext::default()),
75        }
76    }
77
78    pub(crate) fn is_empty(&self) -> bool {
79        self.is_empty.load(Ordering::Acquire)
80    }
81
82    pub(crate) fn remove_predicates_registered_before(&self, ts: Instant)
83    where
84        K: Hash + Eq + Send + Sync + 'static,
85        V: Clone + Send + Sync + 'static,
86        S: BuildHasher,
87    {
88        let pred_map = &self.predicates;
89
90        let removing_ids = pred_map
91            .iter()
92            .filter(|(_, pred)| pred.registered_at <= ts)
93            .map(|(id, _)| id)
94            .collect::<Vec<_>>();
95
96        for id in removing_ids {
97            let hash = pred_map.hash(&id);
98            pred_map.remove(hash, |k| k == &id);
99        }
100
101        if pred_map.is_empty() {
102            self.is_empty.store(true, Ordering::Release);
103        }
104    }
105
106    pub(crate) fn register_predicate(
107        &self,
108        predicate: PredicateFun<K, V>,
109        registered_at: Instant,
110    ) -> Result<PredicateId, PredicateError>
111    where
112        K: Hash + Eq,
113        S: BuildHasher,
114    {
115        const MAX_RETRY: usize = 1_000;
116        let mut tries = 0;
117        let preds = &self.predicates;
118
119        while tries < MAX_RETRY {
120            let id = Uuid::new_v4().as_hyphenated().to_string();
121
122            let hash = preds.hash(&id);
123            if preds.contains_key(hash, |k| k == &id) {
124                tries += 1;
125
126                continue; // Retry
127            }
128            let pred = Predicate::new(&id, predicate, registered_at);
129            preds.insert_entry_and(id.clone(), hash, pred, |_, _| ());
130            self.is_empty.store(false, Ordering::Release);
131
132            return Ok(id);
133        }
134
135        // Since we are using 128-bit UUID for the ID and we do retries for MAX_RETRY
136        // times, this panic should extremely unlikely occur (unless there is a bug in
137        // UUID generation).
138        panic!("Cannot assign a new PredicateId to a predicate");
139    }
140
141    // This method will be called by the get method of Cache.
142    #[inline]
143    pub(crate) fn apply_predicates(&self, key: &Arc<K>, entry: &MiniArc<ValueEntry<K, V>>) -> bool
144    where
145        K: Hash + Eq + Send + Sync + 'static,
146        V: Clone + Send + Sync + 'static,
147        S: BuildHasher,
148    {
149        if self.is_empty() {
150            false
151        } else if let Some(ts) = entry.last_modified() {
152            Self::do_apply_predicates(
153                self.predicates.iter().map(|(_, v)| v),
154                key,
155                &entry.value,
156                ts,
157            )
158        } else {
159            false
160        }
161    }
162
163    pub(crate) fn scan_and_invalidate(
164        &self,
165        cache: &Inner<K, V, S>,
166        candidates: Vec<KeyDateLite<K>>,
167        is_truncated: bool,
168    ) -> (Vec<KvEntry<K, V>>, bool)
169    where
170        K: Hash + Eq + Send + Sync + 'static,
171        V: Clone + Send + Sync + 'static,
172        S: BuildHasher,
173    {
174        let mut predicates = self.scan_context.predicates.lock();
175        if predicates.is_empty() {
176            *predicates = self.predicates.iter().map(|(_k, v)| v).collect();
177        }
178
179        let mut invalidated = Vec::default();
180        let mut newest_timestamp = None;
181
182        for candidate in &candidates {
183            let key = &candidate.key;
184            let hash = candidate.hash;
185            let ts = candidate.timestamp;
186            if self.apply(&predicates, cache, key, hash, ts) {
187                if let Some(entry) = Self::invalidate(cache, key, hash, ts) {
188                    invalidated.push(KvEntry {
189                        key: Arc::clone(key),
190                        entry,
191                    });
192                }
193            }
194            newest_timestamp = Some(ts);
195        }
196
197        self.remove_finished_predicates(predicates, is_truncated, newest_timestamp);
198
199        (invalidated, self.predicates.is_empty())
200    }
201}
202
203//
204// Private methods.
205//
206impl<K, V, S> Invalidator<K, V, S>
207where
208    K: Hash + Eq,
209    S: BuildHasher,
210{
211    #[inline]
212    fn do_apply_predicates<I>(predicates: I, key: &K, value: &V, ts: Instant) -> bool
213    where
214        I: Iterator<Item = Predicate<K, V>>,
215    {
216        for predicate in predicates {
217            if predicate.is_applicable(ts) && predicate.apply(key, value) {
218                return true;
219            }
220        }
221        false
222    }
223
224    fn remove_finished_predicates(
225        &self,
226        mut predicates: MutexGuard<'_, Vec<Predicate<K, V>>>,
227        is_truncated: bool,
228        newest_timestamp: Option<Instant>,
229    ) where
230        K: Hash + Eq,
231        S: BuildHasher,
232    {
233        let predicates = &mut *predicates;
234        if is_truncated {
235            if let Some(ts) = newest_timestamp {
236                let (active, finished): (Vec<_>, Vec<_>) =
237                    predicates.drain(..).partition(|p| p.is_applicable(ts));
238
239                // Remove finished predicates from the predicate registry.
240                self.remove_predicates(&finished);
241                // Set the active predicates to the scan context.
242                *predicates = active;
243            } else {
244                unreachable!();
245            }
246        } else {
247            // Remove all the predicates from the predicate registry and scan context.
248            self.remove_predicates(predicates);
249            predicates.clear();
250        }
251    }
252
253    fn remove_predicates(&self, predicates: &[Predicate<K, V>])
254    where
255        K: Hash + Eq,
256        S: BuildHasher,
257    {
258        let pred_map = &self.predicates;
259        for p in predicates.iter() {
260            let hash = pred_map.hash(p.id());
261            pred_map.remove(hash, |k| k == p.id());
262        }
263
264        if pred_map.is_empty() {
265            self.is_empty.store(true, Ordering::Release);
266        }
267    }
268
269    fn apply(
270        &self,
271        predicates: &[Predicate<K, V>],
272        cache: &Inner<K, V, S>,
273        key: &Arc<K>,
274        hash: u64,
275        ts: Instant,
276    ) -> bool {
277        if let Some(entry) = cache.cache.get(hash, |k| k == key) {
278            if let Some(lm) = entry.last_modified() {
279                if lm == ts {
280                    return Invalidator::<_, _, S>::do_apply_predicates(
281                        predicates.iter().cloned(),
282                        key,
283                        &entry.value,
284                        lm,
285                    );
286                }
287            }
288        }
289
290        false
291    }
292
293    fn invalidate(
294        cache: &Inner<K, V, S>,
295        key: &Arc<K>,
296        hash: u64,
297        ts: Instant,
298    ) -> Option<MiniArc<ValueEntry<K, V>>>
299    where
300        K: Send + Sync + 'static,
301        V: Clone + Send + Sync + 'static,
302    {
303        // Lock the key for removal if blocking removal notification is enabled.
304        let kl = cache.maybe_key_lock(key);
305        let _klg = &kl.as_ref().map(|kl| kl.lock());
306
307        let maybe_entry = cache.cache.remove_if(
308            hash,
309            |k| k == key,
310            |_, v| {
311                if let Some(lm) = v.last_modified() {
312                    lm == ts
313                } else {
314                    false
315                }
316            },
317        );
318        if let Some(entry) = &maybe_entry {
319            if cache.is_removal_notifier_enabled() {
320                cache.notify_single_removal(Arc::clone(key), entry, RemovalCause::Explicit);
321            }
322        }
323        maybe_entry
324    }
325}
326
327//
328// for testing
329//
#[cfg(test)]
impl<K, V, S> Invalidator<K, V, S> {
    /// Returns the number of predicates currently held in the registry.
    pub(crate) fn predicate_count(&self) -> usize {
        let registry = &self.predicates;
        registry.len()
    }
}
336
337struct ScanContext<K, V> {
338    predicates: Mutex<Vec<Predicate<K, V>>>,
339}
340
341impl<K, V> Default for ScanContext<K, V> {
342    fn default() -> Self {
343        Self {
344            predicates: Mutex::new(Vec::default()),
345        }
346    }
347}
348
349struct Predicate<K, V> {
350    id: PredicateId,
351    f: PredicateFun<K, V>,
352    registered_at: Instant,
353}
354
355impl<K, V> Clone for Predicate<K, V> {
356    fn clone(&self) -> Self {
357        Self {
358            id: self.id.clone(),
359            f: Arc::clone(&self.f),
360            registered_at: self.registered_at,
361        }
362    }
363}
364
365impl<K, V> Predicate<K, V> {
366    fn new(id: PredicateIdStr<'_>, f: PredicateFun<K, V>, registered_at: Instant) -> Self {
367        Self {
368            id: id.to_string(),
369            f,
370            registered_at,
371        }
372    }
373
374    fn id(&self) -> PredicateIdStr<'_> {
375        &self.id
376    }
377
378    fn is_applicable(&self, last_modified: Instant) -> bool {
379        last_modified <= self.registered_at
380    }
381
382    fn apply(&self, key: &K, value: &V) -> bool {
383        (self.f)(key, value)
384    }
385}