Skip to main content

timely_communication/allocator/zero_copy/
allocator.rs

//! Zero-copy allocator based on TCP.
2use std::rc::Rc;
3use std::cell::RefCell;
4use std::collections::{VecDeque, HashMap, hash_map::Entry};
5use std::sync::mpsc::{Sender, Receiver};
6
7use timely_bytes::arc::Bytes;
8
9use crate::networking::MessageHeader;
10
11use crate::{Allocate, Push, Pull};
12use crate::allocator::{Process, ProcessBuilder, Exchangeable};
13use crate::allocator::canary::Canary;
14use crate::allocator::zero_copy::bytes_slab::BytesRefill;
15use crate::allocator::zero_copy::spill::SpillPolicyFn;
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17use super::push_pull::{Pusher, PullerInner};
18
19/// Builds an instance of a TcpAllocator.
20///
21/// Builders are required because some of the state in a `TcpAllocator` cannot be sent between
22/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
23/// shared between threads here, and then provide a method that will instantiate the non-movable
24/// members once in the destination thread.
25pub struct TcpBuilder {
26    inner:  ProcessBuilder,
27    index:  usize,                      // number out of peers
28    peers:  usize,                      // number of peer allocators.
29    futures:   Vec<Receiver<MergeQueue>>,  // to receive queues to each network thread.
30    promises:   Vec<Sender<MergeQueue>>,    // to send queues from each network thread.
31    /// Byte slab refill function.
32    refill: BytesRefill,
33    /// Optional spill factory for recv queues constructed in `build()`.
34    spill: Option<SpillPolicyFn>,
35}
36
37/// Creates a vector of builders, sharing appropriate state.
38///
39/// `threads` is the number of workers in a single process, `processes` is the
40/// total number of processes.
41/// The returned tuple contains
42/// ```ignore
43/// (
44///   builders for local threads,
45///   info to spawn egress comm threads,
46///   info to spawn ingress comm thresds,
47/// )
48/// ```
49pub(crate) fn new_vector(
50    allocators: Vec<ProcessBuilder>,
51    my_process: usize,
52    processes: usize,
53    refill: BytesRefill,
54    spill: Option<SpillPolicyFn>,
55) -> (Vec<TcpBuilder>,
56    Vec<Vec<Sender<MergeQueue>>>,
57    Vec<Vec<Receiver<MergeQueue>>>)
58{
59    let threads = allocators.len();
60
61    // For queues from worker threads to network threads, and vice versa.
62    let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
63    let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);
64
65    let builders =
66    allocators
67        .into_iter()
68        .zip(worker_promises)
69        .zip(worker_futures)
70        .enumerate()
71        .map(|(index, ((inner, promises), futures))| {
72            TcpBuilder {
73                inner,
74                index: my_process * threads + index,
75                peers: threads * processes,
76                promises,
77                futures,
78                refill: refill.clone(),
79                spill: spill.clone(),
80            }})
81        .collect();
82
83    (builders, network_promises, network_futures)
84}
85
86impl TcpBuilder {
87
88    /// Builds a `TcpAllocator`, instantiating `Rc<RefCell<_>>` elements.
89    pub fn build(self) -> TcpAllocator {
90
91        // Fulfill puller obligations.
92        let mut recvs = Vec::with_capacity(self.peers);
93        for promise in self.promises.into_iter() {
94            let buzzer = crate::buzzer::Buzzer::default();
95            let (writer, reader) = match self.spill.as_ref() {
96                Some(build_fn) => {
97                    let (w, r) = build_fn();
98                    MergeQueue::new_pair(buzzer, Some(w), Some(r))
99                }
100                None => MergeQueue::new_pair(buzzer, None, None),
101            };
102            // `recv_loop` is the writer here (it `extend`s with bytes from
103            // TCP), so it keeps the original; the worker is the reader.
104            recvs.push(reader);
105            promise.send(writer).expect("Failed to send MergeQueue");
106        }
107
108        // Extract pusher commitments.
109        let mut sends = Vec::with_capacity(self.peers);
110        for pusher in self.futures.into_iter() {
111            let queue = pusher.recv().expect("Failed to receive push queue");
112            let sendpoint = SendEndpoint::new(queue, self.refill.clone());
113            sends.push(Rc::new(RefCell::new(sendpoint)));
114        }
115
116        // let sends: Vec<_> = self.sends.into_iter().map(
117        //     |send| Rc::new(RefCell::new(SendEndpoint::new(send)))).collect();
118
119        TcpAllocator {
120            inner: self.inner.build(),
121            index: self.index,
122            peers: self.peers,
123            canaries: Rc::new(RefCell::new(Vec::new())),
124            channel_id_bound: None,
125            staged: Vec::new(),
126            sends,
127            recvs,
128            to_local: HashMap::new(),
129        }
130    }
131}
132
133/// A TCP-based allocator for inter-process communication.
134pub struct TcpAllocator {
135
136    inner:      Process,                           // A non-serialized inner allocator for process-local peers.
137
138    index:      usize,                              // number out of peers
139    peers:      usize,                              // number of peer allocators (for typed channel allocation).
140
141    staged:     Vec<Bytes>,                         // staging area for incoming Bytes
142    canaries:   Rc<RefCell<Vec<usize>>>,
143
144    channel_id_bound: Option<usize>,
145
146    // sending, receiving, and responding to binary buffers.
147    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>,     // sends[x] -> goes to process x.
148    recvs:      Vec<MergeQueue>,                                // recvs[x] <- from process x.
149    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,   // to worker-local typed pullers.
150}
151
152impl Allocate for TcpAllocator {
153    fn index(&self) -> usize { self.index }
154    fn peers(&self) -> usize { self.peers }
155    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
156
157        // Assume and enforce in-order identifier allocation.
158        if let Some(bound) = self.channel_id_bound {
159            assert!(bound < identifier);
160        }
161        self.channel_id_bound = Some(identifier);
162
163        // Result list of boxed pushers.
164        let mut pushes = Vec::<Box<dyn Push<T>>>::new();
165
166        // Inner exchange allocations.
167        let inner_peers = self.inner.peers();
168        let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
169
170        for target_index in 0 .. self.peers() {
171
172            // TODO: crappy place to hardcode this rule.
173            let mut process_id = target_index / inner_peers;
174
175            if process_id == self.index / inner_peers {
176                pushes.push(inner_sends.remove(0));
177            }
178            else {
179                // message header template.
180                let header = MessageHeader {
181                    channel:    identifier,
182                    source:     self.index,
183                    target_lower:     target_index,
184                    target_upper:     target_index + 1,
185                    length:     0,
186                    seqno:      0,
187                };
188
189                // create, box, and stash new process_binary pusher.
190                if process_id > self.index / inner_peers { process_id -= 1; }
191                pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[process_id]))));
192            }
193        }
194
195        let channel = Rc::clone(self.to_local.entry(identifier).or_default());
196
197        use crate::allocator::counters::Puller as CountPuller;
198        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
199        let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
200
201        (pushes, puller, )
202    }
203
204    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
205
206        // Assume and enforce in-order identifier allocation.
207        if let Some(bound) = self.channel_id_bound {
208            assert!(bound < identifier);
209        }
210        self.channel_id_bound = Some(identifier);
211
212        // Result list of boxed pushers.
213        // One entry for each process.
214        let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);
215
216        // Inner exchange allocations.
217        let inner_peers = self.inner.peers();
218        let (inner_send, inner_recv) = self.inner.broadcast(identifier);
219
220        pushes.push(inner_send);
221        for (mut index, send) in self.sends.iter().enumerate() {
222            // The span of worker indexes jumps by `inner_peers` as we skip our own process.
223            // We bump `index` by one as we pass `self.index/inner_peers` to effect this.
224            if index >= self.index/inner_peers { index += 1; }
225            let header = MessageHeader {
226                channel: identifier,
227                source: self.index,
228                target_lower: index * inner_peers,
229                target_upper: index * inner_peers + inner_peers,
230                length: 0,
231                seqno: 0,
232            };
233            pushes.push(Box::new(Pusher::new(header, Rc::clone(send))))
234        }
235
236        let channel = Rc::clone(self.to_local.entry(identifier).or_default());
237
238        use crate::allocator::counters::Puller as CountPuller;
239        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
240        let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, Rc::clone(self.events())));
241
242        let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
243        (pushes, puller, )
244    }
245
246    // Perform preparatory work, most likely reading binary buffers from self.recv.
247    #[inline(never)]
248    fn receive(&mut self) {
249
250        // Check for channels whose `Puller` has been dropped.
251        let mut canaries = self.canaries.borrow_mut();
252        for dropped_channel in canaries.drain(..) {
253            let _dropped =
254            self.to_local
255                .remove(&dropped_channel)
256                .expect("non-existent channel dropped");
257            // Borrowed channels may be non-empty, if the dataflow was forcibly
258            // dropped. The contract is that if a dataflow is dropped, all other
259            // workers will drop the dataflow too, without blocking indefinitely
260            // on events from it.
261            // assert!(dropped.borrow().is_empty());
262        }
263        ::std::mem::drop(canaries);
264
265        self.inner.receive();
266
267        for recv in self.recvs.iter_mut() {
268            recv.drain_into(&mut self.staged);
269        }
270
271        let mut events = self.inner.events().borrow_mut();
272
273        for mut bytes in self.staged.drain(..) {
274
275            // We expect that `bytes` contains an integral number of messages.
276            // No splitting occurs across allocations.
277            while !bytes.is_empty() {
278
279                if let Some(header) = MessageHeader::try_read(&bytes[..]) {
280
281                    // Get the header and payload, ditch the header.
282                    let mut peel = bytes.extract_to(header.required_bytes());
283                    let _ = peel.extract_to(header.header_bytes());
284
285                    // Increment message count for channel.
286                    // Safe to do this even if the channel has been dropped.
287                    events.push(header.channel);
288
289                    // Ensure that a queue exists.
290                    match self.to_local.entry(header.channel) {
291                        Entry::Vacant(entry) => {
292                            // We may receive data before allocating, and shouldn't block.
293                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
294                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
295                                    .borrow_mut()
296                                    .push_back(peel);
297                            }
298                        }
299                        Entry::Occupied(mut entry) => {
300                            entry.get_mut().borrow_mut().push_back(peel);
301                        }
302                    }
303                }
304                else {
305                    println!("failed to read full header!");
306                }
307            }
308        }
309    }
310
311    // Perform postparatory work, most likely sending un-full binary buffers.
312    fn release(&mut self) {
313        // Publish outgoing byte ledgers.
314        for send in self.sends.iter_mut() {
315            send.borrow_mut().publish();
316        }
317
318        // OPTIONAL: Tattle on channels sitting on borrowed data.
319        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
320        // for (index, list) in self.to_local.iter() {
321        //     let len = list.borrow_mut().len();
322        //     if len > 0 {
323        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
324        //     }
325        // }
326    }
327    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
328        self.inner.events()
329    }
330    fn await_events(&self, duration: Option<std::time::Duration>) {
331        self.inner.await_events(duration);
332    }
333}