Skip to main content

timely_communication/allocator/zero_copy/
allocator_process.rs

1//! Zero-copy allocator for intra-process serialized communication.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use std::sync::mpsc::{Sender, Receiver};
7
8use timely_bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Push, Pull};
13use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
14use crate::allocator::canary::Canary;
15use crate::allocator::zero_copy::bytes_slab::BytesRefill;
16use crate::allocator::zero_copy::spill::SpillPolicyFn;
17use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
18
19use super::push_pull::{Pusher, Puller};
20
21/// Builds an instance of a ProcessAllocator.
22///
23/// Builders are required because some of the state in a `ProcessAllocator` cannot be sent between
24/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
25/// shared between threads here, and then provide a method that will instantiate the non-movable
26/// members once in the destination thread.
27pub struct ProcessBuilder {
28    index:  usize,                      // number out of peers
29    peers:  usize,                      // number of peer allocators.
30    pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
31    pullers: Vec<Sender<MergeQueue>>,   // for pulling bytes from other workers.
32    refill: BytesRefill,
33    spill: Option<SpillPolicyFn>,       // optional spill factory for recv queues.
34}
35
36impl PeerBuilder for ProcessBuilder {
37    type Peer = ProcessBuilder;
38    /// Creates a vector of builders, sharing appropriate state.
39    ///
40    /// This method requires access to a byte exchanger, from which it mints channels.
41    fn new_vector(count: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<ProcessBuilder> {
42
43        // Channels for the exchange of `MergeQueue` endpoints.
44        let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
45
46        pushers_vec
47            .into_iter()
48            .zip(pullers_vec)
49            .enumerate()
50            .map(|(index, (pushers, pullers))|
51                ProcessBuilder {
52                    index,
53                    peers: count,
54                    pushers,
55                    pullers,
56                    refill: refill.clone(),
57                    spill: spill.clone(),
58                }
59            )
60            .collect()
61    }
62}
63
64impl ProcessBuilder {
65    /// Builds a `ProcessAllocator`, instantiating `Rc<RefCell<_>>` elements.
66    pub fn build(self) -> ProcessAllocator {
67
68        // Fulfill puller obligations.
69        let mut recvs = Vec::with_capacity(self.peers);
70        for puller in self.pullers.into_iter() {
71            let buzzer = crate::buzzer::Buzzer::default();
72            let (writer, reader) = match self.spill.as_ref() {
73                Some(build_fn) => {
74                    let (w, r) = build_fn();
75                    MergeQueue::new_pair(buzzer, Some(w), Some(r))
76                }
77                None => MergeQueue::new_pair(buzzer, None, None),
78            };
79            // The puller side is the writer here (the producing worker
80            // `extend`s into it); this builder owns the reader handle.
81            recvs.push(reader);
82            puller.send(writer).expect("Failed to send MergeQueue");
83        }
84
85        // Extract pusher commitments.
86        let mut sends = Vec::with_capacity(self.peers);
87        for pusher in self.pushers.into_iter() {
88            let queue = pusher.recv().expect("Failed to receive MergeQueue");
89            let sendpoint = SendEndpoint::new(queue, self.refill.clone());
90            sends.push(Rc::new(RefCell::new(sendpoint)));
91        }
92
93        ProcessAllocator {
94            index: self.index,
95            peers: self.peers,
96            events: Rc::new(RefCell::new(Default::default())),
97            canaries: Rc::new(RefCell::new(Vec::new())),
98            channel_id_bound: None,
99            staged: Vec::new(),
100            sends,
101            recvs,
102            to_local: HashMap::new(),
103        }
104    }
105}
106
107impl AllocateBuilder for ProcessBuilder {
108    type Allocator = ProcessAllocator;
109    /// Builds allocator, consumes self.
110    fn build(self) -> Self::Allocator {
111        self.build()
112    }
113
114}
115
116/// A serializing allocator for inter-thread intra-process communication.
117pub struct ProcessAllocator {
118
119    index:      usize,                              // number out of peers
120    peers:      usize,                              // number of peer allocators (for typed channel allocation).
121
122    events: Rc<RefCell<Vec<usize>>>,
123
124    canaries: Rc<RefCell<Vec<usize>>>,
125
126    channel_id_bound: Option<usize>,
127
128    // sending, receiving, and responding to binary buffers.
129    staged:     Vec<Bytes>,
130    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
131    recvs:      Vec<MergeQueue>,                            // recvs[x] <- from thread x.
132    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,          // to worker-local typed pullers.
133}
134
135impl Allocate for ProcessAllocator {
136    fn index(&self) -> usize { self.index }
137    fn peers(&self) -> usize { self.peers }
138    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
139
140        // Assume and enforce in-order identifier allocation.
141        if let Some(bound) = self.channel_id_bound {
142            assert!(bound < identifier);
143        }
144        self.channel_id_bound = Some(identifier);
145
146        let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());
147
148        for target_index in 0 .. self.peers() {
149
150            // message header template.
151            let header = MessageHeader {
152                channel:    identifier,
153                source:     self.index,
154                target_lower:     target_index,
155                target_upper:     target_index+1,
156                length:     0,
157                seqno:      0,
158            };
159
160            // create, box, and stash new process_binary pusher.
161            pushes.push(Box::new(Pusher::new(header, Rc::clone(&self.sends[target_index]))));
162        }
163
164        let channel = Rc::clone(self.to_local.entry(identifier).or_default());
165
166        use crate::allocator::counters::Puller as CountPuller;
167        let canary = Canary::new(identifier, Rc::clone(&self.canaries));
168        let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, Rc::clone(self.events())));
169
170        (pushes, puller)
171    }
172
173    // Perform preparatory work, most likely reading binary buffers from self.recv.
174    #[inline(never)]
175    fn receive(&mut self) {
176
177        // Check for channels whose `Puller` has been dropped.
178        let mut canaries = self.canaries.borrow_mut();
179        for dropped_channel in canaries.drain(..) {
180            let _dropped =
181            self.to_local
182                .remove(&dropped_channel)
183                .expect("non-existent channel dropped");
184            // Borrowed channels may be non-empty, if the dataflow was forcibly
185            // dropped. The contract is that if a dataflow is dropped, all other
186            // workers will drop the dataflow too, without blocking indefinitely
187            // on events from it.
188            // assert!(dropped.borrow().is_empty());
189        }
190        std::mem::drop(canaries);
191
192        let mut events = self.events.borrow_mut();
193
194        for recv in self.recvs.iter_mut() {
195            recv.drain_into(&mut self.staged);
196        }
197
198        for mut bytes in self.staged.drain(..) {
199
200            // We expect that `bytes` contains an integral number of messages.
201            // No splitting occurs across allocations.
202            while !bytes.is_empty() {
203
204                if let Some(header) = MessageHeader::try_read(&bytes[..]) {
205
206                    // Get the header and payload, ditch the header.
207                    let mut peel = bytes.extract_to(header.required_bytes());
208                    let _ = peel.extract_to(header.header_bytes());
209
210                    // Increment message count for channel.
211                    // Safe to do this even if the channel has been dropped.
212                    events.push(header.channel);
213
214                    // Ensure that a queue exists.
215                    match self.to_local.entry(header.channel) {
216                        Entry::Vacant(entry) => {
217                            // We may receive data before allocating, and shouldn't block.
218                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
219                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
220                                    .borrow_mut()
221                                    .push_back(peel);
222                            }
223                        }
224                        Entry::Occupied(mut entry) => {
225                            entry.get_mut().borrow_mut().push_back(peel);
226                        }
227                    }
228                }
229                else {
230                    println!("failed to read full header!");
231                }
232            }
233        }
234    }
235
236    // Perform postparatory work, most likely sending un-full binary buffers.
237    fn release(&mut self) {
238        // Publish outgoing byte ledgers.
239        for send in self.sends.iter_mut() {
240            send.borrow_mut().publish();
241        }
242
243        // OPTIONAL: Tattle on channels sitting on borrowed data.
244        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
245        // for (index, list) in self.to_local.iter() {
246        //     let len = list.borrow_mut().len();
247        //     if len > 0 {
248        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
249        //     }
250        // }
251    }
252
253    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
254        &self.events
255    }
256    fn await_events(&self, duration: Option<std::time::Duration>) {
257        if self.events.borrow().is_empty() {
258            if let Some(duration) = duration {
259                std::thread::park_timeout(duration);
260            }
261            else {
262                std::thread::park();
263            }
264        }
265    }
266}