timely_communication/allocator/
process.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::sync::{Arc, Mutex};
6use std::any::Any;
7use std::time::Duration;
8use std::collections::{HashMap};
9use std::sync::mpsc::{Sender, Receiver};
10
11use crate::allocator::thread::{ThreadBuilder};
12use crate::allocator::{Allocate, AllocateBuilder, PeerBuilder, Thread};
13use crate::{Push, Pull};
14use crate::buzzer::Buzzer;
15
16pub struct ProcessBuilder {
18 inner: ThreadBuilder,
19 index: usize,
20 peers: usize,
21 channels: Arc<Mutex<HashMap<usize, Box<dyn Any+Send>>>>,
23
24 buzzers_send: Vec<Sender<Buzzer>>,
26 buzzers_recv: Vec<Receiver<Buzzer>>,
27
28 counters_send: Vec<Sender<usize>>,
29 counters_recv: Receiver<usize>,
30}
31
32impl AllocateBuilder for ProcessBuilder {
33 type Allocator = Process;
34 fn build(self) -> Self::Allocator {
35
36 for worker in self.buzzers_send.iter() {
38 let buzzer = Buzzer::default();
39 worker.send(buzzer).expect("Failed to send buzzer");
40 }
41 let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
42 for worker in self.buzzers_recv.iter() {
43 buzzers.push(worker.recv().expect("Failed to recv buzzer"));
44 }
45
46 Process {
47 inner: self.inner.build(),
48 index: self.index,
49 peers: self.peers,
50 channels: self.channels,
51 buzzers,
52 counters_send: self.counters_send,
53 counters_recv: self.counters_recv,
54 }
55 }
56}
57
58pub struct Process {
60 inner: Thread,
61 index: usize,
62 peers: usize,
63 channels: Arc<Mutex<HashMap<usize, Box<dyn Any+Send>>>>,
65 buzzers: Vec<Buzzer>,
66 counters_send: Vec<Sender<usize>>,
67 counters_recv: Receiver<usize>,
68}
69
70impl Process {
71 pub fn inner(&mut self) -> &mut Thread { &mut self.inner }
73}
74
75impl PeerBuilder for Process {
76 type Peer = ProcessBuilder;
77 fn new_vector(
79 peers: usize,
80 _refill: crate::allocator::BytesRefill,
81 _spill: Option<crate::allocator::zero_copy::spill::SpillPolicyFn>,
82 ) -> Vec<ProcessBuilder> {
83
84 let mut counters_send = Vec::with_capacity(peers);
85 let mut counters_recv = Vec::with_capacity(peers);
86 for _ in 0 .. peers {
87 let (send, recv) = std::sync::mpsc::channel();
88 counters_send.push(send);
89 counters_recv.push(recv);
90 }
91
92 let channels = Arc::new(Mutex::new(HashMap::with_capacity(peers)));
93
94 let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);
96
97 counters_recv
98 .into_iter()
99 .zip(buzzers_send)
100 .zip(buzzers_recv)
101 .enumerate()
102 .map(|(index, ((recv, bsend), brecv))| {
103 ProcessBuilder {
104 inner: ThreadBuilder,
105 index,
106 peers,
107 buzzers_send: bsend,
108 buzzers_recv: brecv,
109 channels: Arc::clone(&channels),
110 counters_send: counters_send.clone(),
111 counters_recv: recv,
112 }
113 })
114 .collect()
115 }
116}
117
118impl Allocate for Process {
119 fn index(&self) -> usize { self.index }
120 fn peers(&self) -> usize { self.peers }
121 fn allocate<T: Any+Send>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
122
123 let mut channels = self.channels.lock().expect("mutex error?");
128
129 let (sends, recv, empty) = {
130
131 let entry = channels.entry(identifier).or_insert_with(|| {
133
134 let mut pushers = Vec::with_capacity(self.peers);
135 let mut pullers = Vec::with_capacity(self.peers);
136 for buzzer in self.buzzers.iter() {
137 let (s, r): (Sender<T>, Receiver<T>) = std::sync::mpsc::channel();
138 pushers.push((Pusher { target: s }, buzzer.clone()));
140 pullers.push(Puller { source: r, current: None });
141 }
142
143 let mut to_box = Vec::with_capacity(pullers.len());
144 for recv in pullers.into_iter() {
145 to_box.push(Some((pushers.clone(), recv)));
146 }
147
148 Box::new(to_box)
149 });
150
151 let vector =
152 entry
153 .downcast_mut::<Vec<Option<(Vec<(Pusher<T>, Buzzer)>, Puller<T>)>>>()
154 .expect("failed to correctly cast channel");
155
156 let (sends, recv) =
157 vector[self.index]
158 .take()
159 .expect("channel already consumed");
160
161 let empty = vector.iter().all(|x| x.is_none());
162
163 (sends, recv, empty)
164 };
165
166 if empty { channels.remove(&identifier); }
169
170 use crate::allocator::counters::ArcPusher as CountPusher;
171 use crate::allocator::counters::Puller as CountPuller;
172
173 let sends =
174 sends.into_iter()
175 .zip(self.counters_send.iter())
176 .map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
177 .map(|s| Box::new(s) as Box<dyn Push<T>>)
178 .collect::<Vec<_>>();
179
180 let recv = Box::new(CountPuller::new(recv, identifier, Rc::clone(self.inner.events()))) as Box<dyn Pull<T>>;
181
182 (sends, recv)
183 }
184
185 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
186 self.inner.events()
187 }
188
189 fn await_events(&self, duration: Option<Duration>) {
190 self.inner.await_events(duration);
191 }
192
193 fn receive(&mut self) {
194 let mut events = self.inner.events().borrow_mut();
195 while let Ok(index) = self.counters_recv.try_recv() {
196 events.push(index);
197 }
198 }
199}
200
201struct Pusher<T> {
203 target: Sender<T>,
204}
205
206impl<T> Clone for Pusher<T> {
207 fn clone(&self) -> Self {
208 Self {
209 target: self.target.clone(),
210 }
211 }
212}
213
214impl<T> Push<T> for Pusher<T> {
215 #[inline] fn push(&mut self, element: &mut Option<T>) {
216 if let Some(element) = element.take() {
217 let _ = self.target.send(element);
220 }
221 }
222}
223
224struct Puller<T> {
226 current: Option<T>,
227 source: Receiver<T>,
228}
229
230impl<T> Pull<T> for Puller<T> {
231 #[inline]
232 fn pull(&mut self) -> &mut Option<T> {
233 self.current = self.source.try_recv().ok();
234 &mut self.current
235 }
236}