timely/dataflow/
stream.rs

1//! A handle to a typed stream of timely data.
2//!
3//! Most high-level timely dataflow programming is done with streams, which are each a handle to an
4//! operator output. Extension methods on the `Stream` type provide the appearance of higher-level
5//! declarative programming, while constructing a dataflow graph underneath.
6
7use crate::progress::{Source, Target};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::Message;
13use std::fmt::{self, Debug};
14use crate::Container;
15
16// use dataflow::scopes::root::loggers::CHANNELS_Q;
17
18/// Abstraction of a stream of `C: Container` records timestamped with `S::Timestamp`.
19///
20/// Internally `Stream` maintains a list of data recipients who should be presented with data
21/// produced by the source of the stream.
22pub struct StreamCore<S: Scope, C> {
23    /// The progress identifier of the stream's data source.
24    name: Source,
25    /// The `Scope` containing the stream.
26    scope: S,
27    /// Maintains a list of Push<Message<T, C>> interested in the stream's output.
28    ports: TeeHelper<S::Timestamp, C>,
29}
30
31impl<S: Scope, C> Clone for StreamCore<S, C> {
32    fn clone(&self) -> Self {
33        Self {
34            name: self.name,
35            scope: self.scope.clone(),
36            ports: self.ports.clone(),
37        }
38    }
39
40    fn clone_from(&mut self, source: &Self) {
41        self.name.clone_from(&source.name);
42        self.scope.clone_from(&source.scope);
43        self.ports.clone_from(&source.ports);
44    }
45}
46
47/// A stream batching data in vectors.
48pub type Stream<S, D> = StreamCore<S, Vec<D>>;
49
50impl<S: Scope, C: Container> StreamCore<S, C> {
51    /// Connects the stream to a destination.
52    ///
53    /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
54    /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
55    pub fn connect_to<P: Push<Message<S::Timestamp, C>>+'static>(&self, target: Target, pusher: P, identifier: usize) {
56
57        let mut logging = self.scope().logging();
58        logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
59            id: identifier,
60            scope_addr: self.scope.addr().to_vec(),
61            source: (self.name.node, self.name.port),
62            target: (target.node, target.port),
63            typ: std::any::type_name::<C>().to_string(),
64        }));
65
66        self.scope.add_edge(self.name, target);
67        self.ports.add_pusher(pusher);
68    }
69    /// Allocates a `Stream` from a supplied `Source` name and rendezvous point.
70    pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self {
71        Self { name: source, ports: output, scope }
72    }
73    /// The name of the stream's source operator.
74    pub fn name(&self) -> &Source { &self.name }
75    /// The scope immediately containing the stream.
76    pub fn scope(&self) -> S { self.scope.clone() }
77
78    /// Allows the assertion of a container type, for the benefit of type inference.
79    pub fn container<D: Container>(self) -> StreamCore<S, D> where Self: AsStream<S, D> { self.as_stream() }
80}
81
82/// A type that can be translated to a [StreamCore].
83pub trait AsStream<S: Scope, C> {
84    /// Translate `self` to a [StreamCore].
85    fn as_stream(self) -> StreamCore<S, C>;
86}
87
88impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C> {
89    fn as_stream(self) -> Self { self }
90}
91
92impl<S, C> Debug for StreamCore<S, C>
93where
94    S: Scope,
95{
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.debug_struct("Stream")
98            .field("source", &self.name)
99            .finish_non_exhaustive()
100    }
101}