// timely_communication/allocator/thread.rs
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
/// Builder for the intra-thread [`Thread`] allocator.
///
/// The builder carries no state of its own; building it just produces a
/// freshly constructed allocator.
pub struct ThreadBuilder;

impl AllocateBuilder for ThreadBuilder {
    type Allocator = Thread;

    /// Consumes the builder and returns a new [`Thread`] allocator.
    fn build(self) -> Thread {
        Thread::new()
    }
}
/// Allocator whose channels never leave the current thread.
pub struct Thread {
/// Shared buffer of `usize` entries. A clone of this handle is passed to every
/// channel created by `allocate`, and `await_events` only parks while it is empty.
/// Presumably the counting wrappers append channel identifiers here — confirm
/// against `allocator::counters`.
events: Rc<RefCell<Vec<usize>>>,
}
impl Allocate for Thread {
    /// This allocator always reports index `0`.
    fn index(&self) -> usize {
        0
    }

    /// This allocator always reports exactly one peer.
    fn peers(&self) -> usize {
        1
    }

    /// Creates a channel for `identifier`.
    ///
    /// Returns a one-element vector holding the boxed push endpoint together
    /// with the boxed pull endpoint. Both endpoints receive a clone of this
    /// allocator's shared event buffer.
    fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
        let (send, recv) = Thread::new_from::<T>(identifier, Rc::clone(&self.events));
        let senders: Vec<Box<dyn Push<Message<T>>>> = vec![Box::new(send)];
        let receiver: Box<dyn Pull<Message<T>>> = Box::new(recv);
        (senders, receiver)
    }

    /// Gives access to the shared event buffer.
    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
        &self.events
    }

    /// Parks the current thread while no events are buffered.
    ///
    /// Returns immediately when the event buffer is non-empty. Otherwise the
    /// thread parks for at most `duration` when one is supplied, and without
    /// a timeout when `duration` is `None`.
    fn await_events(&self, duration: Option<Duration>) {
        // The `Ref` guard is released at the end of this statement, so the
        // buffer is not borrowed while the thread is parked.
        let idle = self.events.borrow().is_empty();
        if !idle {
            return;
        }
        match duration {
            Some(timeout) => std::thread::park_timeout(timeout),
            None => std::thread::park(),
        }
    }
}
/// Push endpoint of an intra-thread channel: a [`Pusher`] wrapped in `counters::Pusher`.
pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
/// Pull endpoint of an intra-thread channel: a [`Puller`] wrapped in `counters::Puller`.
pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;
impl Thread {
pub fn new() -> Self {
Thread {
events: Rc::new(RefCell::new(Default::default())),
}
}
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
let pusher = Pusher { target: shared.clone() };
let pusher = CountPusher::new(pusher, identifier, events.clone());
let puller = Puller { source: shared, current: None };
let puller = CountPuller::new(puller, identifier, events);
(pusher, puller)
}
}
/// The push half of an intra-thread channel.
pub struct Pusher<T> {
/// Pair of queues shared through an `Rc<RefCell<..>>`. `push` appends to the first
/// queue and refills the caller's slot from the front of the second.
target: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
impl<T> Push<T> for Pusher<T> {
    /// Moves the value out of `element` (if any) onto the back of the first
    /// shared queue, then overwrites `element` with the front of the second
    /// shared queue, or `None` when that queue is empty.
    ///
    /// Panics if the shared queues are already borrowed, as `RefCell` does.
    #[inline]
    fn push(&mut self, element: &mut Option<T>) {
        let mut queues = self.target.borrow_mut();
        let (outbound, returned) = &mut *queues;
        if let Some(item) = element.take() {
            outbound.push_back(item);
        }
        // NOTE(review): nothing visible in this file enqueues into the second
        // queue, so this normally leaves `element` as `None`.
        *element = returned.pop_front();
    }
}
/// The pull half of an intra-thread channel.
pub struct Puller<T> {
/// Slot holding the element most recently popped by `pull`; `pull` returns a
/// mutable reference to it.
current: Option<T>,
/// Pair of queues shared through an `Rc<RefCell<..>>`. `pull` pops from the front
/// of the first queue.
source: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
impl<T> Pull<T> for Puller<T> {
    /// Replaces the held element with the front of the first shared queue
    /// (`None` when it is empty) and returns a mutable reference to it.
    ///
    /// Any element left in the slot from a previous call is dropped.
    /// Panics if the shared queues are already borrowed, as `RefCell` does.
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        // The `RefMut` temporary lives until the end of this statement, so the
        // previous element is dropped while the queues are still borrowed.
        self.current = self.source.borrow_mut().0.pop_front();
        &mut self.current
    }
}