moka/common/
concurrent.rs

1use crate::common::{concurrent::arc::MiniArc, deque::DeqNode, time::Instant};
2
3use parking_lot::Mutex;
4use std::{fmt, ptr::NonNull, sync::Arc};
5use tagptr::TagNonNull;
6
7pub(crate) mod arc;
8pub(crate) mod constants;
9pub(crate) mod deques;
10pub(crate) mod entry_info;
11
12#[cfg(feature = "sync")]
13pub(crate) mod housekeeper;
14
15#[cfg(feature = "unstable-debug-counters")]
16pub(crate) mod debug_counters;
17
18use self::entry_info::EntryInfo;
19
20use super::timer_wheel::TimerNode;
21
/// Shared, thread-safe closure that receives references to a key and a value
/// and returns a `u32`.
///
/// NOTE(review): the returned number is presumably the weight of the entry —
/// confirm against the code that invokes the weigher.
pub(crate) type Weigher<K, V> = Arc<dyn Fn(&K, &V) -> u32 + Send + Sync + 'static>;
23
/// Read and write access to the last-accessed and last-modified timestamps
/// tracked by the implementing type.
///
/// Both setters take `&self`, so an implementor has to keep the timestamps
/// behind some form of interior mutability.
pub(crate) trait AccessTime {
    /// Returns the last-accessed timestamp, or `None` when there is none to report.
    fn last_accessed(&self) -> Option<Instant>;
    /// Records `timestamp` as the last-accessed time.
    fn set_last_accessed(&self, timestamp: Instant);
    /// Returns the last-modified timestamp, or `None` when there is none to report.
    fn last_modified(&self) -> Option<Instant>;
    /// Records `timestamp` as the last-modified time.
    fn set_last_modified(&self, timestamp: Instant);
}
30
/// A shared key paired with a `u64` hash value.
///
/// NOTE(review): `hash` is presumably the pre-computed hash of `key` — confirm
/// at the call sites that build this struct.
#[derive(Debug)]
pub(crate) struct KeyHash<K> {
    pub(crate) key: Arc<K>,
    pub(crate) hash: u64,
}

impl<K> KeyHash<K> {
    /// Pairs `key` with `hash`.
    pub(crate) fn new(key: Arc<K>, hash: u64) -> Self {
        KeyHash { key, hash }
    }
}

// Written by hand on purpose: `#[derive(Clone)]` would add a `K: Clone` bound,
// whereas cloning here only bumps the reference count of the `Arc`.
impl<K> Clone for KeyHash<K> {
    fn clone(&self) -> Self {
        let Self { key, hash } = self;
        Self::new(Arc::clone(key), *hash)
    }
}
51
52pub(crate) struct KeyHashDate<K> {
53    entry_info: MiniArc<EntryInfo<K>>,
54}
55
56impl<K> KeyHashDate<K> {
57    pub(crate) fn new(entry_info: &MiniArc<EntryInfo<K>>) -> Self {
58        Self {
59            entry_info: MiniArc::clone(entry_info),
60        }
61    }
62
63    pub(crate) fn key(&self) -> &Arc<K> {
64        &self.entry_info.key_hash().key
65    }
66
67    pub(crate) fn hash(&self) -> u64 {
68        self.entry_info.key_hash().hash
69    }
70
71    pub(crate) fn entry_info(&self) -> &EntryInfo<K> {
72        &self.entry_info
73    }
74
75    pub(crate) fn last_modified(&self) -> Option<Instant> {
76        self.entry_info.last_modified()
77    }
78
79    pub(crate) fn last_accessed(&self) -> Option<Instant> {
80        self.entry_info.last_accessed()
81    }
82
83    pub(crate) fn is_dirty(&self) -> bool {
84        self.entry_info.is_dirty()
85    }
86}
87
88pub(crate) struct KvEntry<K, V> {
89    pub(crate) key: Arc<K>,
90    pub(crate) entry: MiniArc<ValueEntry<K, V>>,
91}
92
93impl<K, V> KvEntry<K, V> {
94    pub(crate) fn new(key: Arc<K>, entry: MiniArc<ValueEntry<K, V>>) -> Self {
95        Self { key, entry }
96    }
97}
98
99impl<K, V> Clone for KvEntry<K, V> {
100    fn clone(&self) -> Self {
101        Self {
102            key: Arc::clone(&self.key),
103            entry: MiniArc::clone(&self.entry),
104        }
105    }
106}
107
108impl<K> AccessTime for DeqNode<KeyHashDate<K>> {
109    #[inline]
110    fn last_accessed(&self) -> Option<Instant> {
111        self.element.entry_info.last_accessed()
112    }
113
114    #[inline]
115    fn set_last_accessed(&self, timestamp: Instant) {
116        self.element.entry_info.set_last_accessed(timestamp);
117    }
118
119    #[inline]
120    fn last_modified(&self) -> Option<Instant> {
121        self.element.entry_info.last_modified()
122    }
123
124    #[inline]
125    fn set_last_modified(&self, timestamp: Instant) {
126        self.element.entry_info.set_last_modified(timestamp);
127    }
128}
129
// Pointer to a `DeqNode` in an access order queue. It is a `TagNonNull` with
// a const parameter of 2, so the pointer also carries a small tag.
// NOTE(review): what the tag encodes is not visible in this file — confirm
// against the deque code that creates these pointers.
type KeyDeqNodeAo<K> = TagNonNull<DeqNode<KeyHashDate<K>>, 2>;

// Pointer to a `DeqNode` in the write order queue (plain, untagged `NonNull`).
type KeyDeqNodeWo<K> = NonNull<DeqNode<KeyHashDate<K>>>;

// Pointer to a `DeqNode` holding a `TimerNode` in the timer wheel.
type DeqNodeTimer<K> = NonNull<DeqNode<TimerNode<K>>>;
138
139pub(crate) struct DeqNodes<K> {
140    access_order_q_node: Option<KeyDeqNodeAo<K>>,
141    write_order_q_node: Option<KeyDeqNodeWo<K>>,
142    timer_node: Option<DeqNodeTimer<K>>,
143}
144
145impl<K> Default for DeqNodes<K> {
146    fn default() -> Self {
147        Self {
148            access_order_q_node: None,
149            write_order_q_node: None,
150            timer_node: None,
151        }
152    }
153}
154
155// We need this `unsafe impl` as DeqNodes have NonNull pointers.
156unsafe impl<K> Send for DeqNodes<K> {}
157
158impl<K> DeqNodes<K> {
159    pub(crate) fn set_timer_node(&mut self, timer_node: Option<DeqNodeTimer<K>>) {
160        self.timer_node = timer_node;
161    }
162}
163
164pub(crate) struct ValueEntry<K, V> {
165    pub(crate) value: V,
166    info: MiniArc<EntryInfo<K>>,
167    nodes: MiniArc<Mutex<DeqNodes<K>>>,
168}
169
170impl<K, V> ValueEntry<K, V> {
171    pub(crate) fn new(value: V, entry_info: MiniArc<EntryInfo<K>>) -> Self {
172        #[cfg(feature = "unstable-debug-counters")]
173        self::debug_counters::InternalGlobalDebugCounters::value_entry_created();
174
175        Self {
176            value,
177            info: entry_info,
178            nodes: MiniArc::new(Mutex::new(DeqNodes::default())),
179        }
180    }
181
182    pub(crate) fn new_from(value: V, entry_info: MiniArc<EntryInfo<K>>, other: &Self) -> Self {
183        #[cfg(feature = "unstable-debug-counters")]
184        self::debug_counters::InternalGlobalDebugCounters::value_entry_created();
185        Self {
186            value,
187            info: entry_info,
188            nodes: MiniArc::clone(&other.nodes),
189        }
190    }
191
192    pub(crate) fn entry_info(&self) -> &MiniArc<EntryInfo<K>> {
193        &self.info
194    }
195
196    pub(crate) fn is_admitted(&self) -> bool {
197        self.info.is_admitted()
198    }
199
200    pub(crate) fn set_admitted(&self, value: bool) {
201        self.info.set_admitted(value);
202    }
203
204    pub(crate) fn is_dirty(&self) -> bool {
205        self.info.is_dirty()
206    }
207
208    #[inline]
209    pub(crate) fn policy_weight(&self) -> u32 {
210        self.info.policy_weight()
211    }
212
213    pub(crate) fn deq_nodes(&self) -> &MiniArc<Mutex<DeqNodes<K>>> {
214        &self.nodes
215    }
216
217    pub(crate) fn access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
218        self.nodes.lock().access_order_q_node
219    }
220
221    pub(crate) fn set_access_order_q_node(&self, node: Option<KeyDeqNodeAo<K>>) {
222        self.nodes.lock().access_order_q_node = node;
223    }
224
225    pub(crate) fn take_access_order_q_node(&self) -> Option<KeyDeqNodeAo<K>> {
226        self.nodes.lock().access_order_q_node.take()
227    }
228
229    pub(crate) fn write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
230        self.nodes.lock().write_order_q_node
231    }
232
233    pub(crate) fn set_write_order_q_node(&self, node: Option<KeyDeqNodeWo<K>>) {
234        self.nodes.lock().write_order_q_node = node;
235    }
236
237    pub(crate) fn take_write_order_q_node(&self) -> Option<KeyDeqNodeWo<K>> {
238        self.nodes.lock().write_order_q_node.take()
239    }
240
241    pub(crate) fn timer_node(&self) -> Option<DeqNodeTimer<K>> {
242        self.nodes.lock().timer_node
243    }
244
245    pub(crate) fn set_timer_node(&self, node: Option<DeqNodeTimer<K>>) {
246        self.nodes.lock().timer_node = node;
247    }
248
249    pub(crate) fn take_timer_node(&self) -> Option<DeqNodeTimer<K>> {
250        self.nodes.lock().timer_node.take()
251    }
252
253    pub(crate) fn unset_q_nodes(&self) {
254        let mut nodes = self.nodes.lock();
255        nodes.access_order_q_node = None;
256        nodes.write_order_q_node = None;
257    }
258}
259
260#[cfg(feature = "unstable-debug-counters")]
261impl<K, V> Drop for ValueEntry<K, V> {
262    fn drop(&mut self) {
263        self::debug_counters::InternalGlobalDebugCounters::value_entry_dropped();
264    }
265}
266
267impl<K, V> AccessTime for MiniArc<ValueEntry<K, V>> {
268    #[inline]
269    fn last_accessed(&self) -> Option<Instant> {
270        self.info.last_accessed()
271    }
272
273    #[inline]
274    fn set_last_accessed(&self, timestamp: Instant) {
275        self.info.set_last_accessed(timestamp);
276    }
277
278    #[inline]
279    fn last_modified(&self) -> Option<Instant> {
280        self.info.last_modified()
281    }
282
283    #[inline]
284    fn set_last_modified(&self, timestamp: Instant) {
285        self.info.set_last_modified(timestamp);
286    }
287}
288
/// Record of a read: either a hit on an existing entry or a miss.
pub(crate) enum ReadOp<K, V> {
    /// The read found an entry.
    Hit {
        /// Handle to the entry that was hit.
        value_entry: MiniArc<ValueEntry<K, V>>,
        /// NOTE(review): presumably set when the read changed the entry's
        /// expiration time — confirm where `Hit` is constructed.
        is_expiry_modified: bool,
    },
    // u64 is the hash of the key.
    Miss(u64),
}
297
/// Record of a write: either an upsert of an entry or the removal of one.
pub(crate) enum WriteOp<K, V> {
    /// An entry was inserted or updated.
    Upsert {
        /// Key and hash of the written entry.
        key_hash: KeyHash<K>,
        /// Handle to the written entry.
        value_entry: MiniArc<ValueEntry<K, V>>,
        /// Entry generation after the operation.
        entry_gen: u16,
        /// NOTE(review): presumably the entry's weight before the write —
        /// confirm at the call sites.
        old_weight: u32,
        /// NOTE(review): presumably the entry's weight after the write —
        /// confirm at the call sites.
        new_weight: u32,
    },
    /// An entry was removed.
    Remove {
        /// Key and entry handle of the removed entry.
        kv_entry: KvEntry<K, V>,
        /// NOTE(review): presumably the entry generation after the removal,
        /// mirroring `Upsert::entry_gen` — confirm at the call sites.
        entry_gen: u16,
    },
}
312
313/// Cloning a `WriteOp` is safe and cheap because it uses `Arc` and `MiniArc` pointers to
314/// the actual data.
315impl<K, V> Clone for WriteOp<K, V> {
316    fn clone(&self) -> Self {
317        match self {
318            Self::Upsert {
319                key_hash,
320                value_entry,
321                entry_gen,
322                old_weight,
323                new_weight,
324            } => Self::Upsert {
325                key_hash: key_hash.clone(),
326                value_entry: MiniArc::clone(value_entry),
327                entry_gen: *entry_gen,
328                old_weight: *old_weight,
329                new_weight: *new_weight,
330            },
331            Self::Remove {
332                kv_entry,
333                entry_gen,
334            } => Self::Remove {
335                kv_entry: kv_entry.clone(),
336                entry_gen: *entry_gen,
337            },
338        }
339    }
340}
341
342impl<K, V> fmt::Debug for WriteOp<K, V> {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        match self {
345            Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
346            Self::Remove { .. } => f.debug_tuple("Remove").finish(),
347        }
348    }
349}
350
351impl<K, V> WriteOp<K, V> {
352    pub(crate) fn new_upsert(
353        key: &Arc<K>,
354        hash: u64,
355        value_entry: &MiniArc<ValueEntry<K, V>>,
356        entry_generation: u16,
357        old_weight: u32,
358        new_weight: u32,
359    ) -> Self {
360        let key_hash = KeyHash::new(Arc::clone(key), hash);
361        let value_entry = MiniArc::clone(value_entry);
362        Self::Upsert {
363            key_hash,
364            value_entry,
365            entry_gen: entry_generation,
366            old_weight,
367            new_weight,
368        }
369    }
370}
371
372pub(crate) struct OldEntryInfo<K, V> {
373    pub(crate) entry: MiniArc<ValueEntry<K, V>>,
374    pub(crate) last_accessed: Option<Instant>,
375    pub(crate) last_modified: Option<Instant>,
376}
377
378impl<K, V> OldEntryInfo<K, V> {
379    pub(crate) fn new(entry: &MiniArc<ValueEntry<K, V>>) -> Self {
380        Self {
381            entry: MiniArc::clone(entry),
382            last_accessed: entry.last_accessed(),
383            last_modified: entry.last_modified(),
384        }
385    }
386}