timely_communication/allocator/zero_copy/
allocator.rs1use std::rc::Rc;
3use std::cell::RefCell;
4use std::collections::{VecDeque, HashMap, hash_map::Entry};
5use std::sync::mpsc::{Sender, Receiver};
6
7use timely_bytes::arc::Bytes;
8
9use crate::networking::MessageHeader;
10
11use crate::{Allocate, Push, Pull};
12use crate::allocator::{AllocateBuilder, Exchangeable};
13use crate::allocator::canary::Canary;
14use crate::allocator::zero_copy::bytes_slab::BytesRefill;
15use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
16use super::push_pull::{Pusher, PullerInner};
17
18pub struct TcpBuilder<A: AllocateBuilder> {
25 inner: A,
26 index: usize, peers: usize, futures: Vec<Receiver<MergeQueue>>, promises: Vec<Sender<MergeQueue>>, refill: BytesRefill,
32}
33
34pub fn new_vector<A: AllocateBuilder>(
47 allocators: Vec<A>,
48 my_process: usize,
49 processes: usize,
50 refill: BytesRefill,
51) -> (Vec<TcpBuilder<A>>,
52 Vec<Vec<Sender<MergeQueue>>>,
53 Vec<Vec<Receiver<MergeQueue>>>)
54{
55 let threads = allocators.len();
56
57 let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
59 let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);
60
61 let builders =
62 allocators
63 .into_iter()
64 .zip(worker_promises)
65 .zip(worker_futures)
66 .enumerate()
67 .map(|(index, ((inner, promises), futures))| {
68 TcpBuilder {
69 inner,
70 index: my_process * threads + index,
71 peers: threads * processes,
72 promises,
73 futures,
74 refill: refill.clone(),
75 }})
76 .collect();
77
78 (builders, network_promises, network_futures)
79}
80
81impl<A: AllocateBuilder> TcpBuilder<A> {
82
83 pub fn build(self) -> TcpAllocator<A::Allocator> {
85
86 let mut recvs = Vec::with_capacity(self.peers);
88 for promise in self.promises.into_iter() {
89 let buzzer = crate::buzzer::Buzzer::default();
90 let queue = MergeQueue::new(buzzer);
91 promise.send(queue.clone()).expect("Failed to send MergeQueue");
92 recvs.push(queue.clone());
93 }
94
95 let mut sends = Vec::with_capacity(self.peers);
97 for pusher in self.futures.into_iter() {
98 let queue = pusher.recv().expect("Failed to receive push queue");
99 let sendpoint = SendEndpoint::new(queue, self.refill.clone());
100 sends.push(Rc::new(RefCell::new(sendpoint)));
101 }
102
103 TcpAllocator {
107 inner: self.inner.build(),
108 index: self.index,
109 peers: self.peers,
110 canaries: Rc::new(RefCell::new(Vec::new())),
111 channel_id_bound: None,
112 staged: Vec::new(),
113 sends,
114 recvs,
115 to_local: HashMap::new(),
116 }
117 }
118}
119
120pub struct TcpAllocator<A: Allocate> {
122
123 inner: A, index: usize, peers: usize, staged: Vec<Bytes>, canaries: Rc<RefCell<Vec<usize>>>,
130
131 channel_id_bound: Option<usize>,
132
133 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
138
139impl<A: Allocate> Allocate for TcpAllocator<A> {
140 fn index(&self) -> usize { self.index }
141 fn peers(&self) -> usize { self.peers }
142 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
143
144 if let Some(bound) = self.channel_id_bound {
146 assert!(bound < identifier);
147 }
148 self.channel_id_bound = Some(identifier);
149
150 let mut pushes = Vec::<Box<dyn Push<T>>>::new();
152
153 let inner_peers = self.inner.peers();
155 let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
156
157 for target_index in 0 .. self.peers() {
158
159 let mut process_id = target_index / inner_peers;
161
162 if process_id == self.index / inner_peers {
163 pushes.push(inner_sends.remove(0));
164 }
165 else {
166 let header = MessageHeader {
168 channel: identifier,
169 source: self.index,
170 target_lower: target_index,
171 target_upper: target_index + 1,
172 length: 0,
173 seqno: 0,
174 };
175
176 if process_id > self.index / inner_peers { process_id -= 1; }
178 pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id]))));
179 }
180 }
181
182 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
183
184 use crate::allocator::counters::Puller as CountPuller;
185 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
186 let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
187
188 (pushes, puller, )
189 }
190
191 fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
192
193 if let Some(bound) = self.channel_id_bound {
195 assert!(bound < identifier);
196 }
197 self.channel_id_bound = Some(identifier);
198
199 let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);
202
203 let inner_peers = self.inner.peers();
205 let (inner_send, inner_recv) = self.inner.broadcast(identifier);
206
207 pushes.push(inner_send);
208 for (mut index, send) in self.sends.iter().enumerate() {
209 if index >= self.index/inner_peers { index += 1; }
212 let header = MessageHeader {
213 channel: identifier,
214 source: self.index,
215 target_lower: index * inner_peers,
216 target_upper: index * inner_peers + inner_peers,
217 length: 0,
218 seqno: 0,
219 };
220 pushes.push(Box::new(Pusher::new(header, Rc::clone(send))))
221 }
222
223 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
224
225 use crate::allocator::counters::Puller as CountPuller;
226 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
227 let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
228
229 let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
230 (pushes, puller, )
231 }
232
233 #[inline(never)]
235 fn receive(&mut self) {
236
237 let mut canaries = self.canaries.borrow_mut();
239 for dropped_channel in canaries.drain(..) {
240 let _dropped =
241 self.to_local
242 .remove(&dropped_channel)
243 .expect("non-existent channel dropped");
244 }
250 ::std::mem::drop(canaries);
251
252 self.inner.receive();
253
254 for recv in self.recvs.iter_mut() {
255 recv.drain_into(&mut self.staged);
256 }
257
258 let mut events = self.inner.events().borrow_mut();
259
260 for mut bytes in self.staged.drain(..) {
261
262 while !bytes.is_empty() {
265
266 if let Some(header) = MessageHeader::try_read(&bytes[..]) {
267
268 let mut peel = bytes.extract_to(header.required_bytes());
270 let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());
271
272 events.push(header.channel);
275
276 match self.to_local.entry(header.channel) {
278 Entry::Vacant(entry) => {
279 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
281 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
282 .borrow_mut()
283 .push_back(peel);
284 }
285 }
286 Entry::Occupied(mut entry) => {
287 entry.get_mut().borrow_mut().push_back(peel);
288 }
289 }
290 }
291 else {
292 println!("failed to read full header!");
293 }
294 }
295 }
296 }
297
298 fn release(&mut self) {
300 for send in self.sends.iter_mut() {
302 send.borrow_mut().publish();
303 }
304
305 }
314 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
315 self.inner.events()
316 }
317 fn await_events(&self, duration: Option<std::time::Duration>) {
318 self.inner.await_events(duration);
319 }
320}