event_listener/
intrusive.rs

1//! Intrusive linked list-based implementation of `event-listener`.
2//!
3//! This implementation crates an intrusive linked list of listeners. This list
4//! is secured using either a libstd mutex or a critical section.
5
6use crate::notify::{GenericNotify, Internal, Notification};
7use crate::sync::atomic::Ordering;
8use crate::sync::cell::{Cell, UnsafeCell};
9use crate::{RegisterResult, State, TaskRef};
10
11#[cfg(feature = "critical-section")]
12use core::cell::RefCell;
13#[cfg(all(feature = "std", not(feature = "critical-section")))]
14use core::ops::{Deref, DerefMut};
15
16use core::marker::PhantomPinned;
17use core::mem;
18use core::pin::Pin;
19use core::ptr::NonNull;
20
21pub(super) struct List<T>(
22    /// libstd-based implementation uses a normal Muetx to secure the data.
23    #[cfg(all(feature = "std", not(feature = "critical-section")))]
24    crate::sync::Mutex<Inner<T>>,
25    /// Critical-section-based implementation uses a CS cell that wraps a RefCell.
26    #[cfg(feature = "critical-section")]
27    critical_section::Mutex<RefCell<Inner<T>>>,
28);
29
30struct Inner<T> {
31    /// The head of the linked list.
32    head: Option<NonNull<Link<T>>>,
33
34    /// The tail of the linked list.
35    tail: Option<NonNull<Link<T>>>,
36
37    /// The first unnotified listener.
38    next: Option<NonNull<Link<T>>>,
39
40    /// Total number of listeners.
41    len: usize,
42
43    /// The number of notified listeners.
44    notified: usize,
45}
46
47impl<T> List<T> {
48    /// Create a new, empty event listener list.
49    pub(super) fn new() -> Self {
50        let inner = Inner {
51            head: None,
52            tail: None,
53            next: None,
54            len: 0,
55            notified: 0,
56        };
57
58        #[cfg(feature = "critical-section")]
59        {
60            Self(critical_section::Mutex::new(RefCell::new(inner)))
61        }
62
63        #[cfg(not(feature = "critical-section"))]
64        Self(crate::sync::Mutex::new(inner))
65    }
66
67    /// Get the total number of listeners without blocking.
68    #[cfg(all(feature = "std", not(feature = "critical-section")))]
69    pub(crate) fn try_total_listeners(&self) -> Option<usize> {
70        self.0.try_lock().ok().map(|list| list.len)
71    }
72
73    /// Get the total number of listeners without blocking.
74    #[cfg(feature = "critical-section")]
75    pub(crate) fn try_total_listeners(&self) -> Option<usize> {
76        Some(self.total_listeners())
77    }
78
79    /// Get the total number of listeners with blocking.
80    #[cfg(all(feature = "std", not(feature = "critical-section")))]
81    pub(crate) fn total_listeners(&self) -> usize {
82        self.0.lock().unwrap_or_else(|e| e.into_inner()).len
83    }
84
85    /// Get the total number of listeners with blocking.
86    #[cfg(feature = "critical-section")]
87    #[allow(unused)]
88    pub(crate) fn total_listeners(&self) -> usize {
89        critical_section::with(|cs| self.0.borrow(cs).borrow().len)
90    }
91}
92
93impl<T> crate::Inner<T> {
94    #[cfg(all(feature = "std", not(feature = "critical-section")))]
95    fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
96        struct ListLock<'a, 'b, T> {
97            lock: crate::sync::MutexGuard<'a, Inner<T>>,
98            inner: &'b crate::Inner<T>,
99        }
100
101        impl<T> Deref for ListLock<'_, '_, T> {
102            type Target = Inner<T>;
103
104            fn deref(&self) -> &Self::Target {
105                &self.lock
106            }
107        }
108
109        impl<T> DerefMut for ListLock<'_, '_, T> {
110            fn deref_mut(&mut self) -> &mut Self::Target {
111                &mut self.lock
112            }
113        }
114
115        impl<T> Drop for ListLock<'_, '_, T> {
116            fn drop(&mut self) {
117                update_notified(&self.inner.notified, &self.lock);
118            }
119        }
120
121        let mut list = ListLock {
122            inner: self,
123            lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
124        };
125        f(&mut list)
126    }
127
128    #[cfg(feature = "critical-section")]
129    fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
130        struct ListWrapper<'a, T> {
131            inner: &'a crate::Inner<T>,
132            list: &'a mut Inner<T>,
133        }
134
135        impl<T> Drop for ListWrapper<'_, T> {
136            fn drop(&mut self) {
137                update_notified(&self.inner.notified, self.list);
138            }
139        }
140
141        critical_section::with(move |cs| {
142            let mut list = self.list.0.borrow_ref_mut(cs);
143            let wrapper = ListWrapper {
144                inner: self,
145                list: &mut *list,
146            };
147
148            f(wrapper.list)
149        })
150    }
151
152    /// Add a new listener to the list.
153    pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
154        self.with_inner(|inner| {
155            listener.as_mut().set(Some(Listener {
156                link: UnsafeCell::new(Link {
157                    state: Cell::new(State::Created),
158                    prev: Cell::new(inner.tail),
159                    next: Cell::new(None),
160                }),
161                _pin: PhantomPinned,
162            }));
163            let listener = listener.as_pin_mut().unwrap();
164
165            {
166                let entry_guard = listener.link.get();
167                // SAFETY: We are locked, so we can access the inner `link`.
168                let entry = unsafe { entry_guard.deref() };
169
170                // Replace the tail with the new entry.
171                match mem::replace(&mut inner.tail, Some(entry.into())) {
172                    None => inner.head = Some(entry.into()),
173                    Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
174                };
175            }
176
177            // If there are no unnotified entries, this is the first one.
178            if inner.next.is_none() {
179                inner.next = inner.tail;
180            }
181
182            // Bump the entry count.
183            inner.len += 1;
184        });
185    }
186
187    /// Remove a listener from the list.
188    pub(crate) fn remove(
189        &self,
190        listener: Pin<&mut Option<Listener<T>>>,
191        propagate: bool,
192    ) -> Option<State<T>> {
193        self.with_inner(|inner| inner.remove(listener, propagate))
194    }
195
196    /// Notifies a number of entries.
197    #[cold]
198    pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
199        self.with_inner(|inner| inner.notify(notify))
200    }
201
202    /// Register a task to be notified when the event is triggered.
203    ///
204    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
205    /// isn't inserted, returns `None`.
206    pub(crate) fn register(
207        &self,
208        mut listener: Pin<&mut Option<Listener<T>>>,
209        task: TaskRef<'_>,
210    ) -> RegisterResult<T> {
211        self.with_inner(|inner| {
212            let entry_guard = match listener.as_mut().as_pin_mut() {
213                Some(listener) => listener.link.get(),
214                None => return RegisterResult::NeverInserted,
215            };
216            // SAFETY: We are locked, so we can access the inner `link`.
217            let entry = unsafe { entry_guard.deref() };
218
219            // Take out the state and check it.
220            match entry.state.replace(State::NotifiedTaken) {
221                State::Notified { tag, .. } => {
222                    // We have been notified, remove the listener.
223                    inner.remove(listener, false);
224                    RegisterResult::Notified(tag)
225                }
226
227                State::Task(other_task) => {
228                    // Only replace the task if it's different.
229                    entry.state.set(State::Task({
230                        if !task.will_wake(other_task.as_task_ref()) {
231                            task.into_task()
232                        } else {
233                            other_task
234                        }
235                    }));
236
237                    RegisterResult::Registered
238                }
239
240                _ => {
241                    // We have not been notified, register the task.
242                    entry.state.set(State::Task(task.into_task()));
243                    RegisterResult::Registered
244                }
245            }
246        })
247    }
248}
249
250impl<T> Inner<T> {
251    fn remove(
252        &mut self,
253        mut listener: Pin<&mut Option<Listener<T>>>,
254        propagate: bool,
255    ) -> Option<State<T>> {
256        let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
257        let entry = unsafe { entry_guard.deref() };
258
259        let prev = entry.prev.get();
260        let next = entry.next.get();
261
262        // Unlink from the previous entry.
263        match prev {
264            None => self.head = next,
265            Some(p) => unsafe {
266                p.as_ref().next.set(next);
267            },
268        }
269
270        // Unlink from the next entry.
271        match next {
272            None => self.tail = prev,
273            Some(n) => unsafe {
274                n.as_ref().prev.set(prev);
275            },
276        }
277
278        // If this was the first unnotified entry, update the next pointer.
279        if self.next == Some(entry.into()) {
280            self.next = next;
281        }
282
283        // The entry is now fully unlinked, so we can now take it out safely.
284        let entry = unsafe {
285            listener
286                .get_unchecked_mut()
287                .take()
288                .unwrap()
289                .link
290                .into_inner()
291        };
292
293        // This State::Created is immediately dropped and exists as a workaround for the absence of
294        // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
295        //
296        // refs: https://github.com/tokio-rs/loom/pull/341
297        let mut state = entry.state.replace(State::Created);
298
299        // Update the notified count.
300        if state.is_notified() {
301            self.notified -= 1;
302
303            if propagate {
304                let state = mem::replace(&mut state, State::NotifiedTaken);
305                if let State::Notified { additional, tag } = state {
306                    let tags = {
307                        let mut tag = Some(tag);
308                        move || tag.take().expect("tag already taken")
309                    };
310                    self.notify(GenericNotify::new(1, additional, tags));
311                }
312            }
313        }
314        self.len -= 1;
315
316        Some(state)
317    }
318
319    #[cold]
320    fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
321        let mut n = notify.count(Internal::new());
322        let is_additional = notify.is_additional(Internal::new());
323
324        if !is_additional {
325            if n < self.notified {
326                return 0;
327            }
328            n -= self.notified;
329        }
330
331        let original_count = n;
332        while n > 0 {
333            n -= 1;
334
335            // Notify the next entry.
336            match self.next {
337                None => return original_count - n - 1,
338
339                Some(e) => {
340                    // Get the entry and move the pointer forwards.
341                    let entry = unsafe { e.as_ref() };
342                    self.next = entry.next.get();
343
344                    // Set the state to `Notified` and notify.
345                    let tag = notify.next_tag(Internal::new());
346                    if let State::Task(task) = entry.state.replace(State::Notified {
347                        additional: is_additional,
348                        tag,
349                    }) {
350                        task.wake();
351                    }
352
353                    // Bump the notified count.
354                    self.notified += 1;
355                }
356            }
357        }
358
359        original_count - n
360    }
361}
362
363fn update_notified<T>(slot: &crate::sync::atomic::AtomicUsize, list: &Inner<T>) {
364    // Update the notified count.
365    let notified = if list.notified < list.len {
366        list.notified
367    } else {
368        usize::MAX
369    };
370
371    slot.store(notified, Ordering::Release);
372}
373
374pub(crate) struct Listener<T> {
375    /// The inner link in the linked list.
376    ///
377    /// # Safety
378    ///
379    /// This can only be accessed while the central mutex is locked.
380    link: UnsafeCell<Link<T>>,
381
382    /// This listener cannot be moved after being pinned.
383    _pin: PhantomPinned,
384}
385
386struct Link<T> {
387    /// The current state of the listener.
388    state: Cell<State<T>>,
389
390    /// The previous link in the linked list.
391    prev: Cell<Option<NonNull<Link<T>>>>,
392
393    /// The next link in the linked list.
394    next: Cell<Option<NonNull<Link<T>>>>,
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use futures_lite::pin;
401
402    #[cfg(target_family = "wasm")]
403    use wasm_bindgen_test::wasm_bindgen_test as test;
404
405    macro_rules! make_listeners {
406        ($($id:ident),*) => {
407            $(
408                let $id = Option::<Listener<()>>::None;
409                pin!($id);
410            )*
411        };
412    }
413
414    #[test]
415    fn insert() {
416        let inner = crate::Inner::new();
417        make_listeners!(listen1, listen2, listen3);
418
419        // Register the listeners.
420        inner.insert(listen1.as_mut());
421        inner.insert(listen2.as_mut());
422        inner.insert(listen3.as_mut());
423
424        assert_eq!(inner.list.try_total_listeners(), Some(3));
425
426        // Remove one.
427        assert_eq!(inner.remove(listen2, false), Some(State::Created));
428        assert_eq!(inner.list.try_total_listeners(), Some(2));
429
430        // Remove another.
431        assert_eq!(inner.remove(listen1, false), Some(State::Created));
432        assert_eq!(inner.list.try_total_listeners(), Some(1));
433    }
434
435    #[test]
436    fn drop_non_notified() {
437        let inner = crate::Inner::new();
438        make_listeners!(listen1, listen2, listen3);
439
440        // Register the listeners.
441        inner.insert(listen1.as_mut());
442        inner.insert(listen2.as_mut());
443        inner.insert(listen3.as_mut());
444
445        // Notify one.
446        inner.notify(GenericNotify::new(1, false, || ()));
447
448        // Remove one.
449        inner.remove(listen3, true);
450
451        // Remove the rest.
452        inner.remove(listen1, true);
453        inner.remove(listen2, true);
454    }
455}