futures_timer/native/
arc_list.rs

1//! An atomically managed intrusive linked list of `Arc` nodes
2
3use std::marker;
4use std::ops::Deref;
5use std::sync::atomic::Ordering::SeqCst;
6use std::sync::atomic::{AtomicBool, AtomicUsize};
7use std::sync::Arc;
8
9pub struct ArcList<T> {
10    list: AtomicUsize,
11    _marker: marker::PhantomData<T>,
12}
13
14impl<T> ArcList<T> {
15    pub fn new() -> ArcList<T> {
16        ArcList {
17            list: AtomicUsize::new(0),
18            _marker: marker::PhantomData,
19        }
20    }
21
22    /// Pushes the `data` provided onto this list if it's not already enqueued
23    /// in this list.
24    ///
25    /// If `data` is already enqueued in this list then this is a noop,
26    /// otherwise, the `data` here is pushed on the end of the list.
27    pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
28        if data.enqueued.swap(true, SeqCst) {
29            // note that even if our list is sealed off then the other end is
30            // still guaranteed to see us because we were previously enqueued.
31            return Ok(());
32        }
33        let mut head = self.list.load(SeqCst);
34        let node = Arc::into_raw(data.clone()) as usize;
35        loop {
36            // If we've been sealed off, abort and return an error
37            if head == 1 {
38                unsafe {
39                    drop(Arc::from_raw(node as *mut Node<T>));
40                }
41                return Err(());
42            }
43
44            // Otherwise attempt to push this node
45            data.next.store(head, SeqCst);
46            match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
47                Ok(_) => break Ok(()),
48                Err(new_head) => head = new_head,
49            }
50        }
51    }
52
53    /// Atomically empties this list, returning a new owned copy which can be
54    /// used to iterate over the entries.
55    pub fn take(&self) -> ArcList<T> {
56        let mut list = self.list.load(SeqCst);
57        loop {
58            if list == 1 {
59                break;
60            }
61            match self.list.compare_exchange(list, 0, SeqCst, SeqCst) {
62                Ok(_) => break,
63                Err(l) => list = l,
64            }
65        }
66        ArcList {
67            list: AtomicUsize::new(list),
68            _marker: marker::PhantomData,
69        }
70    }
71
72    /// Atomically empties this list and prevents further successful calls to
73    /// `push`.
74    pub fn take_and_seal(&self) -> ArcList<T> {
75        ArcList {
76            list: AtomicUsize::new(self.list.swap(1, SeqCst)),
77            _marker: marker::PhantomData,
78        }
79    }
80
81    /// Removes the head of the list of nodes, returning `None` if this is an
82    /// empty list.
83    pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
84        let head = *self.list.get_mut();
85        if head == 0 || head == 1 {
86            return None;
87        }
88        let head = unsafe { Arc::from_raw(head as *const Node<T>) };
89        *self.list.get_mut() = head.next.load(SeqCst);
90        // At this point, the node is out of the list, so store `false` so we
91        // can enqueue it again and see further changes.
92        assert!(head.enqueued.swap(false, SeqCst));
93        Some(head)
94    }
95}
96
97impl<T> Drop for ArcList<T> {
98    fn drop(&mut self) {
99        while let Some(_) = self.pop() {
100            // ...
101        }
102    }
103}
104
105pub struct Node<T> {
106    next: AtomicUsize,
107    enqueued: AtomicBool,
108    data: T,
109}
110
111impl<T> Node<T> {
112    pub fn new(data: T) -> Node<T> {
113        Node {
114            next: AtomicUsize::new(0),
115            enqueued: AtomicBool::new(false),
116            data,
117        }
118    }
119}
120
121impl<T> Deref for Node<T> {
122    type Target = T;
123
124    fn deref(&self) -> &T {
125        &self.data
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132
133    #[test]
134    fn smoke() {
135        let a = ArcList::new();
136        let n = Arc::new(Node::new(1));
137        assert!(a.push(&n).is_ok());
138
139        let mut l = a.take();
140        assert_eq!(**l.pop().unwrap(), 1);
141        assert!(l.pop().is_none());
142    }
143
144    #[test]
145    fn seal() {
146        let a = ArcList::new();
147        let n = Arc::new(Node::new(1));
148        let mut l = a.take_and_seal();
149        assert!(l.pop().is_none());
150        assert!(a.push(&n).is_err());
151
152        assert!(a.take().pop().is_none());
153        assert!(a.take_and_seal().pop().is_none());
154    }
155}