1use crate::notify::{GenericNotify, Internal, Notification};
7use crate::sync::atomic::Ordering;
8use crate::sync::cell::{Cell, UnsafeCell};
9use crate::{RegisterResult, State, TaskRef};
10
11#[cfg(feature = "critical-section")]
12use core::cell::RefCell;
13#[cfg(all(feature = "std", not(feature = "critical-section")))]
14use core::ops::{Deref, DerefMut};
15
16use core::marker::PhantomPinned;
17use core::mem;
18use core::pin::Pin;
19use core::ptr::NonNull;
20
21pub(super) struct List<T>(
22 #[cfg(all(feature = "std", not(feature = "critical-section")))]
24 crate::sync::Mutex<Inner<T>>,
25 #[cfg(feature = "critical-section")]
27 critical_section::Mutex<RefCell<Inner<T>>>,
28);
29
30struct Inner<T> {
31 head: Option<NonNull<Link<T>>>,
33
34 tail: Option<NonNull<Link<T>>>,
36
37 next: Option<NonNull<Link<T>>>,
39
40 len: usize,
42
43 notified: usize,
45}
46
47impl<T> List<T> {
48 pub(super) fn new() -> Self {
50 let inner = Inner {
51 head: None,
52 tail: None,
53 next: None,
54 len: 0,
55 notified: 0,
56 };
57
58 #[cfg(feature = "critical-section")]
59 {
60 Self(critical_section::Mutex::new(RefCell::new(inner)))
61 }
62
63 #[cfg(not(feature = "critical-section"))]
64 Self(crate::sync::Mutex::new(inner))
65 }
66
67 #[cfg(all(feature = "std", not(feature = "critical-section")))]
69 pub(crate) fn try_total_listeners(&self) -> Option<usize> {
70 self.0.try_lock().ok().map(|list| list.len)
71 }
72
73 #[cfg(feature = "critical-section")]
75 pub(crate) fn try_total_listeners(&self) -> Option<usize> {
76 Some(self.total_listeners())
77 }
78
79 #[cfg(all(feature = "std", not(feature = "critical-section")))]
81 pub(crate) fn total_listeners(&self) -> usize {
82 self.0.lock().unwrap_or_else(|e| e.into_inner()).len
83 }
84
85 #[cfg(feature = "critical-section")]
87 #[allow(unused)]
88 pub(crate) fn total_listeners(&self) -> usize {
89 critical_section::with(|cs| self.0.borrow(cs).borrow().len)
90 }
91}
92
93impl<T> crate::Inner<T> {
94 #[cfg(all(feature = "std", not(feature = "critical-section")))]
95 fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
96 struct ListLock<'a, 'b, T> {
97 lock: crate::sync::MutexGuard<'a, Inner<T>>,
98 inner: &'b crate::Inner<T>,
99 }
100
101 impl<T> Deref for ListLock<'_, '_, T> {
102 type Target = Inner<T>;
103
104 fn deref(&self) -> &Self::Target {
105 &self.lock
106 }
107 }
108
109 impl<T> DerefMut for ListLock<'_, '_, T> {
110 fn deref_mut(&mut self) -> &mut Self::Target {
111 &mut self.lock
112 }
113 }
114
115 impl<T> Drop for ListLock<'_, '_, T> {
116 fn drop(&mut self) {
117 update_notified(&self.inner.notified, &self.lock);
118 }
119 }
120
121 let mut list = ListLock {
122 inner: self,
123 lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
124 };
125 f(&mut list)
126 }
127
128 #[cfg(feature = "critical-section")]
129 fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
130 struct ListWrapper<'a, T> {
131 inner: &'a crate::Inner<T>,
132 list: &'a mut Inner<T>,
133 }
134
135 impl<T> Drop for ListWrapper<'_, T> {
136 fn drop(&mut self) {
137 update_notified(&self.inner.notified, self.list);
138 }
139 }
140
141 critical_section::with(move |cs| {
142 let mut list = self.list.0.borrow_ref_mut(cs);
143 let wrapper = ListWrapper {
144 inner: self,
145 list: &mut *list,
146 };
147
148 f(wrapper.list)
149 })
150 }
151
152 pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
154 self.with_inner(|inner| {
155 listener.as_mut().set(Some(Listener {
156 link: UnsafeCell::new(Link {
157 state: Cell::new(State::Created),
158 prev: Cell::new(inner.tail),
159 next: Cell::new(None),
160 }),
161 _pin: PhantomPinned,
162 }));
163 let listener = listener.as_pin_mut().unwrap();
164
165 {
166 let entry_guard = listener.link.get();
167 let entry = unsafe { entry_guard.deref() };
169
170 match mem::replace(&mut inner.tail, Some(entry.into())) {
172 None => inner.head = Some(entry.into()),
173 Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
174 };
175 }
176
177 if inner.next.is_none() {
179 inner.next = inner.tail;
180 }
181
182 inner.len += 1;
184 });
185 }
186
187 pub(crate) fn remove(
189 &self,
190 listener: Pin<&mut Option<Listener<T>>>,
191 propagate: bool,
192 ) -> Option<State<T>> {
193 self.with_inner(|inner| inner.remove(listener, propagate))
194 }
195
196 #[cold]
198 pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
199 self.with_inner(|inner| inner.notify(notify))
200 }
201
202 pub(crate) fn register(
207 &self,
208 mut listener: Pin<&mut Option<Listener<T>>>,
209 task: TaskRef<'_>,
210 ) -> RegisterResult<T> {
211 self.with_inner(|inner| {
212 let entry_guard = match listener.as_mut().as_pin_mut() {
213 Some(listener) => listener.link.get(),
214 None => return RegisterResult::NeverInserted,
215 };
216 let entry = unsafe { entry_guard.deref() };
218
219 match entry.state.replace(State::NotifiedTaken) {
221 State::Notified { tag, .. } => {
222 inner.remove(listener, false);
224 RegisterResult::Notified(tag)
225 }
226
227 State::Task(other_task) => {
228 entry.state.set(State::Task({
230 if !task.will_wake(other_task.as_task_ref()) {
231 task.into_task()
232 } else {
233 other_task
234 }
235 }));
236
237 RegisterResult::Registered
238 }
239
240 _ => {
241 entry.state.set(State::Task(task.into_task()));
243 RegisterResult::Registered
244 }
245 }
246 })
247 }
248}
249
250impl<T> Inner<T> {
251 fn remove(
252 &mut self,
253 mut listener: Pin<&mut Option<Listener<T>>>,
254 propagate: bool,
255 ) -> Option<State<T>> {
256 let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
257 let entry = unsafe { entry_guard.deref() };
258
259 let prev = entry.prev.get();
260 let next = entry.next.get();
261
262 match prev {
264 None => self.head = next,
265 Some(p) => unsafe {
266 p.as_ref().next.set(next);
267 },
268 }
269
270 match next {
272 None => self.tail = prev,
273 Some(n) => unsafe {
274 n.as_ref().prev.set(prev);
275 },
276 }
277
278 if self.next == Some(entry.into()) {
280 self.next = next;
281 }
282
283 let entry = unsafe {
285 listener
286 .get_unchecked_mut()
287 .take()
288 .unwrap()
289 .link
290 .into_inner()
291 };
292
293 let mut state = entry.state.replace(State::Created);
298
299 if state.is_notified() {
301 self.notified -= 1;
302
303 if propagate {
304 let state = mem::replace(&mut state, State::NotifiedTaken);
305 if let State::Notified { additional, tag } = state {
306 let tags = {
307 let mut tag = Some(tag);
308 move || tag.take().expect("tag already taken")
309 };
310 self.notify(GenericNotify::new(1, additional, tags));
311 }
312 }
313 }
314 self.len -= 1;
315
316 Some(state)
317 }
318
319 #[cold]
320 fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
321 let mut n = notify.count(Internal::new());
322 let is_additional = notify.is_additional(Internal::new());
323
324 if !is_additional {
325 if n < self.notified {
326 return 0;
327 }
328 n -= self.notified;
329 }
330
331 let original_count = n;
332 while n > 0 {
333 n -= 1;
334
335 match self.next {
337 None => return original_count - n - 1,
338
339 Some(e) => {
340 let entry = unsafe { e.as_ref() };
342 self.next = entry.next.get();
343
344 let tag = notify.next_tag(Internal::new());
346 if let State::Task(task) = entry.state.replace(State::Notified {
347 additional: is_additional,
348 tag,
349 }) {
350 task.wake();
351 }
352
353 self.notified += 1;
355 }
356 }
357 }
358
359 original_count - n
360 }
361}
362
363fn update_notified<T>(slot: &crate::sync::atomic::AtomicUsize, list: &Inner<T>) {
364 let notified = if list.notified < list.len {
366 list.notified
367 } else {
368 usize::MAX
369 };
370
371 slot.store(notified, Ordering::Release);
372}
373
374pub(crate) struct Listener<T> {
375 link: UnsafeCell<Link<T>>,
381
382 _pin: PhantomPinned,
384}
385
386struct Link<T> {
387 state: Cell<State<T>>,
389
390 prev: Cell<Option<NonNull<Link<T>>>>,
392
393 next: Cell<Option<NonNull<Link<T>>>>,
395}
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400 use futures_lite::pin;
401
402 #[cfg(target_family = "wasm")]
403 use wasm_bindgen_test::wasm_bindgen_test as test;
404
405 macro_rules! make_listeners {
406 ($($id:ident),*) => {
407 $(
408 let $id = Option::<Listener<()>>::None;
409 pin!($id);
410 )*
411 };
412 }
413
414 #[test]
415 fn insert() {
416 let inner = crate::Inner::new();
417 make_listeners!(listen1, listen2, listen3);
418
419 inner.insert(listen1.as_mut());
421 inner.insert(listen2.as_mut());
422 inner.insert(listen3.as_mut());
423
424 assert_eq!(inner.list.try_total_listeners(), Some(3));
425
426 assert_eq!(inner.remove(listen2, false), Some(State::Created));
428 assert_eq!(inner.list.try_total_listeners(), Some(2));
429
430 assert_eq!(inner.remove(listen1, false), Some(State::Created));
432 assert_eq!(inner.list.try_total_listeners(), Some(1));
433 }
434
435 #[test]
436 fn drop_non_notified() {
437 let inner = crate::Inner::new();
438 make_listeners!(listen1, listen2, listen3);
439
440 inner.insert(listen1.as_mut());
442 inner.insert(listen2.as_mut());
443 inner.insert(listen3.as_mut());
444
445 inner.notify(GenericNotify::new(1, false, || ()));
447
448 inner.remove(listen3, true);
450
451 inner.remove(listen1, true);
453 inner.remove(listen2, true);
454 }
455}