timely_communication/allocator/
thread.rs
1use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6use std::collections::VecDeque;
7
8use crate::allocator::{Allocate, AllocateBuilder};
9use crate::allocator::counters::Pusher as CountPusher;
10use crate::allocator::counters::Puller as CountPuller;
11use crate::{Push, Pull};
12
13pub struct ThreadBuilder;
15
16impl AllocateBuilder for ThreadBuilder {
17 type Allocator = Thread;
18 fn build(self) -> Self::Allocator { Thread::default() }
19}
20
21
22#[derive(Default)]
24pub struct Thread {
25 events: Rc<RefCell<Vec<usize>>>,
27}
28
29impl Allocate for Thread {
30 fn index(&self) -> usize { 0 }
31 fn peers(&self) -> usize { 1 }
32 fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
33 let (pusher, puller) = Thread::new_from(identifier, Rc::clone(&self.events));
34 (vec![Box::new(pusher)], Box::new(puller))
35 }
36 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
37 &self.events
38 }
39 fn await_events(&self, duration: Option<Duration>) {
40 if self.events.borrow().is_empty() {
41 if let Some(duration) = duration {
42 std::thread::park_timeout(duration);
43 }
44 else {
45 std::thread::park();
46 }
47 }
48 }
49}
50
51pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
53pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;
55
56impl Thread {
57 pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
59 -> (ThreadPusher<T>, ThreadPuller<T>)
60 {
61 let shared = Rc::new(RefCell::new((VecDeque::<T>::new(), VecDeque::<T>::new())));
62 let pusher = Pusher { target: Rc::clone(&shared) };
63 let pusher = CountPusher::new(pusher, identifier, Rc::clone(&events));
64 let puller = Puller { source: shared, current: None };
65 let puller = CountPuller::new(puller, identifier, events);
66 (pusher, puller)
67 }
68}
69
70
71pub struct Pusher<T> {
73 target: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
74}
75
76impl<T> Push<T> for Pusher<T> {
77 #[inline]
78 fn push(&mut self, element: &mut Option<T>) {
79 let mut borrow = self.target.borrow_mut();
80 if let Some(element) = element.take() {
81 borrow.0.push_back(element);
82 }
83 *element = borrow.1.pop_front();
84 }
85}
86
87pub struct Puller<T> {
89 current: Option<T>,
90 source: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
91}
92
93impl<T> Pull<T> for Puller<T> {
94 #[inline]
95 fn pull(&mut self) -> &mut Option<T> {
96 let mut borrow = self.source.borrow_mut();
97 self.current = borrow.0.pop_front();
104 &mut self.current
105 }
106}