timely_communication/allocator/zero_copy/
allocator_process.rs
1use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use std::sync::mpsc::{Sender, Receiver};
7
8use timely_bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Push, Pull};
13use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
14use crate::allocator::canary::Canary;
15use crate::allocator::zero_copy::bytes_slab::BytesRefill;
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17
18use super::push_pull::{Pusher, Puller};
19
20pub struct ProcessBuilder {
27 index: usize, peers: usize, pushers: Vec<Receiver<MergeQueue>>, pullers: Vec<Sender<MergeQueue>>, refill: BytesRefill,
32}
33
34impl PeerBuilder for ProcessBuilder {
35 type Peer = ProcessBuilder;
36 fn new_vector(count: usize, refill: BytesRefill) -> Vec<ProcessBuilder> {
40
41 let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
43
44 pushers_vec
45 .into_iter()
46 .zip(pullers_vec)
47 .enumerate()
48 .map(|(index, (pushers, pullers))|
49 ProcessBuilder {
50 index,
51 peers: count,
52 pushers,
53 pullers,
54 refill: refill.clone(),
55 }
56 )
57 .collect()
58 }
59}
60
61impl ProcessBuilder {
62 pub fn build(self) -> ProcessAllocator {
64
65 let mut recvs = Vec::with_capacity(self.peers);
67 for puller in self.pullers.into_iter() {
68 let buzzer = crate::buzzer::Buzzer::default();
69 let queue = MergeQueue::new(buzzer);
70 puller.send(queue.clone()).expect("Failed to send MergeQueue");
71 recvs.push(queue.clone());
72 }
73
74 let mut sends = Vec::with_capacity(self.peers);
76 for pusher in self.pushers.into_iter() {
77 let queue = pusher.recv().expect("Failed to receive MergeQueue");
78 let sendpoint = SendEndpoint::new(queue, self.refill.clone());
79 sends.push(Rc::new(RefCell::new(sendpoint)));
80 }
81
82 ProcessAllocator {
83 index: self.index,
84 peers: self.peers,
85 events: Rc::new(RefCell::new(Default::default())),
86 canaries: Rc::new(RefCell::new(Vec::new())),
87 channel_id_bound: None,
88 staged: Vec::new(),
89 sends,
90 recvs,
91 to_local: HashMap::new(),
92 }
93 }
94}
95
96impl AllocateBuilder for ProcessBuilder {
97 type Allocator = ProcessAllocator;
98 fn build(self) -> Self::Allocator {
100 self.build()
101 }
102
103}
104
105pub struct ProcessAllocator {
107
108 index: usize, peers: usize, events: Rc<RefCell<Vec<usize>>>,
112
113 canaries: Rc<RefCell<Vec<usize>>>,
114
115 channel_id_bound: Option<usize>,
116
117 staged: Vec<Bytes>,
119 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
123
124impl Allocate for ProcessAllocator {
125 fn index(&self) -> usize { self.index }
126 fn peers(&self) -> usize { self.peers }
127 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
128
129 if let Some(bound) = self.channel_id_bound {
131 assert!(bound < identifier);
132 }
133 self.channel_id_bound = Some(identifier);
134
135 let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());
136
137 for target_index in 0 .. self.peers() {
138
139 let header = MessageHeader {
141 channel: identifier,
142 source: self.index,
143 target_lower: target_index,
144 target_upper: target_index+1,
145 length: 0,
146 seqno: 0,
147 };
148
149 pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[target_index]))));
151 }
152
153 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
154
155 use crate::allocator::counters::Puller as CountPuller;
156 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
157 let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, Rc::clone(self.events())));
158
159 (pushes, puller)
160 }
161
162 #[inline(never)]
164 fn receive(&mut self) {
165
166 let mut canaries = self.canaries.borrow_mut();
168 for dropped_channel in canaries.drain(..) {
169 let _dropped =
170 self.to_local
171 .remove(&dropped_channel)
172 .expect("non-existent channel dropped");
173 }
179 std::mem::drop(canaries);
180
181 let mut events = self.events.borrow_mut();
182
183 for recv in self.recvs.iter_mut() {
184 recv.drain_into(&mut self.staged);
185 }
186
187 for mut bytes in self.staged.drain(..) {
188
189 while bytes.len() > 0 {
192
193 if let Some(header) = MessageHeader::try_read(&bytes[..]) {
194
195 let mut peel = bytes.extract_to(header.required_bytes());
197 let _ = peel.extract_to(header.header_bytes());
198
199 events.push(header.channel);
202
203 match self.to_local.entry(header.channel) {
205 Entry::Vacant(entry) => {
206 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
208 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
209 .borrow_mut()
210 .push_back(peel);
211 }
212 }
213 Entry::Occupied(mut entry) => {
214 entry.get_mut().borrow_mut().push_back(peel);
215 }
216 }
217 }
218 else {
219 println!("failed to read full header!");
220 }
221 }
222 }
223 }
224
225 fn release(&mut self) {
227 for send in self.sends.iter_mut() {
229 send.borrow_mut().publish();
230 }
231
232 }
241
242 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
243 &self.events
244 }
245 fn await_events(&self, duration: Option<std::time::Duration>) {
246 if self.events.borrow().is_empty() {
247 if let Some(duration) = duration {
248 std::thread::park_timeout(duration);
249 }
250 else {
251 std::thread::park();
252 }
253 }
254 }
255}