timely_communication/allocator/zero_copy/
allocator_process.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use std::sync::mpsc::{Sender, Receiver};
7
8use timely_bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Push, Pull};
13use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
14use crate::allocator::canary::Canary;
15use crate::allocator::zero_copy::bytes_slab::BytesRefill;
16use crate::allocator::zero_copy::spill::SpillPolicyFn;
17use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
18
19use super::push_pull::{Pusher, Puller};
20
21pub struct ProcessBuilder {
28 index: usize, peers: usize, pushers: Vec<Receiver<MergeQueue>>, pullers: Vec<Sender<MergeQueue>>, refill: BytesRefill,
33 spill: Option<SpillPolicyFn>, }
35
36impl PeerBuilder for ProcessBuilder {
37 type Peer = ProcessBuilder;
38 fn new_vector(count: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<ProcessBuilder> {
42
43 let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
45
46 pushers_vec
47 .into_iter()
48 .zip(pullers_vec)
49 .enumerate()
50 .map(|(index, (pushers, pullers))|
51 ProcessBuilder {
52 index,
53 peers: count,
54 pushers,
55 pullers,
56 refill: refill.clone(),
57 spill: spill.clone(),
58 }
59 )
60 .collect()
61 }
62}
63
64impl ProcessBuilder {
65 pub fn build(self) -> ProcessAllocator {
67
68 let mut recvs = Vec::with_capacity(self.peers);
70 for puller in self.pullers.into_iter() {
71 let buzzer = crate::buzzer::Buzzer::default();
72 let (writer, reader) = match self.spill.as_ref() {
73 Some(build_fn) => {
74 let (w, r) = build_fn();
75 MergeQueue::new_pair(buzzer, Some(w), Some(r))
76 }
77 None => MergeQueue::new_pair(buzzer, None, None),
78 };
79 recvs.push(reader);
82 puller.send(writer).expect("Failed to send MergeQueue");
83 }
84
85 let mut sends = Vec::with_capacity(self.peers);
87 for pusher in self.pushers.into_iter() {
88 let queue = pusher.recv().expect("Failed to receive MergeQueue");
89 let sendpoint = SendEndpoint::new(queue, self.refill.clone());
90 sends.push(Rc::new(RefCell::new(sendpoint)));
91 }
92
93 ProcessAllocator {
94 index: self.index,
95 peers: self.peers,
96 events: Rc::new(RefCell::new(Default::default())),
97 canaries: Rc::new(RefCell::new(Vec::new())),
98 channel_id_bound: None,
99 staged: Vec::new(),
100 sends,
101 recvs,
102 to_local: HashMap::new(),
103 }
104 }
105}
106
107impl AllocateBuilder for ProcessBuilder {
108 type Allocator = ProcessAllocator;
109 fn build(self) -> Self::Allocator {
111 self.build()
112 }
113
114}
115
116pub struct ProcessAllocator {
118
119 index: usize, peers: usize, events: Rc<RefCell<Vec<usize>>>,
123
124 canaries: Rc<RefCell<Vec<usize>>>,
125
126 channel_id_bound: Option<usize>,
127
128 staged: Vec<Bytes>,
130 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
134
135impl Allocate for ProcessAllocator {
136 fn index(&self) -> usize { self.index }
137 fn peers(&self) -> usize { self.peers }
138 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
139
140 if let Some(bound) = self.channel_id_bound {
142 assert!(bound < identifier);
143 }
144 self.channel_id_bound = Some(identifier);
145
146 let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());
147
148 for target_index in 0 .. self.peers() {
149
150 let header = MessageHeader {
152 channel: identifier,
153 source: self.index,
154 target_lower: target_index,
155 target_upper: target_index+1,
156 length: 0,
157 seqno: 0,
158 };
159
160 pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[target_index]))));
162 }
163
164 let channel = Rc::clone(self.to_local.entry(identifier).or_default());
165
166 use crate::allocator::counters::Puller as CountPuller;
167 let canary = Canary::new(identifier, Rc::clone(&self.canaries));
168 let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, Rc::clone(self.events())));
169
170 (pushes, puller)
171 }
172
173 #[inline(never)]
175 fn receive(&mut self) {
176
177 let mut canaries = self.canaries.borrow_mut();
179 for dropped_channel in canaries.drain(..) {
180 let _dropped =
181 self.to_local
182 .remove(&dropped_channel)
183 .expect("non-existent channel dropped");
184 }
190 std::mem::drop(canaries);
191
192 let mut events = self.events.borrow_mut();
193
194 for recv in self.recvs.iter_mut() {
195 recv.drain_into(&mut self.staged);
196 }
197
198 for mut bytes in self.staged.drain(..) {
199
200 while !bytes.is_empty() {
203
204 if let Some(header) = MessageHeader::try_read(&bytes[..]) {
205
206 let mut peel = bytes.extract_to(header.required_bytes());
208 let _ = peel.extract_to(header.header_bytes());
209
210 events.push(header.channel);
213
214 match self.to_local.entry(header.channel) {
216 Entry::Vacant(entry) => {
217 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
219 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
220 .borrow_mut()
221 .push_back(peel);
222 }
223 }
224 Entry::Occupied(mut entry) => {
225 entry.get_mut().borrow_mut().push_back(peel);
226 }
227 }
228 }
229 else {
230 println!("failed to read full header!");
231 }
232 }
233 }
234 }
235
236 fn release(&mut self) {
238 for send in self.sends.iter_mut() {
240 send.borrow_mut().publish();
241 }
242
243 }
252
253 fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
254 &self.events
255 }
256 fn await_events(&self, duration: Option<std::time::Duration>) {
257 if self.events.borrow().is_empty() {
258 if let Some(duration) = duration {
259 std::thread::park_timeout(duration);
260 }
261 else {
262 std::thread::park();
263 }
264 }
265 }
266}