timely_communication/allocator/zero_copy/allocator.rs

//! Zero-copy allocator based on TCP.
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use std::sync::mpsc::{Sender, Receiver};

use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};

/// Builds an instance of a TcpAllocator.
///
/// Builders are required because some of the state in a `TcpAllocator` cannot be sent between
/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
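///
/// A sketch of the intended usage (the surrounding setup here is illustrative,
/// not part of this module's API):
///
/// ```ignore
/// // On the coordinating thread: one builder per worker, plus endpoints for
/// // the network threads.
/// let (builders, promises, futures) = new_vector(allocators, my_process, processes, refill);
/// for builder in builders {
///     std::thread::spawn(move || {
///         // `build()` runs in the destination thread, where the `Rc<RefCell<_>>`
///         // state can be created without ever crossing a thread boundary.
///         let mut allocator = builder.build();
///         // ... allocate channels and run the worker ...
///     });
/// }
/// ```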
pub struct TcpBuilder<A: AllocateBuilder> {
    inner:    A,
    index:    usize,                        // this worker's index, out of `peers`.
    peers:    usize,                        // number of peer allocators.
    futures:  Vec<Receiver<MergeQueue>>,    // to receive queues for sending to each network thread.
    promises: Vec<Sender<MergeQueue>>,      // to send queues for receiving from each network thread.
    /// Byte slab refill function.
    refill: BytesRefill,
}

/// Creates a vector of builders, sharing appropriate state.
///
/// `threads` is the number of workers in a single process, `processes` is the
/// total number of processes.
/// The returned tuple contains
/// ```ignore
/// (
///   AllocateBuilder for local threads,
///   info to spawn egress comm threads,
///   info to spawn ingress comm threads,
/// )
/// ```
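///
/// For illustration: with `processes = 2` and three allocators per process,
/// process 1 receives builders for global worker indices 3, 4, and 5, each with
/// `peers = 6`. The second and third elements carry one `MergeQueue` endpoint
/// per (remote process, local worker) pair, to be handed to the egress and
/// ingress communication threads respectively.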
pub fn new_vector<A: AllocateBuilder>(
    allocators: Vec<A>,
    my_process: usize,
    processes: usize,
    refill: BytesRefill,
) -> (Vec<TcpBuilder<A>>,
    Vec<Vec<Sender<MergeQueue>>>,
    Vec<Vec<Receiver<MergeQueue>>>)
{
    let threads = allocators.len();

    // For queues from worker threads to network threads, and vice versa.
    let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
    let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);

    let builders =
    allocators
        .into_iter()
        .zip(worker_promises)
        .zip(worker_futures)
        .enumerate()
        .map(|(index, ((inner, promises), futures))| {
            TcpBuilder {
                inner,
                index: my_process * threads + index,
                peers: threads * processes,
                promises,
                futures,
                refill: refill.clone(),
            }})
        .collect();

    (builders, network_promises, network_futures)
}

impl<A: AllocateBuilder> TcpBuilder<A> {

    /// Builds a `TcpAllocator`, instantiating `Rc<RefCell<_>>` elements.
    pub fn build(self) -> TcpAllocator<A::Allocator> {

        // Fulfill puller obligations.
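        // Each promise pairs us with one network receive thread: we create a
        // `MergeQueue`, send one handle to that thread (which will push received
        // bytes into it), and keep a clone to pull from locally.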
        let mut recvs = Vec::with_capacity(self.peers);
        for promise in self.promises.into_iter() {
            let buzzer = crate::buzzer::Buzzer::default();
            let queue = MergeQueue::new(buzzer);
            promise.send(queue.clone()).expect("Failed to send MergeQueue");
            recvs.push(queue.clone());
        }

        // Extract pusher commitments.
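        // Each future yields a `MergeQueue` created by a network send thread; we
        // wrap it in a `SendEndpoint` so outgoing bytes can be staged locally and
        // later published to that thread.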
        let mut sends = Vec::with_capacity(self.peers);
        for pusher in self.futures.into_iter() {
            let queue = pusher.recv().expect("Failed to receive push queue");
            let sendpoint = SendEndpoint::new(queue, self.refill.clone());
            sends.push(Rc::new(RefCell::new(sendpoint)));
        }

        // let sends: Vec<_> = self.sends.into_iter().map(
        //     |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();

        TcpAllocator {
            inner: self.inner.build(),
            index: self.index,
            peers: self.peers,
            canaries: Rc::new(RefCell::new(Vec::new())),
            channel_id_bound: None,
            staged: Vec::new(),
            sends,
            recvs,
            to_local: HashMap::new(),
        }
    }
}

/// A TCP-based allocator for inter-process communication.
pub struct TcpAllocator<A: Allocate> {

    inner:      A,                                  // A non-serialized inner allocator for process-local peers.

    index:      usize,                              // this worker's index, out of `peers`.
    peers:      usize,                              // number of peer allocators (for typed channel allocation).

    staged:     Vec<Bytes>,                         // staging area for incoming Bytes.
    canaries:   Rc<RefCell<Vec<usize>>>,            // channel identifiers whose pullers have been dropped.

    channel_id_bound: Option<usize>,                // largest channel identifier allocated so far.

    // sending, receiving, and responding to binary buffers.
    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>,     // sends[x] -> to the x-th remote process (own process skipped).
    recvs:      Vec<MergeQueue>,                                // recvs[x] <- from the x-th remote process (own process skipped).
    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,   // to worker-local typed pullers.
}

impl<A: Allocate> Allocate for TcpAllocator<A> {
    fn index(&self) -> usize { self.index }
    fn peers(&self) -> usize { self.peers }
    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

        // Assume and enforce in-order identifier allocation.
        if let Some(bound) = self.channel_id_bound {
            assert!(bound < identifier);
        }
        self.channel_id_bound = Some(identifier);

        // Result list of boxed pushers.
        let mut pushes = Vec::<Box<dyn Push<T>>>::new();

        // Inner exchange allocations.
        let inner_peers = self.inner.peers();
        let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);

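        // Worker `w` lives in process `w / inner_peers`. Local targets take the
        // next inner pusher; remote targets get a `Pusher` over `self.sends`,
        // which has one entry per *other* process (hence the decrement below).
        // Illustrative example: with `inner_peers = 2` and `self.index = 2`
        // (process 1), target 5 lives in process 2 and maps to `self.sends[1]`.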
        for target_index in 0 .. self.peers() {

            // TODO: crappy place to hardcode this rule.
            let mut process_id = target_index / inner_peers;

            if process_id == self.index / inner_peers {
                pushes.push(inner_sends.remove(0));
            }
            else {
                // message header template.
                let header = MessageHeader {
                    channel:      identifier,
                    source:       self.index,
                    target_lower: target_index,
                    target_upper: target_index + 1,
                    length:       0,
                    seqno:        0,
                };

                // create, box, and stash new process_binary pusher.
                if process_id > self.index / inner_peers { process_id -= 1; }
                pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id]))));
            }
        }

        let channel = Rc::clone(self.to_local.entry(identifier).or_default());

        use crate::allocator::counters::Puller as CountPuller;
        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
        let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));

        (pushes, puller, )
    }

    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {

        // Assume and enforce in-order identifier allocation.
        if let Some(bound) = self.channel_id_bound {
            assert!(bound < identifier);
        }
        self.channel_id_bound = Some(identifier);

        // Result list of boxed pushers.
        // One entry for each process.
        let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);

        // Inner exchange allocations.
        let inner_peers = self.inner.peers();
        let (inner_send, inner_recv) = self.inner.broadcast(identifier);

        pushes.push(inner_send);
        for (mut index, send) in self.sends.iter().enumerate() {
            // The span of worker indexes jumps by `inner_peers` as we skip our own process.
            // We bump `index` by one as we pass `self.index/inner_peers` to effect this.
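            // Illustrative example: with `inner_peers = 2` and `self.index = 2`
            // (process 1), `sends[1]` is bumped to process 2 and covers workers 4..6.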
            if index >= self.index/inner_peers { index += 1; }
            let header = MessageHeader {
                channel: identifier,
                source: self.index,
                target_lower: index * inner_peers,
                target_upper: index * inner_peers + inner_peers,
                length: 0,
                seqno: 0,
            };
            pushes.push(Box::new(Pusher::new(header, Rc::clone(send))));
        }

        let channel = Rc::clone(self.to_local.entry(identifier).or_default());

        use crate::allocator::counters::Puller as CountPuller;
        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
        let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));

        let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
        (pushes, puller, )
    }

    // Perform preparatory work, most likely reading binary buffers from `self.recvs`.
    #[inline(never)]
    fn receive(&mut self) {

        // Check for channels whose `Puller` has been dropped.
        let mut canaries = self.canaries.borrow_mut();
        for dropped_channel in canaries.drain(..) {
            let _dropped =
            self.to_local
                .remove(&dropped_channel)
                .expect("non-existent channel dropped");
            // Borrowed channels may be non-empty if the dataflow was forcibly
            // dropped. The contract is that if a dataflow is dropped, all other
            // workers will drop the dataflow too, without blocking indefinitely
            // on events from it.
            // assert!(dropped.borrow().is_empty());
        }
        ::std::mem::drop(canaries);

        self.inner.receive();

        for recv in self.recvs.iter_mut() {
            recv.drain_into(&mut self.staged);
        }

        let mut events = self.inner.events().borrow_mut();

        for mut bytes in self.staged.drain(..) {

            // We expect that `bytes` contains an integral number of messages;
            // no message is split across allocations.
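            // Framing assumption: each message is a `MessageHeader` immediately
            // followed by its payload, so `header.required_bytes()` should cover
            // the header plus `header.length` payload bytes.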
            while !bytes.is_empty() {

                if let Some(header) = MessageHeader::try_read(&bytes[..]) {

                    // Get the header and payload, ditch the header.
                    let mut peel = bytes.extract_to(header.required_bytes());
                    let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());

                    // Increment message count for channel.
                    // Safe to do this even if the channel has been dropped.
                    events.push(header.channel);

                    // Ensure that a queue exists.
                    match self.to_local.entry(header.channel) {
                        Entry::Vacant(entry) => {
                            // We may receive data before allocating, and shouldn't block.
                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
                                    .borrow_mut()
                                    .push_back(peel);
                            }
                        }
                        Entry::Occupied(mut entry) => {
                            entry.get_mut().borrow_mut().push_back(peel);
                        }
                    }
                }
                else {
                    println!("failed to read full header!");
                }
            }
        }
    }

    // Perform postparatory work, most likely sending un-full binary buffers.
    fn release(&mut self) {
        // Publish outgoing byte ledgers.
        for send in self.sends.iter_mut() {
            send.borrow_mut().publish();
        }

        // OPTIONAL: Tattle on channels sitting on borrowed data.
        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
        // for (index, list) in self.to_local.iter() {
        //     let len = list.borrow_mut().len();
        //     if len > 0 {
        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
        //     }
        // }
    }
    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
        self.inner.events()
    }
    fn await_events(&self, duration: Option<std::time::Duration>) {
        self.inner.await_events(duration);
    }
}