// moka/cht/map/bucket_array_ref.rs

1use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState, RehashOp};
2
3use std::{
4    hash::{BuildHasher, Hash},
5    sync::atomic::{AtomicUsize, Ordering},
6};
7
8use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};
9
/// A borrowed view of one map segment's state: the current bucket array
/// plus the hasher and live-entry counter shared across array generations.
pub(crate) struct BucketArrayRef<'a, K, V, S> {
    /// Epoch-protected pointer to the current `BucketArray`. May be null
    /// until first use; `get` allocates and installs a default array on
    /// demand.
    pub(crate) bucket_array: &'a Atomic<BucketArray<K, V>>,
    /// Hasher used when redistributing buckets during a rehash.
    pub(crate) build_hasher: &'a S,
    /// Count of live (non-tombstone) entries; shared by every array
    /// generation, so it survives rehashes.
    pub(crate) len: &'a AtomicUsize,
}
15
/// Map operations. Each one follows the same shape: pin the epoch, probe
/// the current array, help any in-progress rehash on contention (`Err`)
/// and retry on the newer array, then `swing` the shared pointer forward
/// to the newest array observed before returning.
impl<K, V, S> BucketArrayRef<'_, K, V, S>
where
    K: Hash + Eq,
    S: BuildHasher,
{
    /// Looks up the entry matching `hash`/`eq` and, if found, applies
    /// `with_entry` to its key and value; returns `None` when absent or
    /// when `with_entry` itself returns `None`.
    pub(crate) fn get_key_value_and_then<T>(
        &self,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
        with_entry: impl FnOnce(&K, &V) -> Option<T>,
    ) -> Option<T> {
        // Pin the epoch for the whole operation; every Shared pointer
        // dereferenced below stays valid while `guard` is alive.
        let guard = &crossbeam_epoch::pin();
        let current_ref = self.get(guard);
        let mut bucket_array_ref = current_ref;

        let result;

        loop {
            match bucket_array_ref
                .get(guard, hash, &mut eq)
                .map(|p| unsafe { p.as_ref() })
            {
                // Found a live bucket holding the key.
                Ok(Some(Bucket {
                    key,
                    maybe_value: value,
                })) => {
                    // SAFETY: a live (non-tombstone) bucket's value is
                    // initialized — invariant upheld by the bucket module.
                    result = with_entry(key, unsafe { &*value.as_ptr() });
                    break;
                }
                Ok(None) => {
                    result = None;
                    break;
                }
                // This array has been (or is being) superseded; help the
                // expansion along and retry on the newer array if one exists.
                Err(_) => {
                    if let Some(r) =
                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
                    {
                        bucket_array_ref = r;
                    }
                }
            }
        }

        // Publish the newest array we ended up on so later callers skip
        // the stale generations.
        self.swing(guard, current_ref, bucket_array_ref);

        result
    }

    /// Removes the entry matching `hash`/`eq` if `condition` approves it,
    /// returning `with_previous_entry` applied to the removed pair, or
    /// `None` when nothing was removed.
    pub(crate) fn remove_entry_if_and<T>(
        &self,
        hash: u64,
        mut eq: impl FnMut(&K) -> bool,
        mut condition: impl FnMut(&K, &V) -> bool,
        with_previous_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        let guard = &crossbeam_epoch::pin();
        let current_ref = self.get(guard);
        let mut bucket_array_ref = current_ref;

        let result;

        loop {
            // Perform whatever rehash (if any) the current occupancy calls
            // for before attempting the removal; loop until none is needed.
            loop {
                let rehash_op = RehashOp::new(
                    bucket_array_ref.capacity(),
                    &bucket_array_ref.tombstone_count,
                    self.len,
                );
                if rehash_op.is_skip() {
                    break;
                }
                if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
                    bucket_array_ref = r;
                }
            }

            match bucket_array_ref.remove_if(guard, hash, &mut eq, condition) {
                Ok(previous_bucket_ptr) => {
                    if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
                        let Bucket {
                            key,
                            maybe_value: value,
                        } = previous_bucket_ref;
                        // One fewer live entry, one more tombstone slot in
                        // this array generation.
                        self.len.fetch_sub(1, Ordering::Relaxed);
                        bucket_array_ref
                            .tombstone_count
                            .fetch_add(1, Ordering::Relaxed);
                        // SAFETY: the bucket's value was just read above under
                        // the same guard; see the live-bucket invariant noted
                        // in `get_key_value_and_then`.
                        result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));

                        // Reclaim the removed bucket once no pinned reader
                        // can still reach it (epoch-deferred destruction).
                        unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) };
                    } else {
                        result = None;
                    }

                    break;
                }
                // On contention the closure is handed back so we can retry
                // with it after helping the expansion.
                Err(c) => {
                    condition = c;
                    if let Some(r) =
                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
                    {
                        bucket_array_ref = r;
                    }
                }
            }
        }

        self.swing(guard, current_ref, bucket_array_ref);

        result
    }

    /// Inserts `on_insert()` under `key` unless a live entry already exists
    /// for it; if one exists, applies `with_existing_entry` to it instead.
    /// Returns `None` when the insert happened (including over a tombstone).
    pub(crate) fn insert_if_not_present_and<T>(
        &self,
        key: K,
        hash: u64,
        on_insert: impl FnOnce() -> V,
        with_existing_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        use bucket::InsertionResult;

        let guard = &crossbeam_epoch::pin();
        let current_ref = self.get(guard);
        let mut bucket_array_ref = current_ref;
        // Holds the not-yet-inserted key/value; recovered from `Err` on
        // contention so `key`/`on_insert` are never lost across retries.
        let mut state = InsertOrModifyState::New(key, on_insert);

        let result;

        loop {
            // Same pre-insert rehash loop as in `remove_entry_if_and`.
            loop {
                let rehash_op = RehashOp::new(
                    bucket_array_ref.capacity(),
                    &bucket_array_ref.tombstone_count,
                    self.len,
                );
                if rehash_op.is_skip() {
                    break;
                }
                if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
                    bucket_array_ref = r;
                }
            }

            match bucket_array_ref.insert_if_not_present(guard, hash, state) {
                Ok(InsertionResult::AlreadyPresent(current_bucket_ptr)) => {
                    let current_bucket_ref = unsafe { current_bucket_ptr.as_ref() }.unwrap();
                    // An "already present" result must be a live bucket.
                    assert!(!bucket::is_tombstone(current_bucket_ptr));
                    let Bucket {
                        key,
                        maybe_value: value,
                    } = current_bucket_ref;
                    // SAFETY: asserted live above, so the value is initialized.
                    result = Some(with_existing_entry(key, unsafe { &*value.as_ptr() }));
                    break;
                }
                Ok(InsertionResult::Inserted) => {
                    self.len.fetch_add(1, Ordering::Relaxed);
                    result = None;
                    break;
                }
                // The slot held a tombstone that our new bucket replaced:
                // count it as a fresh insert and retire the old tombstone.
                Ok(InsertionResult::ReplacedTombstone(previous_bucket_ptr)) => {
                    assert!(bucket::is_tombstone(previous_bucket_ptr));
                    self.len.fetch_add(1, Ordering::Relaxed);
                    unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) };
                    result = None;
                    break;
                }
                // Contention: recover the pending insert state and retry
                // after helping the expansion.
                Err(s) => {
                    state = s;
                    if let Some(r) =
                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
                    {
                        bucket_array_ref = r;
                    }
                }
            }
        }

        self.swing(guard, current_ref, bucket_array_ref);

        result
    }

    /// Inserts `on_insert()` for `key`, or — if a live entry exists —
    /// replaces its value with `on_modify(key, old_value)`. Returns
    /// `with_old_entry` applied to the replaced pair, or `None` when this
    /// was effectively a fresh insert (empty slot or tombstone).
    pub(crate) fn insert_with_or_modify_entry_and<T>(
        &self,
        key: K,
        hash: u64,
        on_insert: impl FnOnce() -> V,
        mut on_modify: impl FnMut(&K, &V) -> V,
        with_old_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        let guard = &crossbeam_epoch::pin();
        let current_ref = self.get(guard);
        let mut bucket_array_ref = current_ref;
        // Pending insert state; handed back via `Err` on contention.
        let mut state = InsertOrModifyState::New(key, on_insert);

        let result;

        loop {
            // Same pre-insert rehash loop as in `remove_entry_if_and`.
            loop {
                let rehash_op = RehashOp::new(
                    bucket_array_ref.capacity(),
                    &bucket_array_ref.tombstone_count,
                    self.len,
                );
                if rehash_op.is_skip() {
                    break;
                }
                if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
                    bucket_array_ref = r;
                }
            }

            match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
                Ok(previous_bucket_ptr) => {
                    if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
                        if bucket::is_tombstone(previous_bucket_ptr) {
                            // Replaced a tombstone: a net-new live entry.
                            self.len.fetch_add(1, Ordering::Relaxed);
                            result = None;
                        } else {
                            // Replaced a live entry: report the old pair.
                            let Bucket {
                                key,
                                maybe_value: value,
                            } = previous_bucket_ref;
                            // SAFETY: non-tombstone bucket, value initialized.
                            result = Some(with_old_entry(key, unsafe { &*value.as_ptr() }));
                        }

                        // Either way the displaced bucket is retired once the
                        // epoch permits.
                        unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) };
                    } else {
                        // Empty slot: plain insert.
                        self.len.fetch_add(1, Ordering::Relaxed);
                        result = None;
                    }

                    break;
                }
                // Contention: both the pending state and the modify closure
                // are handed back for the retry.
                Err((s, f)) => {
                    state = s;
                    on_modify = f;
                    if let Some(r) =
                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
                    {
                        bucket_array_ref = r;
                    }
                }
            }
        }

        self.swing(guard, current_ref, bucket_array_ref);

        result
    }

    /// Applies `with_key` to every key in the map and collects the results.
    /// Retries on a superseded array like the other operations; the snapshot
    /// reflects one consistent array generation.
    pub(crate) fn keys<T>(&self, mut with_key: impl FnMut(&K) -> T) -> Vec<T> {
        let guard = &crossbeam_epoch::pin();
        let current_ref = self.get(guard);
        let mut bucket_array_ref = current_ref;

        let result;

        loop {
            match bucket_array_ref.keys(guard, &mut with_key) {
                Ok(keys) => {
                    result = keys;
                    break;
                }
                // Superseded array: help the expansion and retry.
                Err(_) => {
                    if let Some(r) =
                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
                    {
                        bucket_array_ref = r;
                    }
                }
            }
        }

        self.swing(guard, current_ref, bucket_array_ref);

        result
    }
}
295
/// Pointer management: lazy initialization (`get`) and forward publication
/// with reclamation of stale arrays (`swing`).
impl<'g, K, V, S> BucketArrayRef<'_, K, V, S> {
    /// Returns the current bucket array, lazily allocating and installing a
    /// default one (via CAS) when the pointer is still null.
    fn get(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
        // Carries a failed CAS's allocation across iterations so we never
        // allocate more than one candidate array.
        let mut maybe_new_bucket_array = None;

        loop {
            let bucket_array_ptr = self.bucket_array.load_consume(guard);

            if let Some(bucket_array_ref) = unsafe { bucket_array_ptr.as_ref() } {
                return bucket_array_ref;
            }

            // Pointer is null: try to install a fresh default array,
            // reusing the previous attempt's allocation if there was one.
            let new_bucket_array =
                maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));

            // `compare_exchange_weak` may fail spuriously; the enclosing
            // loop retries, and on failure we get our `Owned` back in
            // `CompareExchangeError::new` instead of leaking it.
            match self.bucket_array.compare_exchange_weak(
                Shared::null(),
                new_bucket_array,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                Ok(b) => return unsafe { b.as_ref() }.unwrap(),
                Err(CompareExchangeError { new, .. }) => maybe_new_bucket_array = Some(new),
            }
        }
    }

    /// Advances `self.bucket_array` to the newer of the installed array and
    /// `min_ref`, retiring any older array it displaces.
    ///
    /// `current_ref` is the array the caller started from; `min_ref` is the
    /// newest array the caller finished on. Arrays are ordered by `epoch`,
    /// so the loop is done once the installed array's epoch is at least
    /// `min_ref.epoch`.
    fn swing(
        &self,
        guard: &'g Guard,
        mut current_ref: &'g BucketArray<K, V>,
        min_ref: &'g BucketArray<K, V>,
    ) {
        let min_epoch = min_ref.epoch;

        let mut current_ptr = (current_ref as *const BucketArray<K, V>).into();
        let min_ptr: Shared<'g, _> = (min_ref as *const BucketArray<K, V>).into();

        loop {
            // The installed array is already as new as ours: nothing to do.
            if current_ref.epoch >= min_epoch {
                return;
            }

            match self.bucket_array.compare_exchange_weak(
                current_ptr,
                min_ptr,
                Ordering::AcqRel,
                Ordering::Relaxed,
                guard,
            ) {
                // Published `min_ptr`; the displaced array is reclaimed once
                // no pinned reader can still reach it. The next pass reloads
                // via the Err branch and exits through the epoch check.
                Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) },
                // Someone else moved the pointer (or the weak CAS failed
                // spuriously); reload and re-evaluate against its epoch.
                Err(_) => {
                    let new_ptr = self.bucket_array.load_consume(guard);
                    // Once an array has been installed the pointer is never
                    // reset to null in this module.
                    assert!(!new_ptr.is_null());

                    current_ptr = new_ptr;
                    current_ref = unsafe { new_ptr.as_ref() }.unwrap();
                }
            }
        }
    }
}