timely_communication/allocator/zero_copy/
allocator_process.rs

1//! Zero-copy allocator for intra-process serialized communication.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use std::sync::mpsc::{Sender, Receiver};
7
8use timely_bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Push, Pull};
13use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
14use crate::allocator::canary::Canary;
15use crate::allocator::zero_copy::bytes_slab::BytesRefill;
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17
18use super::push_pull::{Pusher, Puller};
19
/// Builds an instance of a ProcessAllocator.
///
/// Builders are required because some of the state in a `ProcessAllocator` cannot be sent between
/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
pub struct ProcessBuilder {
    index:  usize,                      // number out of peers
    peers:  usize,                      // number of peer allocators.
    pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
    pullers: Vec<Sender<MergeQueue>>,   // for pulling bytes from other workers.
    refill: BytesRefill,                // buffer (re)allocation strategy, cloned into each send endpoint.
}
33
34impl PeerBuilder for ProcessBuilder {
35    type Peer = ProcessBuilder;
36    /// Creates a vector of builders, sharing appropriate state.
37    ///
38    /// This method requires access to a byte exchanger, from which it mints channels.
39    fn new_vector(count: usize, refill: BytesRefill) -> Vec<ProcessBuilder> {
40
41        // Channels for the exchange of `MergeQueue` endpoints.
42        let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
43
44        pushers_vec
45            .into_iter()
46            .zip(pullers_vec)
47            .enumerate()
48            .map(|(index, (pushers, pullers))|
49                ProcessBuilder {
50                    index,
51                    peers: count,
52                    pushers,
53                    pullers,
54                    refill: refill.clone(),
55                }
56            )
57            .collect()
58    }
59}
60
61impl ProcessBuilder {
62    /// Builds a `ProcessAllocator`, instantiating `Rc<RefCell<_>>` elements.
63    pub fn build(self) -> ProcessAllocator {
64
65        // Fulfill puller obligations.
66        let mut recvs = Vec::with_capacity(self.peers);
67        for puller in self.pullers.into_iter() {
68            let buzzer = crate::buzzer::Buzzer::default();
69            let queue = MergeQueue::new(buzzer);
70            puller.send(queue.clone()).expect("Failed to send MergeQueue");
71            recvs.push(queue.clone());
72        }
73
74        // Extract pusher commitments.
75        let mut sends = Vec::with_capacity(self.peers);
76        for pusher in self.pushers.into_iter() {
77            let queue = pusher.recv().expect("Failed to receive MergeQueue");
78            let sendpoint = SendEndpoint::new(queue, self.refill.clone());
79            sends.push(Rc::new(RefCell::new(sendpoint)));
80        }
81
82        ProcessAllocator {
83            index: self.index,
84            peers: self.peers,
85            events: Rc::new(RefCell::new(Default::default())),
86            canaries: Rc::new(RefCell::new(Vec::new())),
87            channel_id_bound: None,
88            staged: Vec::new(),
89            sends,
90            recvs,
91            to_local: HashMap::new(),
92        }
93    }
94}
95
96impl AllocateBuilder for ProcessBuilder {
97    type Allocator = ProcessAllocator;
98    /// Builds allocator, consumes self.
99    fn build(self) -> Self::Allocator {
100        self.build()
101    }
102
103}
104
/// A serializing allocator for inter-thread intra-process communication.
pub struct ProcessAllocator {

    index:      usize,                              // number out of peers
    peers:      usize,                              // number of peer allocators (for typed channel allocation).

    // Channel ids of received messages, one entry per message; shared with
    // pullers for progress accounting and consulted by `await_events`.
    events: Rc<RefCell<Vec<usize>>>,

    // Channel ids whose `Puller` has been dropped; drained in `receive` to
    // deallocate the corresponding local queues.
    canaries: Rc<RefCell<Vec<usize>>>,

    // Largest channel identifier allocated so far; enforces in-order
    // allocation and distinguishes "not yet allocated" from "dropped".
    channel_id_bound: Option<usize>,

    // sending, receiving, and responding to binary buffers.
    staged:     Vec<Bytes>,                                 // scratch space for buffers drained from `recvs`.
    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
    recvs:      Vec<MergeQueue>,                            // recvs[x] <- from thread x.
    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,          // to worker-local typed pullers.
}
123
impl Allocate for ProcessAllocator {
    fn index(&self) -> usize { self.index }
    fn peers(&self) -> usize { self.peers }
    /// Allocates a typed channel: one boxed pusher per peer, and one puller.
    ///
    /// Channel identifiers must be allocated in strictly increasing order;
    /// this is asserted below and relied upon by `receive` to tell apart
    /// "channel not yet allocated" from "channel already dropped".
    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

        // Assume and enforce in-order identifier allocation.
        if let Some(bound) = self.channel_id_bound {
            assert!(bound < identifier);
        }
        self.channel_id_bound = Some(identifier);

        let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());

        for target_index in 0 .. self.peers() {

            // message header template.
            let header = MessageHeader {
                channel:    identifier,
                source:     self.index,
                target_lower:     target_index,
                target_upper:     target_index+1,
                length:     0,
                seqno:      0,
            };

            // create, box, and stash new process_binary pusher.
            pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[target_index]))));
        }

        // The local receive queue; may already exist if `receive` saw data
        // for this channel before it was allocated.
        let channel = Rc::clone(self.to_local.entry(identifier).or_default());

        use crate::allocator::counters::Puller as CountPuller;
        // The canary reports into `self.canaries` when the puller is dropped.
        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
        let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, Rc::clone(self.events())));

        (pushes, puller)
    }

    // Perform preparatory work, most likely reading binary buffers from self.recv.
    #[inline(never)]
    fn receive(&mut self) {

        // Check for channels whose `Puller` has been dropped.
        let mut canaries = self.canaries.borrow_mut();
        for dropped_channel in canaries.drain(..) {
            let _dropped =
            self.to_local
                .remove(&dropped_channel)
                .expect("non-existent channel dropped");
            // Borrowed channels may be non-empty, if the dataflow was forcibly
            // dropped. The contract is that if a dataflow is dropped, all other
            // workers will drop the dataflow too, without blocking indefinitely
            // on events from it.
            // assert!(dropped.borrow().is_empty());
        }
        // Release the borrow before `events` is borrowed below.
        std::mem::drop(canaries);

        let mut events = self.events.borrow_mut();

        // Drain all pending byte buffers from each peer into `staged`.
        for recv in self.recvs.iter_mut() {
            recv.drain_into(&mut self.staged);
        }

        for mut bytes in self.staged.drain(..) {

            // We expect that `bytes` contains an integral number of messages.
            // No splitting occurs across allocations.
            while bytes.len() > 0 {

                if let Some(header) = MessageHeader::try_read(&bytes[..]) {

                    // Get the header and payload, ditch the header.
                    let mut peel = bytes.extract_to(header.required_bytes());
                    let _ = peel.extract_to(header.header_bytes());

                    // Increment message count for channel.
                    // Safe to do this even if the channel has been dropped.
                    events.push(header.channel);

                    // Ensure that a queue exists.
                    match self.to_local.entry(header.channel) {
                        Entry::Vacant(entry) => {
                            // We may receive data before allocating, and shouldn't block.
                            // If the channel id is at or below the bound, the channel
                            // was allocated and then dropped: discard the payload.
                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
                                    .borrow_mut()
                                    .push_back(peel);
                            }
                        }
                        Entry::Occupied(mut entry) => {
                            entry.get_mut().borrow_mut().push_back(peel);
                        }
                    }
                }
                else {
                    println!("failed to read full header!");
                }
            }
        }
    }

    // Perform postparatory work, most likely sending un-full binary buffers.
    fn release(&mut self) {
        // Publish outgoing byte ledgers.
        for send in self.sends.iter_mut() {
            send.borrow_mut().publish();
        }

        // OPTIONAL: Tattle on channels sitting on borrowed data.
        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
        // for (index, list) in self.to_local.iter() {
        //     let len = list.borrow_mut().len();
        //     if len > 0 {
        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
        //     }
        // }
    }

    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
        &self.events
    }
    /// Parks the thread until unparked or `duration` elapses, unless events
    /// are already pending (in which case it returns immediately).
    fn await_events(&self, duration: Option<std::time::Duration>) {
        if self.events.borrow().is_empty() {
            if let Some(duration) = duration {
                std::thread::park_timeout(duration);
            }
            else {
                std::thread::park();
            }
        }
    }
}
255}