Skip to main content

timely_communication/allocator/zero_copy/
bytes_exchange.rs

1//! Types and traits for sharing `Bytes`.
2
3use std::sync::{Arc, Mutex};
4use std::collections::VecDeque;
5
6use timely_bytes::arc::Bytes;
7use super::bytes_slab::{BytesRefill, BytesSlab};
8use super::spill::{BytesFetch, SpillPolicy};
9
10/// An entry in a `MergeQueue`. Either `Bytes` resident in memory, or a
11/// handle to bytes previously written out via a `SpillPolicy`.
12pub enum QueueEntry {
13    /// Bytes resident in memory, ready to be consumed directly.
14    Bytes(Bytes),
15    /// Bytes spilled to a backing store, fetched via the handle.
16    Paged(Box<dyn BytesFetch>),
17}
18
19/// A target for `Bytes`.
20pub trait BytesPush {
21    /// Pushes many bytes at the instance.
22    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
23}
24/// A source for `Bytes`.
25pub trait BytesPull {
26    /// Drains many bytes from the instance.
27    fn drain_into(&mut self, vec: &mut Vec<Bytes>);
28}
29
30use std::sync::atomic::{AtomicBool, Ordering};
31/// An unbounded queue of bytes intended for point-to-point communication between threads.
32/// Writer/reader handle pairs are obtained via [`MergeQueue::new_pair`].
33pub struct MergeQueue {
34    queue: Arc<Mutex<VecDeque<QueueEntry>>>,    // queue of entries.
35    buzzer: crate::buzzer::Buzzer,              // awakens receiver thread.
36    panic: Arc<AtomicBool>,                     // used to poison the queue.
37    policy: Option<Box<dyn SpillPolicy>>,       // local policy; extend or drain dispatches it.
38}
39
40impl MergeQueue {
41    /// Allocates a new queue with an associated signal.
42    pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
43        MergeQueue {
44            queue: Arc::new(Mutex::new(VecDeque::new())),
45            buzzer,
46            panic: Arc::new(AtomicBool::new(false)),
47            policy: None,
48        }
49    }
50    /// Allocates a matched pair of handles on the same underlying queue,
51    /// each carrying its own policy. The first (writer) runs its policy
52    /// after each `extend`; the second (reader) runs its policy before
53    /// each `drain_into`.
54    pub fn new_pair(
55        buzzer: crate::buzzer::Buzzer,
56        writer_policy: Option<Box<dyn SpillPolicy>>,
57        reader_policy: Option<Box<dyn SpillPolicy>>,
58    ) -> (MergeQueue, MergeQueue) {
59        let queue = Arc::new(Mutex::new(VecDeque::new()));
60        let panic = Arc::new(AtomicBool::new(false));
61        let writer = MergeQueue {
62            queue: Arc::clone(&queue),
63            buzzer: buzzer.clone(),
64            panic: Arc::clone(&panic),
65            policy: writer_policy,
66        };
67        let reader = MergeQueue {
68            queue,
69            buzzer,
70            panic,
71            policy: reader_policy,
72        };
73        (writer, reader)
74    }
75    /// Indicates that all input handles to the queue have dropped.
76    pub fn is_complete(&self) -> bool {
77        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
78        Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
79    }
80}
81
82impl BytesPush for MergeQueue {
83    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
84
85        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
86
87        // try to acquire lock without going to sleep (Rust's lock() might yield)
88        let mut lock_ok = self.queue.try_lock();
89        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
90            lock_ok = self.queue.try_lock();
91        }
92        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
93
94        let mut iterator = iterator.into_iter();
95        let mut should_ping = false;
96        if let Some(bytes) = iterator.next() {
97            let mut tail = match queue.pop_back() {
98                Some(QueueEntry::Bytes(mut tail)) => {
99                    if let Err(bytes) = tail.try_merge(bytes) {
100                        queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, bytes)));
101                    }
102                    tail
103                }
104                Some(paged @ QueueEntry::Paged(_)) => {
105                    queue.push_back(paged);
106                    bytes
107                }
108                None => {
109                    should_ping = true;
110                    bytes
111                }
112            };
113
114            for more_bytes in iterator {
115                if let Err(more_bytes) = tail.try_merge(more_bytes) {
116                    queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, more_bytes)));
117                }
118            }
119            queue.push_back(QueueEntry::Bytes(tail));
120        }
121
122        // Dispatch the spill policy, if any, while the lock is still held.
123        if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); }
124
125        // Wakeup corresponding thread *after* releasing the lock
126        ::std::mem::drop(queue);
127        if should_ping {
128            self.buzzer.buzz();  // only signal from empty to non-empty.
129        }
130    }
131}
132
133impl BytesPull for MergeQueue {
134    fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
135        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
136
137        // try to acquire lock without going to sleep (Rust's lock() might yield)
138        let mut lock_ok = self.queue.try_lock();
139        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
140            lock_ok = self.queue.try_lock();
141        }
142        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
143
144        // If a reader-side policy is installed, let it materialize Paged
145        // entries near the front of the queue (up to its own budget).
146        if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); }
147
148        // Drain Bytes entries from the front. Stop at the first Paged entry
149        // (which the policy chose not to materialize) or the empty queue.
150        while let Some(QueueEntry::Bytes(_)) = queue.front() {
151            if let Some(QueueEntry::Bytes(b)) = queue.pop_front() {
152                vec.push(b);
153            }
154        }
155
156        // If we produced nothing but the queue isn't empty, something is
157        // stuck (failed fetch, no reader policy, budget exhausted). Buzz
158        // to ensure the consumer retries rather than parking.
159        if vec.is_empty() && !queue.is_empty() {
160            self.buzzer.buzz();
161        }
162    }
163}
164
165// We want to ping in the drop because a channel closing can unblock a thread waiting on
166// the next bit of data to show up.
167impl Drop for MergeQueue {
168    fn drop(&mut self) {
169        // Propagate panic information, to distinguish between clean and unclean shutdown.
170        if ::std::thread::panicking() {
171            self.panic.store(true, Ordering::SeqCst);
172        }
173        else {
174            // TODO: Perhaps this aggressive ordering can relax orderings elsewhere.
175            if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
176        }
177        // Drop the queue before pinging.
178        self.queue = Arc::new(Mutex::new(VecDeque::new()));
179        self.policy = None;
180        self.buzzer.buzz();
181    }
182}
183
184
185/// A `BytesPush` wrapper which stages writes.
186pub struct SendEndpoint<P: BytesPush> {
187    send: P,
188    buffer: BytesSlab,
189}
190
191impl<P: BytesPush> SendEndpoint<P> {
192
193    /// Moves `self.buffer` into `self.send`, replaces with empty buffer.
194    fn send_buffer(&mut self) {
195        let valid_len = self.buffer.valid().len();
196        if valid_len > 0 {
197            self.send.extend(Some(self.buffer.extract(valid_len)));
198        }
199    }
200
201    /// Allocates a new `BytesSendEndpoint` from a shared queue.
202    pub fn new(queue: P, refill: BytesRefill) -> Self {
203        SendEndpoint {
204            send: queue,
205            buffer: BytesSlab::new(20, refill),
206        }
207    }
208    /// Makes the next `bytes` bytes valid.
209    ///
210    /// The current implementation also sends the bytes, to ensure early visibility.
211    pub fn make_valid(&mut self, bytes: usize) {
212        self.buffer.make_valid(bytes);
213        self.send_buffer();
214    }
215    /// Acquires a prefix of `self.empty()` of length at least `capacity`.
216    pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {
217
218        if self.buffer.empty().len() < capacity {
219            self.send_buffer();
220            self.buffer.ensure_capacity(capacity);
221        }
222
223        assert!(self.buffer.empty().len() >= capacity);
224        self.buffer.empty()
225    }
226    /// Marks all written data as valid, makes visible.
227    pub fn publish(&mut self) {
228        self.send_buffer();
229    }
230}
231
232impl<P: BytesPush> Drop for SendEndpoint<P> {
233    fn drop(&mut self) {
234        self.send_buffer();
235    }
236}