Skip to main content

timely/dataflow/
stream.rs

1//! A handle to a typed stream of timely data.
2//!
3//! Most high-level timely dataflow programming is done with streams, which are each a handle to an
4//! operator output. Extension methods on the `Stream` type provide the appearance of higher-level
5//! declarative programming, while constructing a dataflow graph underneath.
6
7use crate::progress::{Source, Target, Timestamp};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::Message;
13use std::fmt::{self, Debug};
14
/// Abstraction of a stream of `C: Container` records timestamped with `T`.
///
/// Internally `Stream` maintains a list of data recipients who should be presented with data
/// produced by the source of the stream. Recipients are attached with `connect_to`, which
/// registers the edge with the containing scope and adds the pusher to this list.
pub struct Stream<'scope, T: Timestamp, C> {
    /// The progress identifier of the stream's data source.
    name: Source,
    /// The `Scope` containing the stream.
    scope: Scope<'scope, T>,
    /// Maintains a list of `Push<Message<T, C>>` interested in the stream's output.
    ports: TeeHelper<T, C>,
}
27
28impl<'scope, T: Timestamp, C: Clone+'static> Clone for Stream<'scope, T, C> {
29    fn clone(&self) -> Self {
30        Self {
31            name: self.name,
32            scope: self.scope,
33            ports: self.ports.clone(),
34        }
35    }
36
37    fn clone_from(&mut self, source: &Self) {
38        self.name.clone_from(&source.name);
39        self.scope.clone_from(&source.scope);
40        self.ports.clone_from(&source.ports);
41    }
42}
43
/// A stream batching data in owning vectors.
///
/// This is [`Stream`] with its container type fixed to `Vec<D>`.
pub type StreamVec<'scope, T, D> = Stream<'scope, T, Vec<D>>;
46
47impl<'scope, T: Timestamp, C> Stream<'scope, T, C> {
48    /// Connects the stream to a destination.
49    ///
50    /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
51    /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
52    pub fn connect_to<P: Push<Message<T, C>>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static {
53
54        let mut logging: Option<crate::logging::TimelyLogger> = self.scope().worker().logging();
55        logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
56            id: identifier,
57            scope_addr: self.scope.addr().to_vec(),
58            source: (self.name.node, self.name.port),
59            target: (target.node, target.port),
60            typ: std::any::type_name::<C>().to_string(),
61        }));
62
63        self.scope.add_edge(self.name, target);
64        self.ports.add_pusher(pusher);
65    }
66    /// Allocates a `Stream` from a supplied `Source` name and rendezvous point.
67    pub fn new(source: Source, output: TeeHelper<T, C>, scope: Scope<'scope, T>) -> Self {
68        Self { name: source, ports: output, scope }
69    }
70    /// The name of the stream's source operator.
71    pub fn name(&self) -> &Source { &self.name }
72    /// The scope immediately containing the stream.
73    pub fn scope(&self) -> Scope<'scope, T> { self.scope }
74
75    /// Allows the assertion of a container type, for the benefit of type inference.
76    ///
77    /// This method can be needed when the container type of a stream is unconstrained,
78    /// most commonly after creating an input, or bracking wholly generic operators.
79    ///
80    /// # Examples
81    /// ```
82    /// use timely::dataflow::operators::{ToStream, Inspect};
83    ///
84    /// timely::example(|scope| {
85    ///     (0..10).to_stream(scope)
86    ///            .container::<Vec<_>>()
87    ///            .inspect(|x| println!("seen: {:?}", x));
88    /// });
89    /// ```
90    pub fn container<C2>(self) -> Stream<'scope, T, C2> where Self: AsStream<'scope, T, C2> { self.as_stream() }
91}
92
/// A type that can be translated to a [Stream].
///
/// The lifetime, timestamp and container parameters are those of the resulting [Stream].
pub trait AsStream<'scope, T: Timestamp, C> {
    /// Translate `self` to a [Stream], consuming `self`.
    fn as_stream(self) -> Stream<'scope, T, C>;
}
98
99impl<'scope, T: Timestamp, C> AsStream<'scope, T, C> for Stream<'scope, T, C> {
100    fn as_stream(self) -> Self { self }
101}
102
103impl<'scope, T: Timestamp, C> Debug for Stream<'scope, T, C> {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_struct("Stream")
106            .field("source", &self.name)
107            .finish_non_exhaustive()
108    }
109}
110
#[cfg(test)]
mod tests {
    use crate::dataflow::channels::pact::Pipeline;
    use crate::dataflow::operators::{Operator, ToStream};

    /// A record type that deliberately does not implement `Clone`.
    #[derive(Debug, Eq, PartialEq)]
    struct NotClone;

    /// Builds a dataflow over non-`Clone` records and checks each record drained in the sink.
    #[test]
    fn test_non_clone_stream() {
        crate::example(|scope| {
            let records = [NotClone].to_stream(scope).container::<Vec<_>>();
            let _ = records.sink(Pipeline, "check non-clone", |(input, _frontier)| {
                input.for_each(|_cap, batch| {
                    batch.drain(..).for_each(|record| assert_eq!(record, NotClone));
                });
            });
        });
    }
}