Skip to main content

timely_communication/allocator/
process.rs

1//! Typed inter-thread, intra-process channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::sync::{Arc, Mutex};
6use std::any::Any;
7use std::time::Duration;
8use std::collections::{HashMap};
9use std::sync::mpsc::{Sender, Receiver};
10
11use crate::allocator::thread::{ThreadBuilder};
12use crate::allocator::{Allocate, AllocateBuilder, PeerBuilder, Thread};
13use crate::{Push, Pull};
14use crate::buzzer::Buzzer;
15
16/// An allocator for inter-thread, intra-process communication
17pub struct ProcessBuilder {
18    inner: ThreadBuilder,
19    index: usize,
20    peers: usize,
21    // below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
22    channels: Arc<Mutex<HashMap<usize, Box<dyn Any+Send>>>>,
23
24    // Buzzers for waking other local workers.
25    buzzers_send: Vec<Sender<Buzzer>>,
26    buzzers_recv: Vec<Receiver<Buzzer>>,
27
28    counters_send: Vec<Sender<usize>>,
29    counters_recv: Receiver<usize>,
30}
31
32impl AllocateBuilder for ProcessBuilder {
33    type Allocator = Process;
34    fn build(self) -> Self::Allocator {
35
36        // Initialize buzzers; send first, then recv.
37        for worker in self.buzzers_send.iter() {
38            let buzzer = Buzzer::default();
39            worker.send(buzzer).expect("Failed to send buzzer");
40        }
41        let mut buzzers = Vec::with_capacity(self.buzzers_recv.len());
42        for worker in self.buzzers_recv.iter() {
43            buzzers.push(worker.recv().expect("Failed to recv buzzer"));
44        }
45
46        Process {
47            inner: self.inner.build(),
48            index: self.index,
49            peers: self.peers,
50            channels: self.channels,
51            buzzers,
52            counters_send: self.counters_send,
53            counters_recv: self.counters_recv,
54        }
55    }
56}
57
58/// An allocator for inter-thread, intra-process communication
59pub struct Process {
60    inner: Thread,
61    index: usize,
62    peers: usize,
63    // below: `Box<Any+Send>` is a `Box<Vec<Option<(Vec<Sender<T>>, Receiver<T>)>>>`
64    channels: Arc<Mutex<HashMap</* channel id */ usize, Box<dyn Any+Send>>>>,
65    buzzers: Vec<Buzzer>,
66    counters_send: Vec<Sender<usize>>,
67    counters_recv: Receiver<usize>,
68}
69
70impl Process {
71    /// Access the wrapped inner allocator.
72    pub fn inner(&mut self) -> &mut Thread { &mut self.inner }
73}
74
75impl PeerBuilder for Process {
76    type Peer = ProcessBuilder;
77    /// Allocate a list of connected intra-process allocators.
78    fn new_vector(
79        peers: usize,
80        _refill: crate::allocator::BytesRefill,
81        _spill: Option<crate::allocator::zero_copy::spill::SpillPolicyFn>,
82    ) -> Vec<ProcessBuilder> {
83
84        let mut counters_send = Vec::with_capacity(peers);
85        let mut counters_recv = Vec::with_capacity(peers);
86        for _ in 0 .. peers {
87            let (send, recv) = std::sync::mpsc::channel();
88            counters_send.push(send);
89            counters_recv.push(recv);
90        }
91
92        let channels = Arc::new(Mutex::new(HashMap::with_capacity(peers)));
93
94        // Allocate matrix of buzzer send and recv endpoints.
95        let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers);
96
97        counters_recv
98            .into_iter()
99            .zip(buzzers_send)
100            .zip(buzzers_recv)
101            .enumerate()
102            .map(|(index, ((recv, bsend), brecv))| {
103                ProcessBuilder {
104                    inner: ThreadBuilder,
105                    index,
106                    peers,
107                    buzzers_send: bsend,
108                    buzzers_recv: brecv,
109                    channels: Arc::clone(&channels),
110                    counters_send: counters_send.clone(),
111                    counters_recv: recv,
112                }
113            })
114            .collect()
115    }
116}
117
118impl Allocate for Process {
119    fn index(&self) -> usize { self.index }
120    fn peers(&self) -> usize { self.peers }
121    fn allocate<T: Any+Send>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
122
123        // this is race-y global initialisation of all channels for all workers, performed by the
124        // first worker that enters this critical section
125
126        // ensure exclusive access to shared list of channels
127        let mut channels = self.channels.lock().expect("mutex error?");
128
129        let (sends, recv, empty) = {
130
131            // we may need to alloc a new channel ...
132            let entry = channels.entry(identifier).or_insert_with(|| {
133
134                let mut pushers = Vec::with_capacity(self.peers);
135                let mut pullers = Vec::with_capacity(self.peers);
136                for buzzer in self.buzzers.iter() {
137                    let (s, r): (Sender<T>, Receiver<T>) = std::sync::mpsc::channel();
138                    // TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
139                    pushers.push((Pusher { target: s }, buzzer.clone()));
140                    pullers.push(Puller { source: r, current: None });
141                }
142
143                let mut to_box = Vec::with_capacity(pullers.len());
144                for recv in pullers.into_iter() {
145                    to_box.push(Some((pushers.clone(), recv)));
146                }
147
148                Box::new(to_box)
149            });
150
151            let vector =
152            entry
153                .downcast_mut::<Vec<Option<(Vec<(Pusher<T>, Buzzer)>, Puller<T>)>>>()
154                .expect("failed to correctly cast channel");
155
156            let (sends, recv) =
157            vector[self.index]
158                .take()
159                .expect("channel already consumed");
160
161            let empty = vector.iter().all(|x| x.is_none());
162
163            (sends, recv, empty)
164        };
165
166        // send is a vec of all senders, recv is this worker's receiver
167
168        if empty { channels.remove(&identifier); }
169
170        use crate::allocator::counters::ArcPusher as CountPusher;
171        use crate::allocator::counters::Puller as CountPuller;
172
173        let sends =
174        sends.into_iter()
175             .zip(self.counters_send.iter())
176             .map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
177             .map(|s| Box::new(s) as Box<dyn Push<T>>)
178             .collect::<Vec<_>>();
179
180        let recv = Box::new(CountPuller::new(recv, identifier, Rc::clone(self.inner.events()))) as Box<dyn Pull<T>>;
181
182        (sends, recv)
183    }
184
185    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
186        self.inner.events()
187    }
188
189    fn await_events(&self, duration: Option<Duration>) {
190        self.inner.await_events(duration);
191    }
192
193    fn receive(&mut self) {
194        let mut events = self.inner.events().borrow_mut();
195        while let Ok(index) = self.counters_recv.try_recv() {
196            events.push(index);
197        }
198    }
199}
200
201/// The push half of an intra-process channel.
202struct Pusher<T> {
203    target: Sender<T>,
204}
205
206impl<T> Clone for Pusher<T> {
207    fn clone(&self) -> Self {
208        Self {
209            target: self.target.clone(),
210        }
211    }
212}
213
214impl<T> Push<T> for Pusher<T> {
215    #[inline] fn push(&mut self, element: &mut Option<T>) {
216        if let Some(element) = element.take() {
217            // The remote endpoint could be shut down, and so
218            // it is not fundamentally an error to fail to send.
219            let _ = self.target.send(element);
220        }
221    }
222}
223
224/// The pull half of an intra-process channel.
225struct Puller<T> {
226    current: Option<T>,
227    source: Receiver<T>,
228}
229
230impl<T> Pull<T> for Puller<T> {
231    #[inline]
232    fn pull(&mut self) -> &mut Option<T> {
233        self.current = self.source.try_recv().ok();
234        &mut self.current
235    }
236}