timely_communication/allocator/zero_copy/
bytes_exchange.rs

1//! Types and traits for sharing `Bytes`.
2
3use std::sync::{Arc, Mutex};
4use std::collections::VecDeque;
5
6use timely_bytes::arc::Bytes;
7use super::bytes_slab::{BytesRefill, BytesSlab};
8
9/// A target for `Bytes`.
pub trait BytesPush {
    /// Pushes many bytes at the instance.
    ///
    /// Implementations may coalesce adjacent `Bytes` (see `MergeQueue`), so the
    /// number of elements observed by the consumer may be smaller than pushed.
    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
}
16/// A source for `Bytes`.
pub trait BytesPull {
    /// Drains many bytes from the instance, appending them to `vec`.
    ///
    /// Existing contents of `vec` are preserved; drained elements are appended.
    fn drain_into(&mut self, vec: &mut Vec<Bytes>);
}
23
24use std::sync::atomic::{AtomicBool, Ordering};
/// An unbounded queue of bytes intended for point-to-point communication
/// between threads. Cloning returns another handle to the same queue.
///
/// Pushed `Bytes` are merged into the rearmost queue element when the two are
/// contiguous (see the `BytesPush::extend` implementation), and the buzzer is
/// sounded only on an empty-to-non-empty transition, so the receiver is woken
/// at most once per batch of pushes.
#[derive(Clone)]
pub struct MergeQueue {
    queue: Arc<Mutex<VecDeque<Bytes>>>, // queue of bytes; strong_count == 1 means no other handles remain.
    buzzer: crate::buzzer::Buzzer,  // awakens receiver thread.
    panic: Arc<AtomicBool>,         // set on unclean drop to poison all other handles.
}
35
36impl MergeQueue {
37    /// Allocates a new queue with an associated signal.
38    pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
39        MergeQueue {
40            queue: Arc::new(Mutex::new(VecDeque::new())),
41            buzzer,
42            panic: Arc::new(AtomicBool::new(false)),
43        }
44    }
45    /// Indicates that all input handles to the queue have dropped.
46    pub fn is_complete(&self) -> bool {
47        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
48        Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
49    }
50}
51
impl BytesPush for MergeQueue {
    /// Appends all `Bytes` from `iterator`, merging contiguous regions where possible.
    ///
    /// Wakes the receiving thread (via the buzzer) only when the queue transitions
    /// from empty to non-empty, and only after the lock has been released.
    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {

        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }

        // try to acquire lock without going to sleep (Rust's lock() might yield)
        let mut lock_ok = self.queue.try_lock();
        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
            lock_ok = self.queue.try_lock();
        }
        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");

        let mut iterator = iterator.into_iter();
        let mut should_ping = false;
        if let Some(bytes) = iterator.next() {
            // Seed `tail` with the queue's current last element if there is one,
            // merging the first pushed element into it when the regions abut.
            let mut tail = if let Some(mut tail) = queue.pop_back() {
                if let Err(bytes) = tail.try_merge(bytes) {
                    // Could not merge: re-queue the old tail and carry `bytes` forward.
                    queue.push_back(::std::mem::replace(&mut tail, bytes));
                }
                tail
            }
            else {
                // Queue was empty: this push makes it non-empty, so signal later.
                should_ping = true;
                bytes
            };

            // Fold remaining elements into `tail`, flushing `tail` whenever a
            // merge fails; at most one unmerged element is held at a time.
            for more_bytes in iterator {
                if let Err(more_bytes) = tail.try_merge(more_bytes) {
                    queue.push_back(::std::mem::replace(&mut tail, more_bytes));
                }
            }
            queue.push_back(tail);
        }

        // Wakeup corresponding thread *after* releasing the lock
        ::std::mem::drop(queue);
        if should_ping {
            self.buzzer.buzz();  // only signal from empty to non-empty.
        }
    }
}
93
94impl BytesPull for MergeQueue {
95    fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
96        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
97
98        // try to acquire lock without going to sleep (Rust's lock() might yield)
99        let mut lock_ok = self.queue.try_lock();
100        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
101            lock_ok = self.queue.try_lock();
102        }
103        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
104
105        vec.extend(queue.drain(..));
106    }
107}
108
109// We want to ping in the drop because a channel closing can unblock a thread waiting on
110// the next bit of data to show up.
111impl Drop for MergeQueue {
112    fn drop(&mut self) {
113        // Propagate panic information, to distinguish between clean and unclean shutdown.
114        if ::std::thread::panicking() {
115            self.panic.store(true, Ordering::SeqCst);
116        }
117        else {
118            // TODO: Perhaps this aggressive ordering can relax orderings elsewhere.
119            if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
120        }
121        // Drop the queue before pinging.
122        self.queue = Arc::new(Mutex::new(VecDeque::new()));
123        self.buzzer.buzz();
124    }
125}
126
127
/// A `BytesPush` wrapper which stages writes.
///
/// Bytes are accumulated in `buffer` and moved into `send` by `send_buffer`,
/// which is invoked by `make_valid`, `publish`, `reserve` (on overflow), and `drop`.
pub struct SendEndpoint<P: BytesPush> {
    send: P,            // downstream target for staged bytes.
    buffer: BytesSlab,  // staging area for writes not yet pushed.
}
133
134impl<P: BytesPush> SendEndpoint<P> {
135
136    /// Moves `self.buffer` into `self.send`, replaces with empty buffer.
137    fn send_buffer(&mut self) {
138        let valid_len = self.buffer.valid().len();
139        if valid_len > 0 {
140            self.send.extend(Some(self.buffer.extract(valid_len)));
141        }
142    }
143
144    /// Allocates a new `BytesSendEndpoint` from a shared queue.
145    pub fn new(queue: P, refill: BytesRefill) -> Self {
146        SendEndpoint {
147            send: queue,
148            buffer: BytesSlab::new(20, refill),
149        }
150    }
151    /// Makes the next `bytes` bytes valid.
152    ///
153    /// The current implementation also sends the bytes, to ensure early visibility.
154    pub fn make_valid(&mut self, bytes: usize) {
155        self.buffer.make_valid(bytes);
156        self.send_buffer();
157    }
158    /// Acquires a prefix of `self.empty()` of length at least `capacity`.
159    pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {
160
161        if self.buffer.empty().len() < capacity {
162            self.send_buffer();
163            self.buffer.ensure_capacity(capacity);
164        }
165
166        assert!(self.buffer.empty().len() >= capacity);
167        self.buffer.empty()
168    }
169    /// Marks all written data as valid, makes visible.
170    pub fn publish(&mut self) {
171        self.send_buffer();
172    }
173}
174
impl<P: BytesPush> Drop for SendEndpoint<P> {
    fn drop(&mut self) {
        // Flush any staged bytes so nothing written to the buffer is lost.
        self.send_buffer();
    }
}