timely_communication/allocator/zero_copy/bytes_exchange.rs

use std::sync::{Arc, Mutex};
use std::collections::VecDeque;

use timely_bytes::arc::Bytes;
use super::bytes_slab::{BytesRefill, BytesSlab};
/// A target for `Bytes`.
pub trait BytesPush {
    /// Pushes many bytes at the instance.
    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
}

/// A source for `Bytes`.
pub trait BytesPull {
    /// Drains many bytes from the instance.
    fn drain_into(&mut self, vec: &mut Vec<Bytes>);
}
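
// `MergeQueue`, below, implements both traits: sending threads `extend` it
// with `Bytes`, and the receiving thread `drain_into`s them back out.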

use std::sync::atomic::{AtomicBool, Ordering};

/// An unbounded queue of `Bytes`, shared between a sender and a receiver.
///
/// Cloning the handle yields another handle to the same shared queue.
#[derive(Clone)]
pub struct MergeQueue {
    queue: Arc<Mutex<VecDeque<Bytes>>>,     // queued `Bytes`, merged where possible.
    buzzer: crate::buzzer::Buzzer,          // awakens the receiving thread.
    panic: Arc<AtomicBool>,                 // set on unclean shutdown; poisons the queue.
}

impl MergeQueue {
    /// Allocates a new queue with an associated buzzer used to signal the receiver.
    pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
        MergeQueue {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            buzzer,
            panic: Arc::new(AtomicBool::new(false)),
        }
    }
    /// True iff this is the only remaining handle to the queue and it is empty.
    pub fn is_complete(&self) -> bool {
        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
        Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
    }
}

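// The push path below takes the lock once per `extend` call and merges each
// pushed `Bytes` into the queue's tail where possible, so that a burst of
// small writes can reach the receiver as fewer, larger regions.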
impl BytesPush for MergeQueue {
    fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {

        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }

        // Acquire the lock once for the whole batch, spinning on contention;
        // any remaining error is a poisoned mutex, i.e. a panic elsewhere.
        let mut lock_ok = self.queue.try_lock();
        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
            lock_ok = self.queue.try_lock();
        }
        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");

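        // `Bytes::try_merge` succeeds only when its argument is the region
        // immediately following `self` in the same allocation, in which case
        // the two become one larger `Bytes` for the receiver.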
        let mut iterator = iterator.into_iter();
        let mut should_ping = false;
        if let Some(bytes) = iterator.next() {
            // Attempt to merge the first element into the current tail.
            let mut tail = if let Some(mut tail) = queue.pop_back() {
                if let Err(bytes) = tail.try_merge(bytes) {
                    queue.push_back(::std::mem::replace(&mut tail, bytes));
                }
                tail
            }
            else {
                // The queue was empty; the receiver may be parked and need a ping.
                should_ping = true;
                bytes
            };

            // Merge each subsequent `Bytes` into the tail when possible;
            // otherwise enqueue the old tail and continue with the new one.
            for more_bytes in iterator {
                if let Err(more_bytes) = tail.try_merge(more_bytes) {
                    queue.push_back(::std::mem::replace(&mut tail, more_bytes));
                }
            }
            queue.push_back(tail);
        }

        // Release the lock before signalling the receiver.
        ::std::mem::drop(queue);
        if should_ping {
            // Only signal on an empty-to-nonempty transition.
            self.buzzer.buzz();
        }
    }
}

impl BytesPull for MergeQueue {
    fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
        if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }

        // As in `extend`: acquire the lock once, spinning on contention.
        let mut lock_ok = self.queue.try_lock();
        while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
            lock_ok = self.queue.try_lock();
        }
        let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");

        vec.extend(queue.drain(..));
    }
}
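
// Dropping a handle also pings the buzzer: a closing channel can unblock a
// receiver parked waiting for the next bit of data to show up.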

impl Drop for MergeQueue {
    fn drop(&mut self) {
        // Record an unclean shutdown, so that other handles can distinguish a
        // panicked sender from a cleanly closed channel.
        if ::std::thread::panicking() {
            self.panic.store(true, Ordering::SeqCst);
        }
        else {
            if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
        }
        // Drop our reference to the shared queue before buzzing, so that a
        // receiver woken by the buzz observes the decremented `strong_count`.
        self.queue = Arc::new(Mutex::new(VecDeque::new()));
        self.buzzer.buzz();
    }
}
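
// The writer-side companion to `MergeQueue`: `SendEndpoint` accumulates
// writes in a `BytesSlab` and pushes each completed prefix at its
// `P: BytesPush` as a single `Bytes`.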

/// A `BytesPush` wrapper which stages writes.
pub struct SendEndpoint<P: BytesPush> {
    send: P,            // destination for completed buffers.
    buffer: BytesSlab,  // staging buffer for in-progress writes.
}

impl<P: BytesPush> SendEndpoint<P> {

    /// Moves the valid prefix of `self.buffer` into `self.send`.
    fn send_buffer(&mut self) {
        let valid_len = self.buffer.valid().len();
        if valid_len > 0 {
            self.send.extend(Some(self.buffer.extract(valid_len)));
        }
    }

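    // Note: `extract` surrenders ownership of the valid prefix as a `Bytes`;
    // the slab retains its remaining capacity for subsequent writes.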
    /// Allocates a new `SendEndpoint` wrapping a shared queue.
    pub fn new(queue: P, refill: BytesRefill) -> Self {
        SendEndpoint {
            send: queue,
            buffer: BytesSlab::new(20, refill),    // initial allocations of 2^20 bytes.
        }
    }
    /// Marks the next `bytes` bytes as valid, and sends the buffer so that
    /// the written data becomes visible promptly.
    pub fn make_valid(&mut self, bytes: usize) {
        self.buffer.make_valid(bytes);
        self.send_buffer();
    }
    /// Acquires a writable slice of at least `capacity` bytes.
    pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {

        // If the current buffer cannot satisfy the request, flush it and
        // re-provision the slab with sufficient capacity.
        if self.buffer.empty().len() < capacity {
            self.send_buffer();
            self.buffer.ensure_capacity(capacity);
        }

        assert!(self.buffer.empty().len() >= capacity);
        self.buffer.empty()
    }
    /// Makes all written but unsent data visible to the recipient.
    pub fn publish(&mut self) {
        self.send_buffer();
    }
}

impl<P: BytesPush> Drop for SendEndpoint<P> {
    fn drop(&mut self) {
        // Flush any staged but unsent bytes before the endpoint disappears.
        self.send_buffer();
    }
}
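
// A sketch of the intended write path (illustrative only; `endpoint` and
// `payload` are hypothetical):
//
//     let out = endpoint.reserve(payload.len());      // stage at least this much.
//     out[..payload.len()].copy_from_slice(&payload); // write into the slab.
//     endpoint.make_valid(payload.len());             // mark written bytes valid (and send).
//
// A receiving thread holding the same `MergeQueue` calls `drain_into` after
// each buzz to collect the published `Bytes`.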