//! An atomically managed intrusive linked list of `Arc` nodes

use std::marker;
use std::ops::Deref;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;

pub struct ArcList<T> {
    // The head of the list, encoded as a `usize` rather than a raw pointer:
    // 0 means the list is empty, 1 means it has been sealed off, and any
    // other value is a `*const Node<T>` produced by `Arc::into_raw`.
    list: AtomicUsize,
    _marker: marker::PhantomData<T>,
}

impl<T> ArcList<T> {
    pub fn new() -> ArcList<T> {
        ArcList {
            list: AtomicUsize::new(0),
            _marker: marker::PhantomData,
        }
    }

    /// Pushes the `data` provided onto this list if it's not already enqueued
    /// in this list.
    ///
    /// If `data` is already enqueued in this list then this is a noop;
    /// otherwise `data` is pushed onto the head of the list.
    ///
    /// Returns `Err(())` if the list has been sealed off via
    /// `take_and_seal`.
    pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
        if data.enqueued.swap(true, SeqCst) {
            // Note that even if our list is sealed off, the other end is
            // still guaranteed to see us because we were previously enqueued.
            return Ok(());
        }
        let mut head = self.list.load(SeqCst);
        let node = Arc::into_raw(data.clone()) as usize;
        loop {
            // If we've been sealed off, abort and return an error, first
            // releasing the reference we just leaked via `Arc::into_raw`.
            if head == 1 {
                unsafe {
                    drop(Arc::from_raw(node as *mut Node<T>));
                }
                return Err(());
            }

            // Otherwise attempt to push this node onto the head of the list.
            data.next.store(head, SeqCst);
            match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
                Ok(_) => break Ok(()),
                Err(new_head) => head = new_head,
            }
        }
    }

    /// Atomically empties this list, returning a new owned copy which can be
    /// used to iterate over the entries.
    pub fn take(&self) -> ArcList<T> {
        let mut list = self.list.load(SeqCst);
        loop {
            // If the list has been sealed off (1), leave that sentinel in
            // place; the copy returned below will simply pop as empty.
            if list == 1 {
                break;
            }
            match self.list.compare_exchange(list, 0, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(l) => list = l,
            }
        }
        ArcList {
            list: AtomicUsize::new(list),
            _marker: marker::PhantomData,
        }
    }

    /// Atomically empties this list and prevents further successful calls to
    /// `push`.
    pub fn take_and_seal(&self) -> ArcList<T> {
        ArcList {
            // Swap in the sealed sentinel (1), taking ownership of whatever
            // was previously stored as the head of the list.
            list: AtomicUsize::new(self.list.swap(1, SeqCst)),
            _marker: marker::PhantomData,
        }
    }

    /// Removes the head of the list of nodes, returning `None` if this is an
    /// empty list.
    pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
        // `&mut self` guarantees exclusive access, so unsynchronized
        // `get_mut` reads and writes suffice here.
        let head = *self.list.get_mut();
        if head == 0 || head == 1 {
            return None;
        }
        let head = unsafe { Arc::from_raw(head as *const Node<T>) };
        *self.list.get_mut() = head.next.load(SeqCst);
        // At this point the node is out of the list, so store `false` so we
        // can enqueue it again and see further changes.
        assert!(head.enqueued.swap(false, SeqCst));
        Some(head)
    }
}

impl<T> Drop for ArcList<T> {
    fn drop(&mut self) {
        // Pop every remaining node so that the strong reference taken by
        // `push` is released for each entry.
        while self.pop().is_some() {}
    }
}

pub struct Node<T> {
    next: AtomicUsize,
    enqueued: AtomicBool,
    data: T,
}

impl<T> Node<T> {
    pub fn new(data: T) -> Node<T> {
        Node {
            next: AtomicUsize::new(0),
            enqueued: AtomicBool::new(false),
            data,
        }
    }
}

impl<T> Deref for Node<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.data
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn smoke() {
        let a = ArcList::new();
        let n = Arc::new(Node::new(1));
        assert!(a.push(&n).is_ok());

        let mut l = a.take();
        assert_eq!(**l.pop().unwrap(), 1);
        assert!(l.pop().is_none());
    }
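
    // A couple of extra checks beyond the original suite (the test names
    // here are new, illustrative additions): `push` documents that
    // re-pushing an already-enqueued node is a noop, and nodes are pushed
    // onto the head of the list, so entries pop back out in LIFO order.
    #[test]
    fn push_enqueued_is_noop() {
        let a = ArcList::new();
        let n = Arc::new(Node::new(1));
        assert!(a.push(&n).is_ok());
        // The second push is a noop: `n` is already enqueued.
        assert!(a.push(&n).is_ok());

        let mut l = a.take();
        assert_eq!(**l.pop().unwrap(), 1);
        assert!(l.pop().is_none());
    }

    #[test]
    fn lifo_order() {
        let a = ArcList::new();
        for i in 0..3 {
            assert!(a.push(&Arc::new(Node::new(i))).is_ok());
        }

        let mut l = a.take();
        assert_eq!(**l.pop().unwrap(), 2);
        assert_eq!(**l.pop().unwrap(), 1);
        assert_eq!(**l.pop().unwrap(), 0);
        assert!(l.pop().is_none());
    }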

    #[test]
    fn seal() {
        let a = ArcList::new();
        let n = Arc::new(Node::new(1));
        let mut l = a.take_and_seal();
        assert!(l.pop().is_none());
        assert!(a.push(&n).is_err());

        assert!(a.take().pop().is_none());
        assert!(a.take_and_seal().pop().is_none());
    }
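
    // Further illustrative sketches, not part of the original suite: `pop`
    // resets the `enqueued` flag so nodes can be re-pushed, an
    // already-enqueued node still reports `Ok` after sealing, dropping a
    // list releases the references `push` took, and the CAS loop in `push`
    // keeps concurrent pushes safe.
    #[test]
    fn repush_after_pop() {
        let a = ArcList::new();
        let n = Arc::new(Node::new(1));
        assert!(a.push(&n).is_ok());

        let popped = a.take().pop().unwrap();
        assert!(a.push(&popped).is_ok());
        assert_eq!(**a.take().pop().unwrap(), 1);
    }

    #[test]
    fn push_after_seal_while_enqueued() {
        let a = ArcList::new();
        let n = Arc::new(Node::new(1));
        assert!(a.push(&n).is_ok());

        let mut l = a.take_and_seal();
        // `n` was enqueued before the seal, so push still reports `Ok`: the
        // taken list is guaranteed to contain it.
        assert!(a.push(&n).is_ok());
        assert_eq!(**l.pop().unwrap(), 1);

        // Once popped (and no longer enqueued), pushing onto the sealed
        // list fails.
        assert!(a.push(&n).is_err());
    }

    #[test]
    fn drop_releases_nodes() {
        let n = Arc::new(Node::new(1));
        {
            let a = ArcList::new();
            assert!(a.push(&n).is_ok());
            // The list holds the strong reference taken in `push`.
            assert_eq!(Arc::strong_count(&n), 2);
        }
        // Dropping the list popped (and released) the node.
        assert_eq!(Arc::strong_count(&n), 1);
    }

    #[test]
    fn concurrent_pushes() {
        use std::thread;

        let a = Arc::new(ArcList::new());
        let handles = (0..4)
            .map(|i| {
                let a = a.clone();
                thread::spawn(move || {
                    assert!(a.push(&Arc::new(Node::new(i))).is_ok());
                })
            })
            .collect::<Vec<_>>();
        for handle in handles {
            handle.join().unwrap();
        }

        let mut count = 0;
        let mut l = a.take();
        while l.pop().is_some() {
            count += 1;
        }
        assert_eq!(count, 4);
    }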
}