// futures_timer/native/arc_list.rs
//! An atomically managed intrusive linked list of `Arc` nodes
use std::marker;
use std::ops::Deref;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
/// An atomically updated, intrusive, singly linked list of `Arc<Node<T>>`.
///
/// The list does not allocate: every entry is a `Node<T>` that carries its
/// own `next` link, and the list holds one strong `Arc` reference per linked
/// node in raw-pointer form.
pub struct ArcList<T> {
    /// Encoded head of the list.
    ///
    /// * `0` - the list is empty.
    /// * `1` - the list has been sealed; `push` fails from then on.
    /// * anything else - the address obtained from `Arc::into_raw` for the
    ///   node currently at the head.
    list: AtomicUsize,

    /// Ties the otherwise unused type parameter `T` to this struct.
    _marker: marker::PhantomData<T>,
}
impl<T> ArcList<T> {
    /// Head value of a list with no nodes in it.
    const EMPTY: usize = 0;
    /// Head value of a list that has been sealed by `take_and_seal`.
    const SEALED: usize = 1;

    /// Creates a new, empty, unsealed list.
    pub fn new() -> ArcList<T> {
        ArcList {
            list: AtomicUsize::new(Self::EMPTY),
            _marker: marker::PhantomData,
        }
    }

    /// Pushes the `data` provided onto this list if it's not already enqueued.
    ///
    /// If `data` is already enqueued then this is a noop and `Ok(())` is
    /// returned. Otherwise a clone of the `Arc` is linked in as the new head
    /// of the list, so nodes are later popped in LIFO order.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the list has been sealed with `take_and_seal`.
    /// In that case the node is left untouched and marked as *not* enqueued,
    /// so a later `push` of the same node reports the failure again instead
    /// of pretending the node is already queued.
    pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
        if data.enqueued.swap(true, SeqCst) {
            // note that even if our list is sealed off then the other end is
            // still guaranteed to see us because we were previously enqueued.
            return Ok(());
        }
        let mut head = self.list.load(SeqCst);
        // The list owns one strong reference per linked node, stored as the
        // raw pointer value.
        let node = Arc::into_raw(data.clone()) as usize;
        loop {
            // If we've been sealed off, abort and return an error
            if head == Self::SEALED {
                // SAFETY: `node` was produced by `Arc::into_raw` just above
                // and has not been published to the list, so this is the only
                // place that reclaims that strong reference.
                unsafe {
                    drop(Arc::from_raw(node as *const Node<T>));
                }
                // The node never made it into the list. Undo the flag we set
                // at the top, otherwise every later `push` would take the
                // "already enqueued" shortcut and wrongly return `Ok(())`.
                data.enqueued.store(false, SeqCst);
                return Err(());
            }
            // Otherwise attempt to push this node: point it at the current
            // head, then try to swing the head over to it.
            data.next.store(head, SeqCst);
            match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
                Ok(_) => break Ok(()),
                // Somebody else changed the head in the meantime; retry
                // against the value they installed.
                Err(new_head) => head = new_head,
            }
        }
    }

    /// Atomically empties this list, returning a new owned copy which can be
    /// used to iterate over the entries.
    ///
    /// A sealed list stays sealed; the returned list then carries the sealed
    /// marker and `pop` on it yields `None`.
    pub fn take(&self) -> ArcList<T> {
        let mut list = self.list.load(SeqCst);
        loop {
            // Never overwrite the sealed marker, that would un-seal the list.
            if list == Self::SEALED {
                break;
            }
            match self.list.compare_exchange(list, Self::EMPTY, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(l) => list = l,
            }
        }
        ArcList {
            list: AtomicUsize::new(list),
            _marker: marker::PhantomData,
        }
    }

    /// Atomically empties this list and prevents further successful calls to
    /// `push`.
    ///
    /// The nodes that were linked at the time of the call are returned as a
    /// new owned list.
    pub fn take_and_seal(&self) -> ArcList<T> {
        ArcList {
            list: AtomicUsize::new(self.list.swap(Self::SEALED, SeqCst)),
            _marker: marker::PhantomData,
        }
    }

    /// Removes the head of the list of nodes, returning `None` if this is an
    /// empty (or sealed) list.
    ///
    /// # Panics
    ///
    /// Panics if the removed node was not flagged as enqueued, which would
    /// mean the list's bookkeeping has been corrupted.
    pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
        let head = *self.list.get_mut();
        if head == Self::EMPTY || head == Self::SEALED {
            return None;
        }
        // SAFETY: within this module the only non-sentinel values written to
        // `list` or to a node's `next` are addresses returned by
        // `Arc::into_raw` in `push`. `&mut self` gives exclusive access, and
        // the node is unlinked on the next line, so the strong reference is
        // reclaimed exactly once.
        let head = unsafe { Arc::from_raw(head as *const Node<T>) };
        *self.list.get_mut() = head.next.load(SeqCst);
        // At this point, the node is out of the list, so store `false` so we
        // can enqueue it again and see further changes.
        assert!(head.enqueued.swap(false, SeqCst));
        Some(head)
    }
}
impl<T> Drop for ArcList<T> {
    /// Unlinks every node still in the list so the strong references held by
    /// the list are released rather than leaked.
    fn drop(&mut self) {
        // Each `pop` hands back one owned `Arc`, which is dropped right away;
        // keep going until the list reports that nothing is left.
        while self.pop().is_some() {}
    }
}
/// A list entry: the user's payload plus the bookkeeping `ArcList` needs to
/// link it in without a separate allocation.
pub struct Node<T> {
    /// Encoded link to the node that follows this one, using the same
    /// encoding as the list head (`0` when nothing follows).
    next: AtomicUsize,

    /// `true` from the moment `ArcList::push` claims the node until
    /// `ArcList::pop` hands it back out.
    enqueued: AtomicBool,

    /// The wrapped payload, reachable through `Deref`.
    data: T,
}
impl<T> Node<T> {
pub fn new(data: T) -> Node<T> {
Node {
next: AtomicUsize::new(0),
enqueued: AtomicBool::new(false),
data,
}
}
}
impl<T> Deref for Node<T> {
type Target = T;
fn deref(&self) -> &T {
&self.data
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // A pushed node comes back out of the list returned by `take`, and the
    // taken list is empty afterwards.
    #[test]
    fn smoke() {
        let list = ArcList::new();
        let node = Arc::new(Node::new(1));
        assert!(list.push(&node).is_ok());

        let mut taken = list.take();
        let popped = taken.pop().unwrap();
        assert_eq!(**popped, 1);
        assert!(taken.pop().is_none());
    }

    // Once sealed, the list rejects pushes and keeps reporting itself empty.
    #[test]
    fn seal() {
        let list = ArcList::new();
        let node = Arc::new(Node::new(1));

        let mut drained = list.take_and_seal();
        assert!(drained.pop().is_none());

        assert!(list.push(&node).is_err());
        assert!(list.take().pop().is_none());
        assert!(list.take_and_seal().pop().is_none());
    }
}