timely_communication/allocator/zero_copy/
allocator.rs1use std::rc::Rc;
3use std::cell::RefCell;
4use std::collections::{VecDeque, HashMap, hash_map::Entry};
5use std::sync::mpsc::{Sender, Receiver};
6
7use timely_bytes::arc::Bytes;
8
9use crate::networking::MessageHeader;
10
11use crate::{Allocate, Push, Pull};
12use crate::allocator::{Process, ProcessBuilder, Exchangeable};
13use crate::allocator::canary::Canary;
14use crate::allocator::zero_copy::bytes_slab::BytesRefill;
15use crate::allocator::zero_copy::spill::SpillPolicyFn;
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17use super::push_pull::{Pusher, PullerInner};
18
19pub struct TcpBuilder {
26 inner: ProcessBuilder,
27 index: usize, peers: usize, futures: Vec<Receiver<MergeQueue>>, promises: Vec<Sender<MergeQueue>>, refill: BytesRefill,
33 spill: Option<SpillPolicyFn>,
35}
36
37pub(crate) fn new_vector(
50 allocators: Vec<ProcessBuilder>,
51 my_process: usize,
52 processes: usize,
53 refill: BytesRefill,
54 spill: Option<SpillPolicyFn>,
55) -> (Vec<TcpBuilder>,
56 Vec<Vec<Sender<MergeQueue>>>,
57 Vec<Vec<Receiver<MergeQueue>>>)
58{
59 let threads = allocators.len();
60
61 let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
63 let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);
64
65 let builders =
66 allocators
67 .into_iter()
68 .zip(worker_promises)
69 .zip(worker_futures)
70 .enumerate()
71 .map(|(index, ((inner, promises), futures))| {
72 TcpBuilder {
73 inner,
74 index: my_process * threads + index,
75 peers: threads * processes,
76 promises,
77 futures,
78 refill: refill.clone(),
79 spill: spill.clone(),
80 }})
81 .collect();
82
83 (builders, network_promises, network_futures)
84}
85
86impl TcpBuilder {
87
88 pub fn build(self) -> TcpAllocator {
90
91 let mut recvs = Vec::with_capacity(self.peers);
93 for promise in self.promises.into_iter() {
94 let buzzer = crate::buzzer::Buzzer::default();
95 let (writer, reader) = match self.spill.as_ref() {
96 Some(build_fn) => {
97 let (w, r) = build_fn();
98 MergeQueue::new_pair(buzzer, Some(w), Some(r))
99 }
100 None => MergeQueue::new_pair(buzzer, None, None),
101 };
102 recvs.push(reader);
105 promise.send(writer).expect("Failed to send MergeQueue");
106 }
107
108 let mut sends = Vec::with_capacity(self.peers);
110 for pusher in self.futures.into_iter() {
111 let queue = pusher.recv().expect("Failed to receive push queue");
112 let sendpoint = SendEndpoint::new(queue, self.refill.clone());
113 sends.push(Rc::new(RefCell::new(sendpoint)));
114 }
115
116 TcpAllocator {
120 inner: self.inner.build(),
121 index: self.index,
122 peers: self.peers,
123 canaries: Rc::new(RefCell::new(Vec::new())),
124 channel_id_bound: None,
125 staged: Vec::new(),
126 sends,
127 recvs,
128 to_local: HashMap::new(),
129 }
130 }
131}
132
133pub struct TcpAllocator {
135
136 inner: Process, index: usize, peers: usize, staged: Vec<Bytes>, canaries: Rc<RefCell<Vec<usize>>>,
143
144 channel_id_bound: Option<usize>,
145
146 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
151
152impl Allocate for TcpAllocator {
153 fn index(&self) -> usize { self.index }
154 fn peers(&self) -> usize { self.peers }
155 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
156
157 if let Some(bound) = self.channel_id_bound {
159 assert!(bound < identifier);
160 }
161 self.channel_id_bound = Some(identifier);
162
163 let mut pushes = Vec::<Box<dyn Push<T>>>::new();
165
166 let inner_peers = self.inner.peers();
168 let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
169
170 for target_index in 0 .. self.peers() {
171
172 let mut process_id = target_index / inner_peers;
174
175 if process_id == self.index / inner_peers {
176 pushes.push(inner_sends.remove(0));
177 }
178 else {
179 let header = MessageHeader {
181 channel: identifier,
182 source: self.index,
183 target_lower: target_index,
184 target_upper: target_index + 1,
185 length: 0,
186 seqno: 0,
187 };
188
189 if process_id > self.index / inner_peers { process_id -= 1; }
191 pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id]))));
192 }
193 }
194
195 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
196
197 use crate::allocator::counters::Puller as CountPuller;
198 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
199 let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
200
201 (pushes, puller, )
202 }
203
204 fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
205
206 if let Some(bound) = self.channel_id_bound {
208 assert!(bound < identifier);
209 }
210 self.channel_id_bound = Some(identifier);
211
212 let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);
215
216 let inner_peers = self.inner.peers();
218 let (inner_send, inner_recv) = self.inner.broadcast(identifier);
219
220 pushes.push(inner_send);
221 for (mut index, send) in self.sends.iter().enumerate() {
222 if index >= self.index/inner_peers { index += 1; }
225 let header = MessageHeader {
226 channel: identifier,
227 source: self.index,
228 target_lower: index * inner_peers,
229 target_upper: index * inner_peers + inner_peers,
230 length: 0,
231 seqno: 0,
232 };
233 pushes.push(Box::new(Pusher::new(header, Rc::clone(send))))
234 }
235
236 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
237
238 use crate::allocator::counters::Puller as CountPuller;
239 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
240 let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
241
242 let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
243 (pushes, puller, )
244 }
245
246 #[inline(never)]
248 fn receive(&mut self) {
249
250 let mut canaries = self.canaries.borrow_mut();
252 for dropped_channel in canaries.drain(..) {
253 let _dropped =
254 self.to_local
255 .remove(&dropped_channel)
256 .expect("non-existent channel dropped");
257 }
263 ::std::mem::drop(canaries);
264
265 self.inner.receive();
266
267 for recv in self.recvs.iter_mut() {
268 recv.drain_into(&mut self.staged);
269 }
270
271 let mut events = self.inner.events().borrow_mut();
272
273 for mut bytes in self.staged.drain(..) {
274
275 while !bytes.is_empty() {
278
279 if let Some(header) = MessageHeader::try_read(&bytes[..]) {
280
281 let mut peel = bytes.extract_to(header.required_bytes());
283 let _ = peel.extract_to(header.header_bytes());
284
285 events.push(header.channel);
288
289 match self.to_local.entry(header.channel) {
291 Entry::Vacant(entry) => {
292 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
294 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
295 .borrow_mut()
296 .push_back(peel);
297 }
298 }
299 Entry::Occupied(mut entry) => {
300 entry.get_mut().borrow_mut().push_back(peel);
301 }
302 }
303 }
304 else {
305 println!("failed to read full header!");
306 }
307 }
308 }
309 }
310
311 fn release(&mut self) {
313 for send in self.sends.iter_mut() {
315 send.borrow_mut().publish();
316 }
317
318 }
327 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
328 self.inner.events()
329 }
330 fn await_events(&self, duration: Option<std::time::Duration>) {
331 self.inner.await_events(duration);
332 }
333}