// futures_timer/native/arc_list.rs

//! An atomically managed intrusive linked list of `Arc` nodes

use std::marker;
use std::ops::Deref;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;

/// An atomically managed, intrusive, singly linked list of `Arc<Node<T>>`.
///
/// The `list` word encodes the head of the chain:
///
/// * `0` means the list is empty,
/// * `1` means the list has been sealed by [`ArcList::take_and_seal`],
/// * any other value is a pointer obtained from `Arc::into_raw`; the list
///   holds one strong reference for every node that is linked in.
pub struct ArcList<T> {
    list: AtomicUsize,
    _marker: marker::PhantomData<T>,
}

impl<T> ArcList<T> {
    /// Head value of a list with no nodes.
    const EMPTY: usize = 0;
    /// Head value of a list that no longer accepts pushes.
    const SEALED: usize = 1;

    /// Creates a new, empty, unsealed list.
    pub fn new() -> ArcList<T> {
        ArcList {
            list: AtomicUsize::new(Self::EMPTY),
            _marker: marker::PhantomData,
        }
    }

    /// Pushes the `data` provided onto this list if it's not already enqueued.
    ///
    /// If `data` is already flagged as enqueued then this is a noop and
    /// `Ok(())` is returned. Otherwise a clone of the `Arc` is linked in as
    /// the new head of the list.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the list has been sealed. In that case the node is
    /// left *not* enqueued, so a later `push` reports the failure again rather
    /// than mistaking the node for one that is already in a list.
    pub fn push(&self, data: &Arc<Node<T>>) -> Result<(), ()> {
        if data.enqueued.swap(true, SeqCst) {
            // Somebody else already linked (or is linking) this node, so
            // whoever drains that list will see it.
            return Ok(());
        }
        let mut head = self.list.load(SeqCst);
        // The list keeps its own strong reference while the node is linked.
        let node = Arc::into_raw(data.clone()) as usize;
        loop {
            // If we've been sealed off, undo our bookkeeping and report it.
            if head == Self::SEALED {
                // SAFETY: `node` was produced by `Arc::into_raw` just above
                // and was never published to the list, so that reference is
                // still ours to release.
                unsafe {
                    drop(Arc::from_raw(node as *const Node<T>));
                }
                // The node never made it into a list; without clearing the
                // flag every later `push` would hit the early return above
                // and claim success.
                data.enqueued.store(false, SeqCst);
                return Err(());
            }

            // Otherwise attempt to install this node in front of `head`.
            data.next.store(head, SeqCst);
            match self.list.compare_exchange(head, node, SeqCst, SeqCst) {
                Ok(_) => break Ok(()),
                Err(current) => head = current,
            }
        }
    }

    /// Atomically empties this list, returning a new owned copy which can be
    /// used to iterate over the entries.
    ///
    /// A sealed list stays sealed; the returned list then yields no nodes.
    pub fn take(&self) -> ArcList<T> {
        let mut list = self.list.load(SeqCst);
        while list != Self::SEALED {
            match self.list.compare_exchange(list, Self::EMPTY, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(current) => list = current,
            }
        }
        ArcList {
            list: AtomicUsize::new(list),
            _marker: marker::PhantomData,
        }
    }

    /// Atomically empties this list and prevents further successful calls to
    /// `push`.
    pub fn take_and_seal(&self) -> ArcList<T> {
        ArcList {
            list: AtomicUsize::new(self.list.swap(Self::SEALED, SeqCst)),
            _marker: marker::PhantomData,
        }
    }

    /// Removes the head of the list of nodes, returning `None` if this is an
    /// empty (or sealed) list.
    ///
    /// # Panics
    ///
    /// Panics if the removed node was not flagged as enqueued.
    pub fn pop(&mut self) -> Option<Arc<Node<T>>> {
        let head = *self.list.get_mut();
        if head == Self::EMPTY || head == Self::SEALED {
            return None;
        }
        // SAFETY: any head value other than the two sentinels was stored by
        // `push` from `Arc::into_raw`, and the list owns that reference.
        let head = unsafe { Arc::from_raw(head as *const Node<T>) };
        *self.list.get_mut() = head.next.load(SeqCst);
        // At this point, the node is out of the list, so store `false` so we
        // can enqueue it again and see further changes.
        assert!(head.enqueued.swap(false, SeqCst));
        Some(head)
    }
}

impl<T> Drop for ArcList<T> {
    /// Pops every remaining node so the references held by the list are
    /// released and the nodes become pushable again.
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}

/// An entry that can be linked into an `ArcList`.
///
/// Besides the user payload it carries the two pieces of bookkeeping the list
/// reads and writes: the encoded link to the following node and a flag
/// recording whether the node is currently enqueued.
pub struct Node<T> {
    // Encoded successor; starts out as `0`.
    next: AtomicUsize,
    // Set while the node is considered to be in a list.
    enqueued: AtomicBool,
    // The wrapped value, exposed through `Deref`.
    data: T,
}

impl<T> Node<T> {
    /// Wraps `data` in a node that is not linked into any list.
    pub fn new(data: T) -> Self {
        let next = AtomicUsize::new(0);
        let enqueued = AtomicBool::new(false);
        Self {
            next,
            enqueued,
            data,
        }
    }
}

impl<T> Deref for Node<T> {
    type Target = T;

    /// Borrows the wrapped value.
    fn deref(&self) -> &Self::Target {
        let Node { data, .. } = self;
        data
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A pushed node comes back out of the list returned by `take`, after
    /// which that list has nothing left.
    #[test]
    fn smoke() {
        let list = ArcList::new();
        let node = Arc::new(Node::new(1));
        assert!(list.push(&node).is_ok());

        let mut drained = list.take();
        let popped = drained.pop().unwrap();
        assert_eq!(**popped, 1);
        assert!(drained.pop().is_none());
    }

    /// Once sealed, the list rejects pushes and every later take is empty.
    #[test]
    fn seal() {
        let list = ArcList::new();
        let mut sealed_off = list.take_and_seal();
        assert!(sealed_off.pop().is_none());

        let node = Arc::new(Node::new(1));
        assert!(list.push(&node).is_err());

        let mut after_take = list.take();
        assert!(after_take.pop().is_none());
        let mut after_reseal = list.take_and_seal();
        assert!(after_reseal.pop().is_none());
    }
}