// timely_communication/allocator/mod.rs

1//! Types and traits for the allocation of channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6
7pub use self::thread::Thread;
8pub use self::generic::{Allocator, AllocatorBuilder};
9
10pub mod thread;
11pub mod process;
12pub mod generic;
13
14pub mod canary;
15pub mod counters;
16
17pub mod zero_copy;
18
19use crate::{Bytesable, Push, Pull};
20use crate::allocator::process::{Process as TypedProcess, ProcessBuilder as TypedProcessBuilder};
21use crate::allocator::zero_copy::allocator_process::{ProcessAllocator as BytesProcess, ProcessBuilder as BytesProcessBuilder};
22
23/// A proto-allocator, which implements `Send` and can be completed with `build`.
24///
25/// This trait exists because some allocators contain elements that do not implement
26/// the `Send` trait, for example `Rc` wrappers for shared state. As such, what we
27/// actually need to create to initialize a computation are builders, which we can
28/// then move into new threads each of which then construct their actual allocator.
29pub(crate) trait AllocateBuilder : Send {
30    /// The type of allocator to be built.
31    type Allocator: Allocate;
32    /// Builds allocator, consumes self.
33    fn build(self) -> Self::Allocator;
34}
35
36use std::any::Any;
37
38/// A type that can be sent along an allocated channel.
39pub trait Exchangeable : Send+Any+Bytesable { }
40impl<T: Send+Any+Bytesable> Exchangeable for T { }
41
42/// A type capable of allocating channels.
43///
44/// There is some feature creep, in that this contains several convenience methods about the nature
45/// of the allocated channels, and maintenance methods to ensure that they move records around.
46pub(crate) trait Allocate {
47    /// The index of the worker out of `(0..self.peers())`.
48    fn index(&self) -> usize;
49    /// The number of workers in the communication group.
50    fn peers(&self) -> usize;
51    /// Constructs several send endpoints and one receive endpoint.
52    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
53    /// A shared queue of communication events with channel identifier.
54    ///
55    /// It is expected that users of the channel allocator will regularly
56    /// drain these events in order to drive their computation. If they
57    /// fail to do so the event queue may become quite large, and turn
58    /// into a performance problem.
59    fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
60
61    /// Awaits communication events.
62    ///
63    /// This method may park the current thread, for at most `duration`,
64    /// until new events arrive.
65    /// The method is not guaranteed to wait for any amount of time, but
66    /// good implementations should use this as a hint to park the thread.
67    fn await_events(&self, _duration: Option<Duration>) { }
68
69    /// Ensure that received messages are surfaced in each channel.
70    ///
71    /// This method should be called to ensure that received messages are
72    /// surfaced in each channel, but failing to call the method does not
73    /// ensure that they are not surfaced.
74    ///
75    /// Generally, this method is the indication that the allocator should
76    /// present messages contained in otherwise scarce resources (for example
77    /// network buffers), under the premise that someone is about to consume
78    /// the messages and release the resources.
79    fn receive(&mut self) { }
80
81    /// Signal the completion of a batch of reads from channels.
82    ///
83    /// Conventionally, this method signals to the communication fabric
84    /// that the worker is taking a break from reading from channels, and
85    /// the fabric should consider re-acquiring scarce resources. This can
86    /// lead to the fabric performing defensive copies out of un-consumed
87    /// buffers, and can be a performance problem if invoked casually.
88    fn release(&mut self) { }
89
90    /// Allocates a broadcast channel, where each pushed message is received by all.
91    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
92        let (pushers, pull) = self.allocate(identifier);
93        (Box::new(Broadcaster { spare: None, pushers }), pull)
94    }
95}
96
97/// An adapter to broadcast any pushed element.
98struct Broadcaster<T> {
99    /// Spare element for defensive copies.
100    spare: Option<T>,
101    /// Destinations to which pushed elements should be broadcast.
102    pushers: Vec<Box<dyn Push<T>>>,
103}
104
105impl<T: Clone> Push<T> for Broadcaster<T> {
106    fn push(&mut self, element: &mut Option<T>) {
107        // Push defensive copies to pushers after the first.
108        for pusher in self.pushers.iter_mut().skip(1) {
109            self.spare.clone_from(element);
110            pusher.push(&mut self.spare);
111        }
112        // Push the element itself at the first pusher.
113        for pusher in self.pushers.iter_mut().take(1) {
114            pusher.push(element);
115        }
116    }
117}
118
119use crate::allocator::zero_copy::bytes_slab::BytesRefill;
120
121/// A builder for vectors of peers.
122pub(crate) trait PeerBuilder {
123    /// The peer type.
124    type Peer: AllocateBuilder + Sized;
125    /// Allocate a list of `Self::Peer` of length `peers`.
126    fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
127}
128
129
130/// Two flavors of intra-process allocator builder.
131#[non_exhaustive]
132pub enum ProcessBuilder {
133    /// Regular intra-process allocator (mpsc-based).
134    Typed(TypedProcessBuilder),
135    /// Binary intra-process allocator (zero-copy serialized).
136    Bytes(BytesProcessBuilder),
137}
138
139impl ProcessBuilder {
140    /// Builds the runtime allocator from this builder.
141    pub fn build(self) -> Process {
142        match self {
143            ProcessBuilder::Typed(t) => Process::Typed(t.build()),
144            ProcessBuilder::Bytes(b) => Process::Bytes(b.build()),
145        }
146    }
147
148    /// Constructs a vector of regular (mpsc-based, "Typed") intra-process builders.
149    pub fn new_typed_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
150        <TypedProcess as PeerBuilder>::new_vector(peers, refill)
151            .into_iter()
152            .map(ProcessBuilder::Typed)
153            .collect()
154    }
155
156    /// Constructs a vector of binary (zero-copy serialized, "Bytes") intra-process builders.
157    pub fn new_bytes_vector(peers: usize, refill: BytesRefill) -> Vec<Self> {
158        <BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill)
159            .into_iter()
160            .map(ProcessBuilder::Bytes)
161            .collect()
162    }
163}
164
165/// The runtime counterpart of `ProcessBuilder`: the actual constructed inner allocator.
166///
167/// Inherent methods mirror the subset of `Allocate` that `TcpAllocator` needs from its inner.
168#[non_exhaustive]
169pub enum Process {
170    /// Regular intra-process allocator.
171    Typed(TypedProcess),
172    /// Binary intra-process allocator.
173    Bytes(BytesProcess),
174}
175
176impl Process {
177    pub(crate) fn index(&self) -> usize {
178        match self {
179            Process::Typed(p) => p.index(),
180            Process::Bytes(pb) => pb.index(),
181        }
182    }
183    pub(crate) fn peers(&self) -> usize {
184        match self {
185            Process::Typed(p) => p.peers(),
186            Process::Bytes(pb) => pb.peers(),
187        }
188    }
189    pub(crate) fn allocate<T: Exchangeable>(&mut self, identifier: usize)
190        -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>)
191    {
192        match self {
193            Process::Typed(p) => p.allocate(identifier),
194            Process::Bytes(pb) => pb.allocate(identifier),
195        }
196    }
197    pub(crate) fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize)
198        -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)
199    {
200        match self {
201            Process::Typed(p) => p.broadcast(identifier),
202            Process::Bytes(pb) => pb.broadcast(identifier),
203        }
204    }
205    pub(crate) fn receive(&mut self) {
206        match self {
207            Process::Typed(p) => p.receive(),
208            Process::Bytes(pb) => pb.receive(),
209        }
210    }
211    pub(crate) fn release(&mut self) {
212        match self {
213            Process::Typed(p) => p.release(),
214            Process::Bytes(pb) => pb.release(),
215        }
216    }
217    pub(crate) fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
218        match self {
219            Process::Typed(p) => p.events(),
220            Process::Bytes(pb) => pb.events(),
221        }
222    }
223    pub(crate) fn await_events(&self, duration: Option<std::time::Duration>) {
224        match self {
225            Process::Typed(p) => p.await_events(duration),
226            Process::Bytes(pb) => pb.await_events(duration),
227        }
228    }
229}