timely_communication/allocator/zero_copy/
push_pull.rs

1//! Push and Pull implementations wrapping serialized data.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely_bytes::arc::Bytes;
8
9use crate::allocator::canary::Canary;
10use crate::networking::MessageHeader;
11use crate::{Bytesable, Push, Pull};
12
13use super::bytes_exchange::{BytesPush, SendEndpoint};
14
15/// An adapter into which one may push elements of type `T`.
16///
17/// This pusher has a fixed MessageHeader, and access to a SharedByteBuffer which it uses to
18/// acquire buffers for serialization.
19pub struct Pusher<T, P: BytesPush> {
20    header:     MessageHeader,
21    sender:     Rc<RefCell<SendEndpoint<P>>>,
22    phantom:    ::std::marker::PhantomData<T>,
23}
24
25impl<T, P: BytesPush> Pusher<T, P> {
26    /// Creates a new `Pusher` from a header and shared byte buffer.
27    pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
28        Pusher {
29            header,
30            sender,
31            phantom:    ::std::marker::PhantomData,
32        }
33    }
34}
35
36impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
37    #[inline]
38    fn push(&mut self, element: &mut Option<T>) {
39        if let Some(ref mut element) = *element {
40
41            // determine byte lengths and build header.
42            let mut header = self.header;
43            self.header.seqno += 1;
44            header.length = element.length_in_bytes();
45            assert!(header.length > 0);
46
47            // acquire byte buffer and write header, element.
48            let mut borrow = self.sender.borrow_mut();
49            {
50                let mut bytes = borrow.reserve(header.required_bytes());
51                assert!(bytes.len() >= header.required_bytes());
52                let writer = &mut bytes;
53                header.write_to(writer).expect("failed to write header!");
54                element.into_bytes(writer);
55            }
56            borrow.make_valid(header.required_bytes());
57        }
58    }
59}
60
61/// An adapter from which one can pull elements of type `T`.
62///
63/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
64/// not the most efficient thing possible, which would probably instead be something
65/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
66/// allocation.
67pub struct Puller<T> {
68    _canary: Canary,
69    current: Option<T>,
70    receiver: Rc<RefCell<VecDeque<Bytes>>>,    // source of serialized buffers
71}
72
73impl<T: Bytesable> Puller<T> {
74    /// Creates a new `Puller` instance from a shared queue.
75    pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
76        Puller {
77            _canary,
78            current: None,
79            receiver,
80        }
81    }
82}
83
84impl<T: Bytesable> Pull<T> for Puller<T> {
85    #[inline]
86    fn pull(&mut self) -> &mut Option<T> {
87        self.current =
88        self.receiver
89            .borrow_mut()
90            .pop_front()
91            .map(T::from_bytes);
92
93        &mut self.current
94    }
95}
96
97/// An adapter from which one can pull elements of type `T`.
98///
99/// This type is very simple, and just consumes owned `Vec<u8>` allocations. It is
100/// not the most efficient thing possible, which would probably instead be something
101/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
102/// allocation.
103pub struct PullerInner<T> {
104    inner: Box<dyn Pull<T>>,               // inner pullable (e.g. intra-process typed queue)
105    _canary: Canary,
106    current: Option<T>,
107    receiver: Rc<RefCell<VecDeque<Bytes>>>,     // source of serialized buffers
108}
109
110impl<T: Bytesable> PullerInner<T> {
111    /// Creates a new `PullerInner` instance from a shared queue.
112    pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
113        PullerInner {
114            inner,
115            _canary,
116            current: None,
117            receiver,
118        }
119    }
120}
121
122impl<T: Bytesable> Pull<T> for PullerInner<T> {
123    #[inline]
124    fn pull(&mut self) -> &mut Option<T> {
125
126        let inner = self.inner.pull();
127        if inner.is_some() {
128            inner
129        }
130        else {
131            self.current =
132            self.receiver
133                .borrow_mut()
134                .pop_front()
135                .map(T::from_bytes);
136
137            &mut self.current
138        }
139    }
140}