timely_communication/allocator/
mod.rs

1//! Types and traits for the allocation of channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6
7pub use self::thread::Thread;
8pub use self::process::Process;
9pub use self::generic::{Generic, GenericBuilder};
10
11pub mod thread;
12pub mod process;
13pub mod generic;
14
15pub mod canary;
16pub mod counters;
17
18pub mod zero_copy;
19
20use crate::{Bytesable, Push, Pull};
21
22/// A proto-allocator, which implements `Send` and can be completed with `build`.
23///
24/// This trait exists because some allocators contain elements that do not implement
25/// the `Send` trait, for example `Rc` wrappers for shared state. As such, what we
26/// actually need to create to initialize a computation are builders, which we can
27/// then move into new threads each of which then construct their actual allocator.
28pub trait AllocateBuilder : Send {
29    /// The type of allocator to be built.
30    type Allocator: Allocate;
31    /// Builds allocator, consumes self.
32    fn build(self) -> Self::Allocator;
33}
34
35use std::any::Any;
36
37/// A type that can be sent along an allocated channel.
38pub trait Exchangeable : Send+Any+Bytesable { }
39impl<T: Send+Any+Bytesable> Exchangeable for T { }
40
41/// A type capable of allocating channels.
42///
43/// There is some feature creep, in that this contains several convenience methods about the nature
44/// of the allocated channels, and maintenance methods to ensure that they move records around.
45pub trait Allocate {
46    /// The index of the worker out of `(0..self.peers())`.
47    fn index(&self) -> usize;
48    /// The number of workers in the communication group.
49    fn peers(&self) -> usize;
50    /// Constructs several send endpoints and one receive endpoint.
51    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
52    /// A shared queue of communication events with channel identifier.
53    ///
54    /// It is expected that users of the channel allocator will regularly
55    /// drain these events in order to drive their computation. If they
56    /// fail to do so the event queue may become quite large, and turn
57    /// into a performance problem.
58    fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
59
60    /// Awaits communication events.
61    ///
62    /// This method may park the current thread, for at most `duration`,
63    /// until new events arrive.
64    /// The method is not guaranteed to wait for any amount of time, but
65    /// good implementations should use this as a hint to park the thread.
66    fn await_events(&self, _duration: Option<Duration>) { }
67
68    /// Ensure that received messages are surfaced in each channel.
69    ///
70    /// This method should be called to ensure that received messages are
71    /// surfaced in each channel, but failing to call the method does not
72    /// ensure that they are not surfaced.
73    ///
74    /// Generally, this method is the indication that the allocator should
75    /// present messages contained in otherwise scarce resources (for example
76    /// network buffers), under the premise that someone is about to consume
77    /// the messages and release the resources.
78    fn receive(&mut self) { }
79
80    /// Signal the completion of a batch of reads from channels.
81    ///
82    /// Conventionally, this method signals to the communication fabric
83    /// that the worker is taking a break from reading from channels, and
84    /// the fabric should consider re-acquiring scarce resources. This can
85    /// lead to the fabric performing defensive copies out of un-consumed
86    /// buffers, and can be a performance problem if invoked casually.
87    fn release(&mut self) { }
88
89    /// Constructs a pipeline channel from the worker to itself.
90    ///
91    /// By default, this method uses the thread-local channel constructor
92    /// based on a shared `VecDeque` which updates the event queue.
93    fn pipeline<T: 'static>(&mut self, identifier: usize) ->
94        (thread::ThreadPusher<T>,
95         thread::ThreadPuller<T>)
96    {
97        thread::Thread::new_from(identifier, Rc::clone(self.events()))
98    }
99
100    /// Allocates a broadcast channel, where each pushed message is received by all.
101    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
102        let (pushers, pull) = self.allocate(identifier);
103        (Box::new(Broadcaster { spare: None, pushers }), pull)
104    }
105}
106
107/// An adapter to broadcast any pushed element.
108struct Broadcaster<T> {
109    /// Spare element for defensive copies.
110    spare: Option<T>,
111    /// Destinations to which pushed elements should be broadcast.
112    pushers: Vec<Box<dyn Push<T>>>,
113}
114
115impl<T: Clone> Push<T> for Broadcaster<T> {
116    fn push(&mut self, element: &mut Option<T>) {
117        // Push defensive copies to pushers after the first.
118        for pusher in self.pushers.iter_mut().skip(1) {
119            self.spare.clone_from(element);
120            pusher.push(&mut self.spare);
121        }
122        // Push the element itself at the first pusher.
123        for pusher in self.pushers.iter_mut().take(1) {
124            pusher.push(element);
125        }
126    }
127}
128
129use crate::allocator::zero_copy::bytes_slab::BytesRefill;
130
131/// A builder for vectors of peers.
132pub trait PeerBuilder {
133    /// The peer type.
134    type Peer: AllocateBuilder + Sized;
135    /// Allocate a list of `Self::Peer` of length `peers`.
136    fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
137}