timely/dataflow/channels/pushers/
tee.rs
1use std::cell::RefCell;
4use std::fmt::{self, Debug};
5use std::rc::Rc;
6
7use crate::dataflow::channels::Message;
8
9use crate::communication::Push;
10use crate::{Container, Data};
11
12type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
13
14pub struct Tee<T, C> {
16 buffer: C,
17 shared: PushList<T, C>,
18}
19
20impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
21 #[inline]
22 fn push(&mut self, message: &mut Option<Message<T, C>>) {
23 let mut pushers = self.shared.borrow_mut();
24 if let Some(message) = message {
25 for index in 1..pushers.len() {
26 self.buffer.clone_from(&message.data);
27 Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
28 }
29 }
30 else {
31 for index in 1..pushers.len() {
32 pushers[index-1].push(&mut None);
33 }
34 }
35 if pushers.len() > 0 {
36 let last = pushers.len() - 1;
37 pushers[last].push(message);
38 }
39 }
40}
41
42impl<T, C: Container> Tee<T, C> {
43 pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
45 let shared = Rc::new(RefCell::new(Vec::new()));
46 let port = Tee {
47 buffer: Default::default(),
48 shared: Rc::clone(&shared),
49 };
50
51 (port, TeeHelper { shared })
52 }
53}
54
55impl<T, C: Container> Clone for Tee<T, C> {
56 fn clone(&self) -> Self {
57 Self {
58 buffer: Default::default(),
59 shared: Rc::clone(&self.shared),
60 }
61 }
62}
63
64impl<T, C> Debug for Tee<T, C>
65where
66 C: Debug,
67{
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 let mut debug = f.debug_struct("Tee");
70 debug.field("buffer", &self.buffer);
71
72 if let Ok(shared) = self.shared.try_borrow() {
73 debug.field("shared", &format!("{} pushers", shared.len()));
74 } else {
75 debug.field("shared", &"...");
76 }
77
78 debug.finish()
79 }
80}
81
82pub struct TeeHelper<T, C> {
84 shared: PushList<T, C>,
85}
86
87impl<T, C> TeeHelper<T, C> {
88 pub fn add_pusher<P: Push<Message<T, C>>+'static>(&self, pusher: P) {
90 self.shared.borrow_mut().push(Box::new(pusher));
91 }
92}
93
94impl<T, C> Clone for TeeHelper<T, C> {
95 fn clone(&self) -> Self {
96 TeeHelper {
97 shared: Rc::clone(&self.shared),
98 }
99 }
100}
101
102impl<T, C> Debug for TeeHelper<T, C> {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 let mut debug = f.debug_struct("TeeHelper");
105
106 if let Ok(shared) = self.shared.try_borrow() {
107 debug.field("shared", &format!("{} pushers", shared.len()));
108 } else {
109 debug.field("shared", &"...");
110 }
111
112 debug.finish()
113 }
114}