use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use timely_bytes::arc::Bytes;
use super::bytes_slab::BytesSlab;
pub trait BytesPush {
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
}
pub trait BytesPull {
fn drain_into(&mut self, vec: &mut Vec<Bytes>);
}
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Clone)]
pub struct MergeQueue {
queue: Arc<Mutex<VecDeque<Bytes>>>, buzzer: crate::buzzer::Buzzer, panic: Arc<AtomicBool>,
}
impl MergeQueue {
pub fn new(buzzer: crate::buzzer::Buzzer) -> Self {
MergeQueue {
queue: Arc::new(Mutex::new(VecDeque::new())),
buzzer,
panic: Arc::new(AtomicBool::new(false)),
}
}
pub fn is_complete(&self) -> bool {
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty()
}
}
impl BytesPush for MergeQueue {
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
let mut lock_ok = self.queue.try_lock();
while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
lock_ok = self.queue.try_lock();
}
let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
let mut iterator = iterator.into_iter();
let mut should_ping = false;
if let Some(bytes) = iterator.next() {
let mut tail = if let Some(mut tail) = queue.pop_back() {
if let Err(bytes) = tail.try_merge(bytes) {
queue.push_back(::std::mem::replace(&mut tail, bytes));
}
tail
}
else {
should_ping = true;
bytes
};
for bytes in iterator {
if let Err(bytes) = tail.try_merge(bytes) {
queue.push_back(::std::mem::replace(&mut tail, bytes));
}
}
queue.push_back(tail);
}
::std::mem::drop(queue);
if should_ping {
self.buzzer.buzz(); }
}
}
impl BytesPull for MergeQueue {
fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
let mut lock_ok = self.queue.try_lock();
while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
lock_ok = self.queue.try_lock();
}
let mut queue = lock_ok.expect("MergeQueue mutex poisoned.");
vec.extend(queue.drain(..));
}
}
impl Drop for MergeQueue {
fn drop(&mut self) {
if ::std::thread::panicking() {
self.panic.store(true, Ordering::SeqCst);
}
else {
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
}
self.queue = Arc::new(Mutex::new(VecDeque::new()));
self.buzzer.buzz();
}
}
pub struct SendEndpoint<P: BytesPush> {
send: P,
buffer: BytesSlab,
}
impl<P: BytesPush> SendEndpoint<P> {
fn send_buffer(&mut self) {
let valid_len = self.buffer.valid().len();
if valid_len > 0 {
self.send.extend(Some(self.buffer.extract(valid_len)));
}
}
pub fn new(queue: P) -> Self {
SendEndpoint {
send: queue,
buffer: BytesSlab::new(20),
}
}
pub fn make_valid(&mut self, bytes: usize) {
self.buffer.make_valid(bytes);
self.send_buffer();
}
pub fn reserve(&mut self, capacity: usize) -> &mut [u8] {
if self.buffer.empty().len() < capacity {
self.send_buffer();
self.buffer.ensure_capacity(capacity);
}
assert!(self.buffer.empty().len() >= capacity);
self.buffer.empty()
}
pub fn publish(&mut self) {
self.send_buffer();
}
}
impl<P: BytesPush> Drop for SendEndpoint<P> {
fn drop(&mut self) {
self.send_buffer();
}
}