timely/dataflow/channels/pushers/
tee.rs

1//! A `Push` implementor with a list of `Box<Push>` to forward pushes to.
2
3use std::cell::RefCell;
4use std::fmt::{self, Debug};
5use std::rc::Rc;
6
7use crate::dataflow::channels::Message;
8
9use crate::communication::Push;
10use crate::{Container, Data};
11
12type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Message<T, C>>>>>>;
13
14/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
15pub struct Tee<T, C> {
16    buffer: C,
17    shared: PushList<T, C>,
18}
19
20impl<T: Data, C: Container + Data> Push<Message<T, C>> for Tee<T, C> {
21    #[inline]
22    fn push(&mut self, message: &mut Option<Message<T, C>>) {
23        let mut pushers = self.shared.borrow_mut();
24        if let Some(message) = message {
25            for index in 1..pushers.len() {
26                self.buffer.clone_from(&message.data);
27                Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]);
28            }
29        }
30        else {
31            for index in 1..pushers.len() {
32                pushers[index-1].push(&mut None);
33            }
34        }
35        if pushers.len() > 0 {
36            let last = pushers.len() - 1;
37            pushers[last].push(message);
38        }
39    }
40}
41
42impl<T, C: Container> Tee<T, C> {
43    /// Allocates a new pair of `Tee` and `TeeHelper`.
44    pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
45        let shared = Rc::new(RefCell::new(Vec::new()));
46        let port = Tee {
47            buffer: Default::default(),
48            shared: Rc::clone(&shared),
49        };
50
51        (port, TeeHelper { shared })
52    }
53}
54
55impl<T, C: Container> Clone for Tee<T, C> {
56    fn clone(&self) -> Self {
57        Self {
58            buffer: Default::default(),
59            shared: Rc::clone(&self.shared),
60        }
61    }
62}
63
64impl<T, C> Debug for Tee<T, C>
65where
66    C: Debug,
67{
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        let mut debug = f.debug_struct("Tee");
70        debug.field("buffer", &self.buffer);
71
72        if let Ok(shared) = self.shared.try_borrow() {
73            debug.field("shared", &format!("{} pushers", shared.len()));
74        } else {
75            debug.field("shared", &"...");
76        }
77
78        debug.finish()
79    }
80}
81
82/// A shared list of `Box<Push>` used to add `Push` implementors.
83pub struct TeeHelper<T, C> {
84    shared: PushList<T, C>,
85}
86
87impl<T, C> TeeHelper<T, C> {
88    /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
89    pub fn add_pusher<P: Push<Message<T, C>>+'static>(&self, pusher: P) {
90        self.shared.borrow_mut().push(Box::new(pusher));
91    }
92}
93
94impl<T, C> Clone for TeeHelper<T, C> {
95    fn clone(&self) -> Self {
96        TeeHelper {
97            shared: Rc::clone(&self.shared),
98        }
99    }
100}
101
102impl<T, C> Debug for TeeHelper<T, C> {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        let mut debug = f.debug_struct("TeeHelper");
105
106        if let Ok(shared) = self.shared.try_borrow() {
107            debug.field("shared", &format!("{} pushers", shared.len()));
108        } else {
109            debug.field("shared", &"...");
110        }
111
112        debug.finish()
113    }
114}