Skip to main content

timely_communication/allocator/
mod.rs

1//! Types and traits for the allocation of channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6
7pub use self::thread::Thread;
8pub use self::generic::{Allocator, AllocatorBuilder};
9
10pub mod thread;
11pub mod process;
12pub mod generic;
13
14pub mod canary;
15pub mod counters;
16
17pub mod zero_copy;
18
19use crate::{Bytesable, Push, Pull};
20use crate::allocator::process::{Process as TypedProcess, ProcessBuilder as TypedProcessBuilder};
21use crate::allocator::zero_copy::allocator_process::{ProcessAllocator as BytesProcess, ProcessBuilder as BytesProcessBuilder};
22
23/// A proto-allocator, which implements `Send` and can be completed with `build`.
24///
25/// This trait exists because some allocators contain elements that do not implement
26/// the `Send` trait, for example `Rc` wrappers for shared state. As such, what we
27/// actually need to create to initialize a computation are builders, which we can
28/// then move into new threads each of which then construct their actual allocator.
29pub(crate) trait AllocateBuilder : Send {
30    /// The type of allocator to be built.
31    type Allocator: Allocate;
32    /// Builds allocator, consumes self.
33    fn build(self) -> Self::Allocator;
34}
35
36use std::any::Any;
37
38/// A type that can be sent along an allocated channel.
39pub trait Exchangeable : Send+Any+Bytesable { }
40impl<T: Send+Any+Bytesable> Exchangeable for T { }
41
42/// A type capable of allocating channels.
43///
44/// There is some feature creep, in that this contains several convenience methods about the nature
45/// of the allocated channels, and maintenance methods to ensure that they move records around.
46pub(crate) trait Allocate {
47    /// The index of the worker out of `(0..self.peers())`.
48    fn index(&self) -> usize;
49    /// The number of workers in the communication group.
50    fn peers(&self) -> usize;
51    /// Constructs several send endpoints and one receive endpoint.
52    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
53    /// A shared queue of communication events with channel identifier.
54    ///
55    /// It is expected that users of the channel allocator will regularly
56    /// drain these events in order to drive their computation. If they
57    /// fail to do so the event queue may become quite large, and turn
58    /// into a performance problem.
59    fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
60
61    /// Awaits communication events.
62    ///
63    /// This method may park the current thread, for at most `duration`,
64    /// until new events arrive.
65    /// The method is not guaranteed to wait for any amount of time, but
66    /// good implementations should use this as a hint to park the thread.
67    fn await_events(&self, _duration: Option<Duration>) { }
68
69    /// Ensure that received messages are surfaced in each channel.
70    ///
71    /// This method should be called to ensure that received messages are
72    /// surfaced in each channel, but failing to call the method does not
73    /// ensure that they are not surfaced.
74    ///
75    /// Generally, this method is the indication that the allocator should
76    /// present messages contained in otherwise scarce resources (for example
77    /// network buffers), under the premise that someone is about to consume
78    /// the messages and release the resources.
79    fn receive(&mut self) { }
80
81    /// Signal the completion of a batch of reads from channels.
82    ///
83    /// Conventionally, this method signals to the communication fabric
84    /// that the worker is taking a break from reading from channels, and
85    /// the fabric should consider re-acquiring scarce resources. This can
86    /// lead to the fabric performing defensive copies out of un-consumed
87    /// buffers, and can be a performance problem if invoked casually.
88    fn release(&mut self) { }
89
90    /// Allocates a broadcast channel, where each pushed message is received by all.
91    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
92        let (pushers, pull) = self.allocate(identifier);
93        (Box::new(Broadcaster { spare: None, pushers }), pull)
94    }
95}
96
97/// An adapter to broadcast any pushed element.
98struct Broadcaster<T> {
99    /// Spare element for defensive copies.
100    spare: Option<T>,
101    /// Destinations to which pushed elements should be broadcast.
102    pushers: Vec<Box<dyn Push<T>>>,
103}
104
105impl<T: Clone> Push<T> for Broadcaster<T> {
106    fn push(&mut self, element: &mut Option<T>) {
107        // Push defensive copies to pushers after the first.
108        for pusher in self.pushers.iter_mut().skip(1) {
109            self.spare.clone_from(element);
110            pusher.push(&mut self.spare);
111        }
112        // Push the element itself at the first pusher.
113        for pusher in self.pushers.iter_mut().take(1) {
114            pusher.push(element);
115        }
116    }
117}
118
119use crate::allocator::zero_copy::bytes_slab::BytesRefill;
120use crate::allocator::zero_copy::spill::SpillPolicyFn;
121
122/// A builder for vectors of peers.
123pub(crate) trait PeerBuilder {
124    /// The peer type.
125    type Peer: AllocateBuilder + Sized;
126    /// Allocate a list of `Self::Peer` of length `peers`.
127    ///
128    /// `spill` is an optional factory for spill policies; one fresh policy
129    /// per `MergeQueue` that the resulting peers construct. Implementors
130    /// that don't use `MergeQueue` (e.g. `Typed` mpsc-based intra-process)
131    /// ignore it.
132    fn new_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self::Peer>;
133}
134
135
136/// Two flavors of intra-process allocator builder.
137#[non_exhaustive]
138pub enum ProcessBuilder {
139    /// Regular intra-process allocator (mpsc-based).
140    Typed(TypedProcessBuilder),
141    /// Binary intra-process allocator (zero-copy serialized).
142    Bytes(BytesProcessBuilder),
143}
144
145impl ProcessBuilder {
146    /// Builds the runtime allocator from this builder.
147    pub fn build(self) -> Process {
148        match self {
149            ProcessBuilder::Typed(t) => Process::Typed(t.build()),
150            ProcessBuilder::Bytes(b) => Process::Bytes(b.build()),
151        }
152    }
153
154    /// Constructs a vector of regular (mpsc-based, "Typed") intra-process builders.
155    pub fn new_typed_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
156        <TypedProcess as PeerBuilder>::new_vector(peers, refill, spill)
157            .into_iter()
158            .map(ProcessBuilder::Typed)
159            .collect()
160    }
161
162    /// Constructs a vector of binary (zero-copy serialized, "Bytes") intra-process builders.
163    pub fn new_bytes_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
164        <BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill, spill)
165            .into_iter()
166            .map(ProcessBuilder::Bytes)
167            .collect()
168    }
169}
170
171/// The runtime counterpart of `ProcessBuilder`: the actual constructed inner allocator.
172///
173/// Inherent methods mirror the subset of `Allocate` that `TcpAllocator` needs from its inner.
174#[non_exhaustive]
175pub enum Process {
176    /// Regular intra-process allocator.
177    Typed(TypedProcess),
178    /// Binary intra-process allocator.
179    Bytes(BytesProcess),
180}
181
182impl Process {
183    pub(crate) fn index(&self) -> usize {
184        match self {
185            Process::Typed(p) => p.index(),
186            Process::Bytes(pb) => pb.index(),
187        }
188    }
189    pub(crate) fn peers(&self) -> usize {
190        match self {
191            Process::Typed(p) => p.peers(),
192            Process::Bytes(pb) => pb.peers(),
193        }
194    }
195    pub(crate) fn allocate<T: Exchangeable>(&mut self, identifier: usize)
196        -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>)
197    {
198        match self {
199            Process::Typed(p) => p.allocate(identifier),
200            Process::Bytes(pb) => pb.allocate(identifier),
201        }
202    }
203    pub(crate) fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize)
204        -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)
205    {
206        match self {
207            Process::Typed(p) => p.broadcast(identifier),
208            Process::Bytes(pb) => pb.broadcast(identifier),
209        }
210    }
211    pub(crate) fn receive(&mut self) {
212        match self {
213            Process::Typed(p) => p.receive(),
214            Process::Bytes(pb) => pb.receive(),
215        }
216    }
217    pub(crate) fn release(&mut self) {
218        match self {
219            Process::Typed(p) => p.release(),
220            Process::Bytes(pb) => pb.release(),
221        }
222    }
223    pub(crate) fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
224        match self {
225            Process::Typed(p) => p.events(),
226            Process::Bytes(pb) => pb.events(),
227        }
228    }
229    pub(crate) fn await_events(&self, duration: Option<std::time::Duration>) {
230        match self {
231            Process::Typed(p) => p.await_events(duration),
232            Process::Bytes(pb) => pb.await_events(duration),
233        }
234    }
235}