timely/dataflow/channels/pushers/
tee.rs1use std::cell::RefCell;
9use std::fmt::{self, Debug};
10use std::rc::Rc;
11
12use crate::dataflow::channels::Message;
13
14use crate::communication::Push;
15use crate::Container;
16
17use push_set::{PushSet, PushOne, PushMany};
18mod push_set {
19
20 use crate::communication::Push;
21
22 pub trait PushSet<T> : Push<T> {
28 fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>>;
30 }
31
32 pub struct PushOne<P> { inner: P }
34 impl<T, P: Push<T>> Push<T> for PushOne<P> {
35 fn push(&mut self, item: &mut Option<T>) { self.inner.push(item) }
36 }
37 impl<T: 'static, P: Push<T> + 'static> PushSet<T> for PushOne<P> {
38 fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { None }
39 }
40 impl<P> From<P> for PushOne<P> { fn from(inner: P) -> Self { Self { inner } } }
41
42 pub struct PushMany<T> {
44 buffer: Option<T>,
46 list: Vec<Box<dyn Push<T>>>,
48 }
49 impl<T: Clone> Push<T> for PushMany<T> {
50 fn push(&mut self, item: &mut Option<T>) {
51 if item.is_some() {
58 for pusher in self.list.iter_mut().rev().skip(1).rev() {
59 self.buffer.clone_from(&item);
60 pusher.push(&mut self.buffer);
61 }
62 if let Some(pusher) = self.list.last_mut() {
63 std::mem::swap(&mut self.buffer, item);
64 pusher.push(&mut self.buffer);
65 }
66 }
67 else { for pusher in self.list.iter_mut() { pusher.done(); } }
68 }
69 }
70 impl<T: Clone + 'static> PushSet<T> for PushMany<T> {
71 fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { Some(&mut self.list) }
72 }
73 impl<T> From<Vec<Box<dyn Push<T>>>> for PushMany<T> { fn from(list: Vec<Box<dyn Push<T>>>) -> Self { Self { list, buffer: None } } }
74
75}
76
77type PushList<T, C> = Rc<RefCell<Option<Box<dyn PushSet<Message<T, C>>>>>>;
79
80pub struct Tee<T, C> { shared: PushList<T, C> }
82
83impl<T: 'static, C: Container> Push<Message<T, C>> for Tee<T, C> {
84 #[inline]
85 fn push(&mut self, message: &mut Option<Message<T, C>>) {
86 if let Some(pushee) = self.shared.borrow_mut().as_mut() {
87 pushee.push(message)
88 }
89 }
90}
91
92impl<T, C> Tee<T, C> {
93 pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
95 let shared = Rc::new(RefCell::new(None));
96 let port = Tee { shared: Rc::clone(&shared) };
97 (port, TeeHelper { shared })
98 }
99}
100
101impl<T, C> Debug for Tee<T, C> {
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Tee") }
103}
104
105pub struct TeeHelper<T, C> { shared: PushList<T, C> }
109
110impl<T: 'static, C: 'static> TeeHelper<T, C> {
111 pub fn upgrade(&self) where T: Clone, C: Clone {
117 let mut borrow = self.shared.borrow_mut();
118 if let Some(mut pusher) = borrow.take() {
119 if pusher.as_list().is_none() {
120 *borrow = Some(Box::new(PushMany::from(vec![pusher as Box<dyn Push<Message<T, C>>>])));
121 }
122 else {
123 *borrow = Some(pusher);
124 }
125 }
126 else {
127 *borrow = Some(Box::new(PushMany::from(vec![])));
128 }
129 }
130
131 pub fn add_pusher<P: Push<Message<T, C>>+'static>(self, pusher: P) {
133 let mut borrow = self.shared.borrow_mut();
134 if let Some(many) = borrow.as_mut() {
135 many.as_list().unwrap().push(Box::new(pusher))
136 }
137 else {
138 assert!(borrow.is_none());
140 *borrow = Some(Box::new(PushOne::from(pusher)));
141 }
142 }
143}
144
145impl<T: Clone+'static, C: Clone+'static> Clone for TeeHelper<T, C> {
146 fn clone(&self) -> Self { self.upgrade(); TeeHelper { shared: Rc::clone(&self.shared) } }
147}
148
149impl<T, C> Debug for TeeHelper<T, C> {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TeeHelper") }
151}