Skip to main content

timely/dataflow/channels/pushers/
tee.rs

1//! The `Tee` and `TeeHelper` types, by which stream consumers rendezvous with the producer.
2//!
3//! The design is a shared list of `Box<dyn Push<T>>` types, which are added to by `TeeHelper`
4//! and pushed into by the `Tee` once the dataflow is running. Some care is taken so that `T`
5//! does not need to implement `Clone`, other than for the instantiation of a (boxed) variant
6//! that supports multiple consumers, to avoid the constraint for single-consumer streams.
7
8use std::cell::RefCell;
9use std::fmt::{self, Debug};
10use std::rc::Rc;
11
12use crate::dataflow::channels::Message;
13
14use crate::communication::Push;
15use crate::Container;
16
17use push_set::{PushSet, PushOne, PushMany};
18mod push_set {
19
20    use crate::communication::Push;
21
22    /// A type that can be pushed at, and which may be able to accommodate a similar pusher.
23    ///
24    /// This trait exists to support fanning out of pushers when the data may not be `Clone`,
25    /// allowing the implementation for multiple pushers (which may require cloning) to be
26    /// behind an abstraction.
27    pub trait PushSet<T> : Push<T> {
28        /// If a list of boxed pushers, that list.
29        fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>>;
30    }
31
32    /// A `Push` wrapper that implements `PushOne`.
33    pub struct PushOne<P> { inner: P }
34    impl<T, P: Push<T>> Push<T> for PushOne<P> {
35        fn push(&mut self, item: &mut Option<T>) { self.inner.push(item) }
36    }
37    impl<T: 'static, P: Push<T> + 'static> PushSet<T> for PushOne<P> {
38        fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { None }
39    }
40    impl<P> From<P> for PushOne<P> { fn from(inner: P) -> Self { Self { inner } } }
41
42    /// A `Push` wrapper for a list of boxed implementors.
43    pub struct PushMany<T> {
44        /// Used to clone into, for the chance to avoid continual re-allocation.
45        buffer: Option<T>,
46        /// The intended recipients of pushed values.
47        list: Vec<Box<dyn Push<T>>>,
48    }
49    impl<T: Clone> Push<T> for PushMany<T> {
50        fn push(&mut self, item: &mut Option<T>) {
51            // We defensively clone `element` for all but the last element of `self.list`,
52            // as we cannot be sure that a `Push` implementor will not modify the contents.
53            // Indeed, that's the goal of the `Push` trait, to allow one to take ownership.
54
55            // This guard prevents dropping `self.buffer` when a `None` is received.
56            // We might prefer to do that, to reduce steady state memory.
57            if item.is_some() {
58                for pusher in self.list.iter_mut().rev().skip(1).rev() {
59                    self.buffer.clone_from(&item);
60                    pusher.push(&mut self.buffer);
61                }
62                if let Some(pusher) = self.list.last_mut() {
63                    std::mem::swap(&mut self.buffer, item);
64                    pusher.push(&mut self.buffer);
65                }
66            }
67            else { for pusher in self.list.iter_mut() { pusher.done(); } }
68        }
69    }
70    impl<T: Clone + 'static> PushSet<T> for PushMany<T> {
71        fn as_list(&mut self) -> Option<&mut Vec<Box<dyn Push<T>>>> { Some(&mut self.list) }
72    }
73    impl<T> From<Vec<Box<dyn Push<T>>>> for PushMany<T> { fn from(list: Vec<Box<dyn Push<T>>>) -> Self { Self { list, buffer: None } } }
74
75}
76
77/// The shared state between a `Tee` and `TeeHelper`: an extensible list of pushers.
78type PushList<T, C> = Rc<RefCell<Option<Box<dyn PushSet<Message<T, C>>>>>>;
79
80/// The writing half of a shared destination for pushing at.
81pub struct Tee<T, C> { shared: PushList<T, C> }
82
83impl<T: 'static, C: Container> Push<Message<T, C>> for Tee<T, C> {
84    #[inline]
85    fn push(&mut self, message: &mut Option<Message<T, C>>) {
86        if let Some(pushee) = self.shared.borrow_mut().as_mut() {
87            pushee.push(message)
88        }
89    }
90}
91
92impl<T, C> Tee<T, C> {
93    /// Allocates a new pair of `Tee` and `TeeHelper`.
94    pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
95        let shared = Rc::new(RefCell::new(None));
96        let port = Tee { shared: Rc::clone(&shared) };
97        (port, TeeHelper { shared })
98    }
99}
100
101impl<T, C> Debug for Tee<T, C> {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Tee") }
103}
104
105/// The subscribe half of a shared destination for pushing at.
106///
107/// Cloning a `TeeHelper` will upgrade it, teaching the shared list how to clone containers.
108pub struct TeeHelper<T, C> { shared: PushList<T, C> }
109
110impl<T: 'static, C: 'static> TeeHelper<T, C> {
111    /// Upgrades the shared list to one that supports cloning.
112    ///
113    /// This method "teaches" the `Tee` how to clone containers, which enables adding multiple pushers.
114    /// It introduces the cost of one additional virtual call through a boxed trait, so one should not
115    /// upgrade for no reason.
116    pub fn upgrade(&self) where T: Clone, C: Clone {
117        let mut borrow = self.shared.borrow_mut();
118        if let Some(mut pusher) = borrow.take() {
119            if pusher.as_list().is_none() {
120                *borrow = Some(Box::new(PushMany::from(vec![pusher as Box<dyn Push<Message<T, C>>>])));
121            }
122            else {
123                *borrow = Some(pusher);
124            }
125        }
126        else {
127            *borrow = Some(Box::new(PushMany::from(vec![])));
128        }
129    }
130
131    /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
132    pub fn add_pusher<P: Push<Message<T, C>>+'static>(self, pusher: P) {
133        let mut borrow = self.shared.borrow_mut();
134        if let Some(many) = borrow.as_mut() {
135            many.as_list().unwrap().push(Box::new(pusher))
136        }
137        else {
138            // If we are adding a second pusher without upgrading, something has gone wrong.
139            assert!(borrow.is_none());
140            *borrow = Some(Box::new(PushOne::from(pusher)));
141        }
142    }
143}
144
145impl<T: Clone+'static, C: Clone+'static> Clone for TeeHelper<T, C> {
146    fn clone(&self) -> Self { self.upgrade(); TeeHelper { shared: Rc::clone(&self.shared) } }
147}
148
149impl<T, C> Debug for TeeHelper<T, C> {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TeeHelper") }
151}