//! Zero-copy allocator for intra-process serialized communication.
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use crossbeam_channel::{Sender, Receiver};
use timely_bytes::arc::Bytes;
use crate::networking::MessageHeader;
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
use crate::allocator::canary::Canary;
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, Puller};
/// Builds an instance of a `ProcessAllocator`.
///
/// Builders are required because some of the state in a `ProcessAllocator` cannot be sent between
/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
pub struct ProcessBuilder {
index: usize, // number out of peers
peers: usize, // number of peer allocators.
// Note the naming: `pushers` holds *receivers*. `build` receives one `MergeQueue` from each
// and wraps it in a `SendEndpoint`.
pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
// Conversely, `pullers` holds *senders*. `build` creates one `MergeQueue` per entry and
// sends a clone of it through the channel.
pullers: Vec<Sender<MergeQueue>>, // for pulling bytes from other workers.
// Cloned into every `SendEndpoint` that `build` constructs.
refill: BytesRefill,
impl PeerBuilder for ProcessBuilder {
type Peer = ProcessBuilder;
/// Creates a vector of builders, sharing appropriate state.
///
/// The channels connecting the builders are minted by `crate::promise_futures(count, count)`.
/// Every builder records `count` as its number of peers and receives its own clone of `refill`.
fn new_vector(count: usize, refill: BytesRefill) -> Vec<ProcessBuilder> {
// Channels for the exchange of `MergeQueue` endpoints.
let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
// Each builder is assembled from its index and its own (pushers, pullers) pair.
.map(|(index, (pushers, pullers))|
ProcessBuilder {
peers: count,
// `refill` is shared by cloning, so each builder holds an independent handle.
refill: refill.clone(),
impl ProcessBuilder {
/// Builds a `ProcessAllocator`, instantiating `Rc<RefCell<_>>` elements.
///
/// Consumes the builder. For every entry of `pullers` a fresh `MergeQueue` is created and a
/// clone of it is sent through that channel. Afterwards one `MergeQueue` is received from
/// every entry of `pushers` and wrapped in a `SendEndpoint` together with a clone of `refill`.
///
/// # Panics
///
/// Panics with "Failed to send MergeQueue" if a send fails, and with
/// "Failed to receive MergeQueue" if a receive fails.
pub fn build(self) -> ProcessAllocator {
// Fulfill puller obligations.
// All sends are issued before any receive below.
// NOTE(review): presumably so that no worker waits on a queue its peer has not sent yet;
// confirm against `crate::promise_futures`.
let mut recvs = Vec::with_capacity(self.peers);
for puller in self.pullers.into_iter() {
// Each queue is created with its own default-constructed buzzer.
let buzzer = crate::buzzer::Buzzer::default();
let queue = MergeQueue::new(buzzer);
// Hand a clone of the queue to the other end of the channel.
puller.send(queue.clone()).expect("Failed to send MergeQueue");
// Extract pusher commitments.
let mut sends = Vec::with_capacity(self.peers);
for pusher in self.pushers.into_iter() {
// `recv` blocks until the peer has sent its queue (or the channel is disconnected).
let queue = pusher.recv().expect("Failed to receive MergeQueue");
// The send side pairs the received queue with a clone of the refill handle.
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
// The allocator starts with empty local state, and with no channel identifier
// allocated yet (`channel_id_bound` is `None`).
ProcessAllocator {
index: self.index,
peers: self.peers,
events: Rc::new(RefCell::new(Default::default())),
canaries: Rc::new(RefCell::new(Vec::new())),
channel_id_bound: None,
staged: Vec::new(),
to_local: HashMap::new(),
impl AllocateBuilder for ProcessBuilder {
// The allocator type this builder produces.
type Allocator = ProcessAllocator;
/// Builds allocator, consumes self.
///
/// Returns the `ProcessAllocator` declared as `Self::Allocator`.
fn build(self) -> Self::Allocator {
/// A serializing allocator for inter-thread intra-process communication.
///
/// Holds `Rc<RefCell<_>>` state, and is therefore constructed in its destination thread by
/// `ProcessBuilder::build`.
pub struct ProcessAllocator {
index: usize, // number out of peers
peers: usize, // number of peer allocators (for typed channel allocation).
// Shared list handed (by clone) to every counting puller created in `allocate`;
// `await_events` checks whether it is empty.
events: Rc<RefCell<Vec<usize>>>,
// Shared with each channel's `Canary`; `receive` drains it to learn which channels
// have had their `Puller` dropped.
canaries: Rc<RefCell<Vec<usize>>>,
// The most recently allocated channel identifier, or `None` before the first allocation.
// `allocate` asserts that identifiers strictly increase.
channel_id_bound: Option<usize>,
// sending, receiving, and responding to binary buffers.
// Scratch buffer that `receive` fills from `recvs` and then drains.
staged: Vec<Bytes>,
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
recvs: Vec<MergeQueue>, // recvs[x] <- from thread x.
// Keyed by channel identifier.
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
impl Allocate for ProcessAllocator {
/// Returns this allocator's `index` field.
fn index(&self) -> usize { self.index }
/// Returns this allocator's `peers` field.
fn peers(&self) -> usize { self.peers }
/// Allocates the channel `identifier`, returning one pusher per peer and a single puller.
///
/// Each pusher is built from a `MessageHeader` template naming this allocator as the source
/// and one peer as the target, together with the matching entry of `self.sends`. The puller
/// is built from the channel's queue and a `Canary`, and is wrapped in a counting puller that
/// is given `identifier` and a clone of the shared `events` handle.
///
/// # Panics
///
/// Panics if a previous call used an identifier greater than or equal to `identifier`.
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
self.channel_id_bound = Some(identifier);
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());
for target_index in 0 .. self.peers() {
// message header template.
// The target range covers the single peer `target_index`; `length` and `seqno`
// start at zero in the template.
// NOTE(review): `target_lower .. target_upper` is presumably half-open; confirm
// against `MessageHeader`.
let header = MessageHeader {
channel: identifier,
source: self.index,
target_lower: target_index,
target_upper: target_index+1,
length: 0,
seqno: 0,
// create, box, and stash new process_binary pusher.
pushes.push(Box::new(Pusher::new(header, self.sends[target_index].clone())));
// The queue for this channel is created on demand if it does not exist yet.
let channel =
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
use crate::allocator::counters::Puller as CountPuller;
// The canary is given the channel identifier and the shared `canaries` list that
// `receive` later drains.
let canary = Canary::new(identifier, self.canaries.clone());
let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, self.events().clone()));
(pushes, puller)
// Perform preparatory work, most likely reading binary buffers from `self.recvs`.
//
// Processes channels reported as dropped via `canaries`, then drains every receive queue
// into `staged` and splits each staged buffer into individual messages by header.
fn receive(&mut self) {
// Check for channels whose `Puller` has been dropped.
let mut canaries = self.canaries.borrow_mut();
for dropped_channel in canaries.drain(..) {
// A dropped channel is expected to be known; otherwise this panics with
// "non-existent channel dropped".
let _dropped =
.expect("non-existent channel dropped");
// Borrowed channels may be non-empty, if the dataflow was forcibly
// dropped. The contract is that if a dataflow is dropped, all other
// workers will drop the dataflow too, without blocking indefinitely
// on events from it.
// assert!(dropped.borrow().is_empty());
let mut events = self.events.borrow_mut();
for recv in self.recvs.iter_mut() {
// `staged` is the destination buffer for whatever this queue yields.
recv.drain_into(&mut self.staged);
for mut bytes in self.staged.drain(..) {
// We expect that `bytes` contains an integral number of messages.
// No splitting occurs across allocations.
while bytes.len() > 0 {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {
// Get the header and payload, ditch the header.
// `peel` takes the first `required_bytes()` of `bytes`; the header prefix is then
// extracted from `peel` and discarded.
let mut peel = bytes.extract_to(header.required_bytes());
let _ = peel.extract_to(header.header_bytes());
// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
// Ensure that a queue exists.
match self.to_local.entry(header.channel) {
Entry::Vacant(entry) => {
// We may receive data before allocating, and shouldn't block.
// True when nothing has been allocated yet, or when `header.channel` is larger
// than the most recently allocated identifier.
if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
Entry::Occupied(mut entry) => {
else {
// `try_read` returned `None` for the remaining bytes; this is only reported on stdout.
println!("failed to read full header!");
// Perform postparatory work, most likely sending un-full binary buffers.
fn release(&mut self) {
// Publish outgoing byte ledgers.
for send in self.sends.iter_mut() {
// OPTIONAL: Tattle on channels sitting on borrowed data.
// OPTIONAL: Perhaps copy borrowed data into owned allocation.
// for (index, list) in self.to_local.iter() {
// let len = list.borrow_mut().len();
// if len > 0 {
// eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
// }
// }
/// Returns a reference to a shared `Vec<usize>` of events; `allocate` clones this handle
/// into each counting puller it creates.
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
/// Waits for events, optionally bounded by `duration`.
///
/// Acts only when the shared `events` list is currently empty, and then branches on
/// whether a `duration` was supplied.
fn await_events(&self, duration: Option<std::time::Duration>) {
// If events are already pending, nothing is done.
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
else {