timely_communication/allocator/zero_copy/
bytes_exchange.rs1use std::sync::{Arc, Mutex};
4use std::collections::VecDeque;
5
6use timely_bytes::arc::Bytes;
7use super::bytes_slab::{BytesRefill, BytesSlab};
8use super::spill::{BytesFetch, SpillPolicy};
9
10pub enum QueueEntry {
13 Bytes(Bytes),
15 Paged(Box<dyn BytesFetch>),
17}
18
19pub trait BytesPush {
21 fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
23}
24pub trait BytesPull {
26 fn drain_into(&mut self, vec: &mut Vec<Bytes>);
28}
29
30use std::sync::atomic::{AtomicBool, Ordering};
31pub struct MergeQueue {
34 queue: Arc<Mutex<VecDeque<QueueEntry>>>, buzzer: crate::buzzer::Buzzer, panic: Arc<AtomicBool>, policy: Option<Box<dyn SpillPolicy>>, }
39
40impl MergeQueue {
41 pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
43 MergeQueue {
44 queue: Arc::new(Mutex::new(VecDeque::new())),
45 buzzer,
46 panic: Arc::new(AtomicBool::new(false)),
47 policy: None,
48 }
49 }
50 pub fn new_pair(
55 buzzer: crate::buzzer::Buzzer,
56 writer_policy: Option<Box<dyn SpillPolicy>>,
57 reader_policy: Option<Box<dyn SpillPolicy>>,
58 ) -> (MergeQueue, MergeQueue) {
59 let queue = Arc::new(Mutex::new(VecDeque::new()));
60 let panic = Arc::new(AtomicBool::new(false));
61 let writer = MergeQueue {
62 queue: Arc::clone(&queue),
63 buzzer: buzzer.clone(),
64 panic: Arc::clone(&panic),
65 policy: writer_policy,
66 };
67 let reader = MergeQueue {
68 queue,
69 buzzer,
70 panic,
71 policy: reader_policy,
72 };
73 (writer, reader)
74 }
75 pub fn is_complete(&self) -> bool {
77 if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
78 Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
79 }
80}
81
82impl BytesPush for MergeQueue {
83 fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
84
85 if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
86
87 let mut lock_ok = self.queue.try_lock();
89 while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
90 lock_ok = self.queue.try_lock();
91 }
92 let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
93
94 let mut iterator = iterator.into_iter();
95 let mut should_ping = false;
96 if let Some(bytes) = iterator.next() {
97 let mut tail = match queue.pop_back() {
98 Some(QueueEntry::Bytes(mut tail)) => {
99 if let Err(bytes) = tail.try_merge(bytes) {
100 queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, bytes)));
101 }
102 tail
103 }
104 Some(paged @ QueueEntry::Paged(_)) => {
105 queue.push_back(paged);
106 bytes
107 }
108 None => {
109 should_ping = true;
110 bytes
111 }
112 };
113
114 for more_bytes in iterator {
115 if let Err(more_bytes) = tail.try_merge(more_bytes) {
116 queue.push_back(QueueEntry::Bytes(::std::mem::replace(&mut tail, more_bytes)));
117 }
118 }
119 queue.push_back(QueueEntry::Bytes(tail));
120 }
121
122 if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); }
124
125 ::std::mem::drop(queue);
127 if should_ping {
128 self.buzzer.buzz(); }
130 }
131}
132
133impl BytesPull for MergeQueue {
134 fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
135 if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
136
137 let mut lock_ok = self.queue.try_lock();
139 while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
140 lock_ok = self.queue.try_lock();
141 }
142 let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
143
144 if let Some(policy) = self.policy.as_mut() { policy.apply(&mut queue); }
147
148 while let Some(QueueEntry::Bytes(_)) = queue.front() {
151 if let Some(QueueEntry::Bytes(b)) = queue.pop_front() {
152 vec.push(b);
153 }
154 }
155
156 if vec.is_empty() && !queue.is_empty() {
160 self.buzzer.buzz();
161 }
162 }
163}
164
165impl Drop for MergeQueue {
168 fn drop(&mut self) {
169 if ::std::thread::panicking() {
171 self.panic.store(true, Ordering::SeqCst);
172 }
173 else {
174 if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
176 }
177 self.queue = Arc::new(Mutex::new(VecDeque::new()));
179 self.policy = None;
180 self.buzzer.buzz();
181 }
182}
183
184
185pub struct SendEndpoint<P: BytesPush> {
187 send: P,
188 buffer: BytesSlab,
189}
190
191impl<P: BytesPush> SendEndpoint<P> {
192
193 fn send_buffer(&mut self) {
195 let valid_len = self.buffer.valid().len();
196 if valid_len > 0 {
197 self.send.extend(Some(self.buffer.extract(valid_len)));
198 }
199 }
200
201 pub fn new(queue: P, refill: BytesRefill) -> Self {
203 SendEndpoint {
204 send: queue,
205 buffer: BytesSlab::new(20, refill),
206 }
207 }
208 pub fn make_valid(&mut self, bytes: usize) {
212 self.buffer.make_valid(bytes);
213 self.send_buffer();
214 }
215 pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {
217
218 if self.buffer.empty().len() < capacity {
219 self.send_buffer();
220 self.buffer.ensure_capacity(capacity);
221 }
222
223 assert!(self.buffer.empty().len() >= capacity);
224 self.buffer.empty()
225 }
226 pub fn publish(&mut self) {
228 self.send_buffer();
229 }
230}
231
232impl<P: BytesPush> Drop for SendEndpoint<P> {
233 fn drop(&mut self) {
234 self.send_buffer();
235 }
236}