timely_communication/allocator/zero_copy/push_pull.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
//! Push and Pull implementations wrapping serialized data.
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;
use bytes::arc::Bytes;
use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;
use crate::{Data, Push, Pull};
use crate::allocator::Message;
use super::bytes_exchange::{BytesPush, SendEndpoint};
/// An adapter into which one may push elements of type `T`.
///
/// This pusher has a fixed MessageHeader, and access to a SharedByteBuffer which it uses to
/// acquire buffers for serialization.
pub struct Pusher<T, P: BytesPush> {
header: MessageHeader,
sender: Rc<RefCell<SendEndpoint<P>>>,
phantom: ::std::marker::PhantomData<T>,
}
impl<T, P: BytesPush> Pusher<T, P> {
/// Creates a new `Pusher` from a header and shared byte buffer.
pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
Pusher {
header,
sender,
phantom: ::std::marker::PhantomData,
}
}
}
impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<Message<T>>) {
if let Some(ref mut element) = *element {
// determine byte lengths and build header.
let mut header = self.header;
self.header.seqno += 1;
header.length = element.length_in_bytes();
assert!(header.length > 0);
// acquire byte buffer and write header, element.
let mut borrow = self.sender.borrow_mut();
{
let mut bytes = borrow.reserve(header.required_bytes());
assert!(bytes.len() >= header.required_bytes());
let writer = &mut bytes;
header.write_to(writer).expect("failed to write header!");
element.into_bytes(writer);
}
borrow.make_valid(header.required_bytes());
}
}
}
/// An adapter from which one can pull elements of type `T`.
///
/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
/// not the most efficient thing possible, which would probably instead be something
/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
/// allocation.
pub struct Puller<T> {
_canary: Canary,
current: Option<Message<T>>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}
impl<T:Data> Puller<T> {
/// Creates a new `Puller` instance from a shared queue.
pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
Puller {
_canary,
current: None,
receiver,
}
}
}
impl<T:Data> Pull<Message<T>> for Puller<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
self.current =
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
&mut self.current
}
}
/// An adapter from which one can pull elements of type `T`.
///
/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
/// not the most efficient thing possible, which would probably instead be something
/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
/// allocation.
pub struct PullerInner<T> {
inner: Box<dyn Pull<Message<T>>>, // inner pullable (e.g. intra-process typed queue)
_canary: Canary,
current: Option<Message<T>>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}
impl<T:Data> PullerInner<T> {
/// Creates a new `PullerInner` instance from a shared queue.
pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
PullerInner {
inner,
_canary,
current: None,
receiver,
}
}
}
impl<T:Data> Pull<Message<T>> for PullerInner<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
let inner = self.inner.pull();
if inner.is_some() {
inner
}
else {
self.current =
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
&mut self.current
}
}
}