timely_communication/allocator/zero_copy/
push_pull.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely_bytes::arc::Bytes;
8
9use crate::allocator::canary::Canary;
10use crate::networking::MessageHeader;
11use crate::{Bytesable, Push, Pull};
12
13use super::bytes_exchange::{BytesPush, SendEndpoint};
14
15pub struct Pusher<T, P: BytesPush> {
20 header: MessageHeader,
21 sender: Rc<RefCell<SendEndpoint<P>>>,
22 phantom: ::std::marker::PhantomData<T>,
23}
24
25impl<T, P: BytesPush> Pusher<T, P> {
26 pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> {
28 Pusher {
29 header,
30 sender,
31 phantom: ::std::marker::PhantomData,
32 }
33 }
34}
35
36impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
37 #[inline]
38 fn push(&mut self, element: &mut Option<T>) {
39 if let Some(ref mut element) = *element {
40
41 let mut header = self.header;
43 self.header.seqno += 1;
44 header.length = element.length_in_bytes();
45 assert!(header.length > 0);
46
47 let mut borrow = self.sender.borrow_mut();
49 {
50 let mut bytes = borrow.reserve(header.required_bytes());
51 assert!(bytes.len() >= header.required_bytes());
52 let writer = &mut bytes;
53 header.write_to(writer).expect("failed to write header!");
54 element.into_bytes(writer);
55 }
56 borrow.make_valid(header.required_bytes());
57 }
58 }
59}
60
61pub struct Puller<T> {
68 _canary: Canary,
69 current: Option<T>,
70 receiver: Rc<RefCell<VecDeque<Bytes>>>, }
72
73impl<T: Bytesable> Puller<T> {
74 pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
76 Puller {
77 _canary,
78 current: None,
79 receiver,
80 }
81 }
82}
83
84impl<T: Bytesable> Pull<T> for Puller<T> {
85 #[inline]
86 fn pull(&mut self) -> &mut Option<T> {
87 self.current =
88 self.receiver
89 .borrow_mut()
90 .pop_front()
91 .map(T::from_bytes);
92
93 &mut self.current
94 }
95}
96
97pub struct PullerInner<T> {
104 inner: Box<dyn Pull<T>>, _canary: Canary,
106 current: Option<T>,
107 receiver: Rc<RefCell<VecDeque<Bytes>>>, }
109
110impl<T: Bytesable> PullerInner<T> {
111 pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
113 PullerInner {
114 inner,
115 _canary,
116 current: None,
117 receiver,
118 }
119 }
120}
121
122impl<T: Bytesable> Pull<T> for PullerInner<T> {
123 #[inline]
124 fn pull(&mut self) -> &mut Option<T> {
125
126 let inner = self.inner.pull();
127 if inner.is_some() {
128 inner
129 }
130 else {
131 self.current =
132 self.receiver
133 .borrow_mut()
134 .pop_front()
135 .map(T::from_bytes);
136
137 &mut self.current
138 }
139 }
140}