//! Types and traits for the allocation of channels.
use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
pub use self::thread::Thread;
pub use self::process::Process;
pub use self::generic::{Generic, GenericBuilder};
pub mod thread;
pub mod process;
pub mod generic;
pub mod canary;
pub mod counters;
pub mod zero_copy;
use crate::{Bytesable, Push, Pull};
/// A proto-allocator, which implements `Send` and can be completed with `build`.
/// This trait exists because some allocators contain elements that do not implement
/// the `Send` trait, for example `Rc` wrappers for shared state. As such, what we
/// actually need to create to initialize a computation are builders, which we can
/// then move into new threads each of which then construct their actual allocator.
pub trait AllocateBuilder : Send {
/// The type of allocator to be built.
type Allocator: Allocate;
/// Builds allocator, consumes self.
fn build(self) -> Self::Allocator;
use std::any::Any;
/// Marker trait for values that can be sent along an allocated channel.
///
/// It declares no methods of its own; it only gathers the `Send`, `Any`, and
/// `Bytesable` requirements under a single name.
pub trait Exchangeable: Send + Any + Bytesable {}

// Blanket implementation: any type that satisfies all three supertrait bounds
// is `Exchangeable` without further work.
impl<T> Exchangeable for T
where
    T: Send + Any + Bytesable,
{
}
/// A type capable of allocating channels.
///
/// There is some feature creep, in that this contains several convenience methods about the nature
/// of the allocated channels, and maintenance methods to ensure that they move records around.
///
/// Implementors must supply `index`, `peers`, `allocate`, and `events`; the remaining
/// methods carry default bodies.
pub trait Allocate {
/// The index of the worker out of `(0..self.peers())`.
fn index(&self) -> usize;
/// The number of workers in the communication group.
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
///
/// `identifier` names the channel being allocated. The result pairs a vector of boxed
/// `Push<T>` endpoints with a single boxed `Pull<T>` endpoint.
// NOTE(review): presumably one pusher per peer — confirm against the implementors.
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
/// A shared queue of communication events with channel identifier.
///
/// It is expected that users of the channel allocator will regularly
/// drain these events in order to drive their computation. If they
/// fail to do so the event queue may become quite large, and turn
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
/// Awaits communication events.
///
/// This method may park the current thread, for at most `duration`,
/// until new events arrive.
///
/// The method is not guaranteed to wait for any amount of time, but
/// good implementations should use this as a hint to park the thread.
/// The default implementation ignores `_duration` and returns immediately.
fn await_events(&self, _duration: Option<Duration>) { }
/// Ensure that received messages are surfaced in each channel.
///
/// This method should be called to ensure that received messages are
/// surfaced in each channel, but failing to call the method does not
/// ensure that they are not surfaced.
///
/// Generally, this method is the indication that the allocator should
/// present messages contained in otherwise scarce resources (for example
/// network buffers), under the premise that someone is about to consume
/// the messages and release the resources.
///
/// The default implementation does nothing.
fn receive(&mut self) { }
/// Signal the completion of a batch of reads from channels.
///
/// Conventionally, this method signals to the communication fabric
/// that the worker is taking a break from reading from channels, and
/// the fabric should consider re-acquiring scarce resources. This can
/// lead to the fabric performing defensive copies out of un-consumed
/// buffers, and can be a performance problem if invoked casually.
///
/// The default implementation does nothing.
fn release(&mut self) { }
/// Constructs a pipeline channel from the worker to itself.
///
/// By default, this method uses the thread-local channel constructor
/// based on a shared `VecDeque` which updates the event queue.
/// The default body passes `identifier` and a clone of the `Rc` returned by
/// `events()` to `thread::Thread::new_from`.
// NOTE(review): as it stands here the signature ends at `->` with no return type and
// the body has no enclosing braces; lines appear to be missing and must be restored
// (the return type should match whatever `thread::Thread::new_from` yields — confirm).
fn pipeline<T: 'static>(&mut self, identifier: usize) ->
thread::Thread::new_from(identifier, self.events().clone())
/// Allocates a broadcast channel, where each pushed message is received by all.
///
/// The default implementation calls `allocate` for `identifier`, hands every resulting
/// pusher to a single `Broadcaster`, and returns that adapter boxed together with the
/// pull endpoint from `allocate`.
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
let (pushers, pull) = self.allocate(identifier);
// The broadcaster starts with an empty spare slot.
(Box::new(Broadcaster { spare: None, pushers }), pull)
// NOTE(review): the closing braces for `broadcast` and for the trait itself are not
// present at this point in the file — confirm and restore.
/// An adapter to broadcast any pushed element.
struct Broadcaster<T> {
/// Spare element for defensive copies.
spare: Option<T>,
/// Destinations to which pushed elements should be broadcast.
pushers: Vec<Box<dyn Push<T>>>,
impl<T: Clone> Push<T> for Broadcaster<T> {
fn push(&mut self, element: &mut Option<T>) {
// Push defensive copies to pushers after the first.
for pusher in self.pushers.iter_mut().skip(1) {
pusher.push(&mut self.spare);
// Push the element itself at the first pusher.
for pusher in self.pushers.iter_mut().take(1) {