1use super::bucket::{self, Bucket, BucketArray, InsertOrModifyState, RehashOp};
2
3use std::{
4 hash::{BuildHasher, Hash},
5 sync::atomic::{AtomicUsize, Ordering},
6};
7
8use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};
9
10pub(crate) struct BucketArrayRef<'a, K, V, S> {
11 pub(crate) bucket_array: &'a Atomic<BucketArray<K, V>>,
12 pub(crate) build_hasher: &'a S,
13 pub(crate) len: &'a AtomicUsize,
14}
15
16impl<K, V, S> BucketArrayRef<'_, K, V, S>
17where
18 K: Hash + Eq,
19 S: BuildHasher,
20{
21 pub(crate) fn get_key_value_and_then<T>(
22 &self,
23 hash: u64,
24 mut eq: impl FnMut(&K) -> bool,
25 with_entry: impl FnOnce(&K, &V) -> Option<T>,
26 ) -> Option<T> {
27 let guard = &crossbeam_epoch::pin();
28 let current_ref = self.get(guard);
29 let mut bucket_array_ref = current_ref;
30
31 let result;
32
33 loop {
34 match bucket_array_ref
35 .get(guard, hash, &mut eq)
36 .map(|p| unsafe { p.as_ref() })
37 {
38 Ok(Some(Bucket {
39 key,
40 maybe_value: value,
41 })) => {
42 result = with_entry(key, unsafe { &*value.as_ptr() });
43 break;
44 }
45 Ok(None) => {
46 result = None;
47 break;
48 }
49 Err(_) => {
50 if let Some(r) =
51 bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
52 {
53 bucket_array_ref = r;
54 }
55 }
56 }
57 }
58
59 self.swing(guard, current_ref, bucket_array_ref);
60
61 result
62 }
63
64 pub(crate) fn remove_entry_if_and<T>(
65 &self,
66 hash: u64,
67 mut eq: impl FnMut(&K) -> bool,
68 mut condition: impl FnMut(&K, &V) -> bool,
69 with_previous_entry: impl FnOnce(&K, &V) -> T,
70 ) -> Option<T> {
71 let guard = &crossbeam_epoch::pin();
72 let current_ref = self.get(guard);
73 let mut bucket_array_ref = current_ref;
74
75 let result;
76
77 loop {
78 loop {
79 let rehash_op = RehashOp::new(
80 bucket_array_ref.capacity(),
81 &bucket_array_ref.tombstone_count,
82 self.len,
83 );
84 if rehash_op.is_skip() {
85 break;
86 }
87 if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
88 bucket_array_ref = r;
89 }
90 }
91
92 match bucket_array_ref.remove_if(guard, hash, &mut eq, condition) {
93 Ok(previous_bucket_ptr) => {
94 if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
95 let Bucket {
96 key,
97 maybe_value: value,
98 } = previous_bucket_ref;
99 self.len.fetch_sub(1, Ordering::Relaxed);
100 bucket_array_ref
101 .tombstone_count
102 .fetch_add(1, Ordering::Relaxed);
103 result = Some(with_previous_entry(key, unsafe { &*value.as_ptr() }));
104
105 unsafe { bucket::defer_destroy_tombstone(guard, previous_bucket_ptr) };
106 } else {
107 result = None;
108 }
109
110 break;
111 }
112 Err(c) => {
113 condition = c;
114 if let Some(r) =
115 bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
116 {
117 bucket_array_ref = r;
118 }
119 }
120 }
121 }
122
123 self.swing(guard, current_ref, bucket_array_ref);
124
125 result
126 }
127
128 pub(crate) fn insert_if_not_present_and<T>(
129 &self,
130 key: K,
131 hash: u64,
132 on_insert: impl FnOnce() -> V,
133 with_existing_entry: impl FnOnce(&K, &V) -> T,
134 ) -> Option<T> {
135 use bucket::InsertionResult;
136
137 let guard = &crossbeam_epoch::pin();
138 let current_ref = self.get(guard);
139 let mut bucket_array_ref = current_ref;
140 let mut state = InsertOrModifyState::New(key, on_insert);
141
142 let result;
143
144 loop {
145 loop {
146 let rehash_op = RehashOp::new(
147 bucket_array_ref.capacity(),
148 &bucket_array_ref.tombstone_count,
149 self.len,
150 );
151 if rehash_op.is_skip() {
152 break;
153 }
154 if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
155 bucket_array_ref = r;
156 }
157 }
158
159 match bucket_array_ref.insert_if_not_present(guard, hash, state) {
160 Ok(InsertionResult::AlreadyPresent(current_bucket_ptr)) => {
161 let current_bucket_ref = unsafe { current_bucket_ptr.as_ref() }.unwrap();
162 assert!(!bucket::is_tombstone(current_bucket_ptr));
163 let Bucket {
164 key,
165 maybe_value: value,
166 } = current_bucket_ref;
167 result = Some(with_existing_entry(key, unsafe { &*value.as_ptr() }));
168 break;
169 }
170 Ok(InsertionResult::Inserted) => {
171 self.len.fetch_add(1, Ordering::Relaxed);
172 result = None;
173 break;
174 }
175 Ok(InsertionResult::ReplacedTombstone(previous_bucket_ptr)) => {
176 assert!(bucket::is_tombstone(previous_bucket_ptr));
177 self.len.fetch_add(1, Ordering::Relaxed);
178 unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) };
179 result = None;
180 break;
181 }
182 Err(s) => {
183 state = s;
184 if let Some(r) =
185 bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
186 {
187 bucket_array_ref = r;
188 }
189 }
190 }
191 }
192
193 self.swing(guard, current_ref, bucket_array_ref);
194
195 result
196 }
197
198 pub(crate) fn insert_with_or_modify_entry_and<T>(
199 &self,
200 key: K,
201 hash: u64,
202 on_insert: impl FnOnce() -> V,
203 mut on_modify: impl FnMut(&K, &V) -> V,
204 with_old_entry: impl FnOnce(&K, &V) -> T,
205 ) -> Option<T> {
206 let guard = &crossbeam_epoch::pin();
207 let current_ref = self.get(guard);
208 let mut bucket_array_ref = current_ref;
209 let mut state = InsertOrModifyState::New(key, on_insert);
210
211 let result;
212
213 loop {
214 loop {
215 let rehash_op = RehashOp::new(
216 bucket_array_ref.capacity(),
217 &bucket_array_ref.tombstone_count,
218 self.len,
219 );
220 if rehash_op.is_skip() {
221 break;
222 }
223 if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
224 bucket_array_ref = r;
225 }
226 }
227
228 match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
229 Ok(previous_bucket_ptr) => {
230 if let Some(previous_bucket_ref) = unsafe { previous_bucket_ptr.as_ref() } {
231 if bucket::is_tombstone(previous_bucket_ptr) {
232 self.len.fetch_add(1, Ordering::Relaxed);
233 result = None;
234 } else {
235 let Bucket {
236 key,
237 maybe_value: value,
238 } = previous_bucket_ref;
239 result = Some(with_old_entry(key, unsafe { &*value.as_ptr() }));
240 }
241
242 unsafe { bucket::defer_destroy_bucket(guard, previous_bucket_ptr) };
243 } else {
244 self.len.fetch_add(1, Ordering::Relaxed);
245 result = None;
246 }
247
248 break;
249 }
250 Err((s, f)) => {
251 state = s;
252 on_modify = f;
253 if let Some(r) =
254 bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
255 {
256 bucket_array_ref = r;
257 }
258 }
259 }
260 }
261
262 self.swing(guard, current_ref, bucket_array_ref);
263
264 result
265 }
266
267 pub(crate) fn keys<T>(&self, mut with_key: impl FnMut(&K) -> T) -> Vec<T> {
268 let guard = &crossbeam_epoch::pin();
269 let current_ref = self.get(guard);
270 let mut bucket_array_ref = current_ref;
271
272 let result;
273
274 loop {
275 match bucket_array_ref.keys(guard, &mut with_key) {
276 Ok(keys) => {
277 result = keys;
278 break;
279 }
280 Err(_) => {
281 if let Some(r) =
282 bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
283 {
284 bucket_array_ref = r;
285 }
286 }
287 }
288 }
289
290 self.swing(guard, current_ref, bucket_array_ref);
291
292 result
293 }
294}
295
296impl<'g, K, V, S> BucketArrayRef<'_, K, V, S> {
297 fn get(&self, guard: &'g Guard) -> &'g BucketArray<K, V> {
298 let mut maybe_new_bucket_array = None;
299
300 loop {
301 let bucket_array_ptr = self.bucket_array.load_consume(guard);
302
303 if let Some(bucket_array_ref) = unsafe { bucket_array_ptr.as_ref() } {
304 return bucket_array_ref;
305 }
306
307 let new_bucket_array =
308 maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));
309
310 match self.bucket_array.compare_exchange_weak(
311 Shared::null(),
312 new_bucket_array,
313 Ordering::AcqRel,
314 Ordering::Relaxed,
315 guard,
316 ) {
317 Ok(b) => return unsafe { b.as_ref() }.unwrap(),
318 Err(CompareExchangeError { new, .. }) => maybe_new_bucket_array = Some(new),
319 }
320 }
321 }
322
323 fn swing(
324 &self,
325 guard: &'g Guard,
326 mut current_ref: &'g BucketArray<K, V>,
327 min_ref: &'g BucketArray<K, V>,
328 ) {
329 let min_epoch = min_ref.epoch;
330
331 let mut current_ptr = (current_ref as *const BucketArray<K, V>).into();
332 let min_ptr: Shared<'g, _> = (min_ref as *const BucketArray<K, V>).into();
333
334 loop {
335 if current_ref.epoch >= min_epoch {
336 return;
337 }
338
339 match self.bucket_array.compare_exchange_weak(
340 current_ptr,
341 min_ptr,
342 Ordering::AcqRel,
343 Ordering::Relaxed,
344 guard,
345 ) {
346 Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) },
347 Err(_) => {
348 let new_ptr = self.bucket_array.load_consume(guard);
349 assert!(!new_ptr.is_null());
350
351 current_ptr = new_ptr;
352 current_ref = unsafe { new_ptr.as_ref() }.unwrap();
353 }
354 }
355 }
356 }
357}