timely_communication/allocator/mod.rs
1//! Types and traits for the allocation of channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6
7pub use self::thread::Thread;
8pub use self::generic::{Allocator, AllocatorBuilder};
9
10pub mod thread;
11pub mod process;
12pub mod generic;
13
14pub mod canary;
15pub mod counters;
16
17pub mod zero_copy;
18
19use crate::{Bytesable, Push, Pull};
20use crate::allocator::process::{Process as TypedProcess, ProcessBuilder as TypedProcessBuilder};
21use crate::allocator::zero_copy::allocator_process::{ProcessAllocator as BytesProcess, ProcessBuilder as BytesProcessBuilder};
22
23/// A proto-allocator, which implements `Send` and can be completed with `build`.
24///
25/// This trait exists because some allocators contain elements that do not implement
26/// the `Send` trait, for example `Rc` wrappers for shared state. As such, what we
27/// actually need to create to initialize a computation are builders, which we can
28/// then move into new threads each of which then construct their actual allocator.
29pub(crate) trait AllocateBuilder : Send {
30 /// The type of allocator to be built.
31 type Allocator: Allocate;
32 /// Builds allocator, consumes self.
33 fn build(self) -> Self::Allocator;
34}
35
36use std::any::Any;
37
38/// A type that can be sent along an allocated channel.
39pub trait Exchangeable : Send+Any+Bytesable { }
40impl<T: Send+Any+Bytesable> Exchangeable for T { }
41
42/// A type capable of allocating channels.
43///
44/// There is some feature creep, in that this contains several convenience methods about the nature
45/// of the allocated channels, and maintenance methods to ensure that they move records around.
46pub(crate) trait Allocate {
47 /// The index of the worker out of `(0..self.peers())`.
48 fn index(&self) -> usize;
49 /// The number of workers in the communication group.
50 fn peers(&self) -> usize;
51 /// Constructs several send endpoints and one receive endpoint.
52 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
53 /// A shared queue of communication events with channel identifier.
54 ///
55 /// It is expected that users of the channel allocator will regularly
56 /// drain these events in order to drive their computation. If they
57 /// fail to do so the event queue may become quite large, and turn
58 /// into a performance problem.
59 fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
60
61 /// Awaits communication events.
62 ///
63 /// This method may park the current thread, for at most `duration`,
64 /// until new events arrive.
65 /// The method is not guaranteed to wait for any amount of time, but
66 /// good implementations should use this as a hint to park the thread.
67 fn await_events(&self, _duration: Option<Duration>) { }
68
69 /// Ensure that received messages are surfaced in each channel.
70 ///
71 /// This method should be called to ensure that received messages are
72 /// surfaced in each channel, but failing to call the method does not
73 /// ensure that they are not surfaced.
74 ///
75 /// Generally, this method is the indication that the allocator should
76 /// present messages contained in otherwise scarce resources (for example
77 /// network buffers), under the premise that someone is about to consume
78 /// the messages and release the resources.
79 fn receive(&mut self) { }
80
81 /// Signal the completion of a batch of reads from channels.
82 ///
83 /// Conventionally, this method signals to the communication fabric
84 /// that the worker is taking a break from reading from channels, and
85 /// the fabric should consider re-acquiring scarce resources. This can
86 /// lead to the fabric performing defensive copies out of un-consumed
87 /// buffers, and can be a performance problem if invoked casually.
88 fn release(&mut self) { }
89
90 /// Allocates a broadcast channel, where each pushed message is received by all.
91 fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
92 let (pushers, pull) = self.allocate(identifier);
93 (Box::new(Broadcaster { spare: None, pushers }), pull)
94 }
95}
96
97/// An adapter to broadcast any pushed element.
98struct Broadcaster<T> {
99 /// Spare element for defensive copies.
100 spare: Option<T>,
101 /// Destinations to which pushed elements should be broadcast.
102 pushers: Vec<Box<dyn Push<T>>>,
103}
104
105impl<T: Clone> Push<T> for Broadcaster<T> {
106 fn push(&mut self, element: &mut Option<T>) {
107 // Push defensive copies to pushers after the first.
108 for pusher in self.pushers.iter_mut().skip(1) {
109 self.spare.clone_from(element);
110 pusher.push(&mut self.spare);
111 }
112 // Push the element itself at the first pusher.
113 for pusher in self.pushers.iter_mut().take(1) {
114 pusher.push(element);
115 }
116 }
117}
118
119use crate::allocator::zero_copy::bytes_slab::BytesRefill;
120use crate::allocator::zero_copy::spill::SpillPolicyFn;
121
122/// A builder for vectors of peers.
123pub(crate) trait PeerBuilder {
124 /// The peer type.
125 type Peer: AllocateBuilder + Sized;
126 /// Allocate a list of `Self::Peer` of length `peers`.
127 ///
128 /// `spill` is an optional factory for spill policies; one fresh policy
129 /// per `MergeQueue` that the resulting peers construct. Implementors
130 /// that don't use `MergeQueue` (e.g. `Typed` mpsc-based intra-process)
131 /// ignore it.
132 fn new_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self::Peer>;
133}
134
135
136/// Two flavors of intra-process allocator builder.
137#[non_exhaustive]
138pub enum ProcessBuilder {
139 /// Regular intra-process allocator (mpsc-based).
140 Typed(TypedProcessBuilder),
141 /// Binary intra-process allocator (zero-copy serialized).
142 Bytes(BytesProcessBuilder),
143}
144
145impl ProcessBuilder {
146 /// Builds the runtime allocator from this builder.
147 pub fn build(self) -> Process {
148 match self {
149 ProcessBuilder::Typed(t) => Process::Typed(t.build()),
150 ProcessBuilder::Bytes(b) => Process::Bytes(b.build()),
151 }
152 }
153
154 /// Constructs a vector of regular (mpsc-based, "Typed") intra-process builders.
155 pub fn new_typed_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
156 <TypedProcess as PeerBuilder>::new_vector(peers, refill, spill)
157 .into_iter()
158 .map(ProcessBuilder::Typed)
159 .collect()
160 }
161
162 /// Constructs a vector of binary (zero-copy serialized, "Bytes") intra-process builders.
163 pub fn new_bytes_vector(peers: usize, refill: BytesRefill, spill: Option<SpillPolicyFn>) -> Vec<Self> {
164 <BytesProcessBuilder as PeerBuilder>::new_vector(peers, refill, spill)
165 .into_iter()
166 .map(ProcessBuilder::Bytes)
167 .collect()
168 }
169}
170
171/// The runtime counterpart of `ProcessBuilder`: the actual constructed inner allocator.
172///
173/// Inherent methods mirror the subset of `Allocate` that `TcpAllocator` needs from its inner.
174#[non_exhaustive]
175pub enum Process {
176 /// Regular intra-process allocator.
177 Typed(TypedProcess),
178 /// Binary intra-process allocator.
179 Bytes(BytesProcess),
180}
181
182impl Process {
183 pub(crate) fn index(&self) -> usize {
184 match self {
185 Process::Typed(p) => p.index(),
186 Process::Bytes(pb) => pb.index(),
187 }
188 }
189 pub(crate) fn peers(&self) -> usize {
190 match self {
191 Process::Typed(p) => p.peers(),
192 Process::Bytes(pb) => pb.peers(),
193 }
194 }
195 pub(crate) fn allocate<T: Exchangeable>(&mut self, identifier: usize)
196 -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>)
197 {
198 match self {
199 Process::Typed(p) => p.allocate(identifier),
200 Process::Bytes(pb) => pb.allocate(identifier),
201 }
202 }
203 pub(crate) fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize)
204 -> (Box<dyn Push<T>>, Box<dyn Pull<T>>)
205 {
206 match self {
207 Process::Typed(p) => p.broadcast(identifier),
208 Process::Bytes(pb) => pb.broadcast(identifier),
209 }
210 }
211 pub(crate) fn receive(&mut self) {
212 match self {
213 Process::Typed(p) => p.receive(),
214 Process::Bytes(pb) => pb.receive(),
215 }
216 }
217 pub(crate) fn release(&mut self) {
218 match self {
219 Process::Typed(p) => p.release(),
220 Process::Bytes(pb) => pb.release(),
221 }
222 }
223 pub(crate) fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
224 match self {
225 Process::Typed(p) => p.events(),
226 Process::Bytes(pb) => pb.events(),
227 }
228 }
229 pub(crate) fn await_events(&self, duration: Option<std::time::Duration>) {
230 match self {
231 Process::Typed(p) => p.await_events(duration),
232 Process::Bytes(pb) => pb.await_events(duration),
233 }
234 }
235}