timely/dataflow/
stream.rs1use crate::progress::{Source, Target};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::Message;
13use std::fmt::{self, Debug};
14
15pub struct Stream<S: Scope, C> {
22 name: Source,
24 scope: S,
26 ports: TeeHelper<S::Timestamp, C>,
28}
29
30impl<S: Scope, C: Clone+'static> Clone for Stream<S, C> {
31 fn clone(&self) -> Self {
32 Self {
33 name: self.name,
34 scope: self.scope.clone(),
35 ports: self.ports.clone(),
36 }
37 }
38
39 fn clone_from(&mut self, source: &Self) {
40 self.name.clone_from(&source.name);
41 self.scope.clone_from(&source.scope);
42 self.ports.clone_from(&source.ports);
43 }
44}
45
46pub type StreamVec<S, D> = Stream<S, Vec<D>>;
48
49impl<S: Scope, C> Stream<S, C> {
50 pub fn connect_to<P: Push<Message<S::Timestamp, C>>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static {
55
56 let mut logging = self.scope().logging();
57 logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
58 id: identifier,
59 scope_addr: self.scope.addr().to_vec(),
60 source: (self.name.node, self.name.port),
61 target: (target.node, target.port),
62 typ: std::any::type_name::<C>().to_string(),
63 }));
64
65 self.scope.add_edge(self.name, target);
66 self.ports.add_pusher(pusher);
67 }
68 pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self {
70 Self { name: source, ports: output, scope }
71 }
72 pub fn name(&self) -> &Source { &self.name }
74 pub fn scope(&self) -> S { self.scope.clone() }
76
77 pub fn container<C2>(self) -> Stream<S, C2> where Self: AsStream<S, C2> { self.as_stream() }
93}
94
95pub trait AsStream<S: Scope, C> {
97 fn as_stream(self) -> Stream<S, C>;
99}
100
101impl<S: Scope, C> AsStream<S, C> for Stream<S, C> {
102 fn as_stream(self) -> Self { self }
103}
104
105impl<S, C> Debug for Stream<S, C>
106where
107 S: Scope,
108{
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 f.debug_struct("Stream")
111 .field("source", &self.name)
112 .finish_non_exhaustive()
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use crate::dataflow::channels::pact::Pipeline;
119 use crate::dataflow::operators::{Operator, ToStream};
120
121 #[derive(Debug, Eq, PartialEq)]
122 struct NotClone;
123
124 #[test]
125 fn test_non_clone_stream() {
126 crate::example(|scope| {
127 let _ = [NotClone]
128 .to_stream(scope)
129 .container::<Vec<_>>()
130 .sink(Pipeline, "check non-clone", |(input, _frontier)| {
131 input.for_each(|_cap, data| {
132 for datum in data.drain(..) {
133 assert_eq!(datum, NotClone);
134 }
135 });
136 });
137 });
138 }
139}