timely/dataflow/
stream.rs1use crate::progress::{Source, Target};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::Message;
13use std::fmt::{self, Debug};
14use crate::Container;
15
16pub struct StreamCore<S: Scope, C> {
23 name: Source,
25 scope: S,
27 ports: TeeHelper<S::Timestamp, C>,
29}
30
31impl<S: Scope, C> Clone for StreamCore<S, C> {
32 fn clone(&self) -> Self {
33 Self {
34 name: self.name,
35 scope: self.scope.clone(),
36 ports: self.ports.clone(),
37 }
38 }
39
40 fn clone_from(&mut self, source: &Self) {
41 self.name.clone_from(&source.name);
42 self.scope.clone_from(&source.scope);
43 self.ports.clone_from(&source.ports);
44 }
45}
46
/// A [`StreamCore`] whose container type is `Vec<D>`.
pub type Stream<S, D> = StreamCore<S, Vec<D>>;
49
50impl<S: Scope, C: Container> StreamCore<S, C> {
51 pub fn connect_to<P: Push<Message<S::Timestamp, C>>+'static>(&self, target: Target, pusher: P, identifier: usize) {
56
57 let mut logging = self.scope().logging();
58 logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
59 id: identifier,
60 scope_addr: self.scope.addr().to_vec(),
61 source: (self.name.node, self.name.port),
62 target: (target.node, target.port),
63 typ: std::any::type_name::<C>().to_string(),
64 }));
65
66 self.scope.add_edge(self.name, target);
67 self.ports.add_pusher(pusher);
68 }
69 pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self {
71 Self { name: source, ports: output, scope }
72 }
73 pub fn name(&self) -> &Source { &self.name }
75 pub fn scope(&self) -> S { self.scope.clone() }
77
78 pub fn container<D: Container>(self) -> StreamCore<S, D> where Self: AsStream<S, D> { self.as_stream() }
80}
81
82pub trait AsStream<S: Scope, C> {
84 fn as_stream(self) -> StreamCore<S, C>;
86}
87
88impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C> {
89 fn as_stream(self) -> Self { self }
90}
91
92impl<S, C> Debug for StreamCore<S, C>
93where
94 S: Scope,
95{
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("Stream")
98 .field("source", &self.name)
99 .finish_non_exhaustive()
100 }
101}