timely/dataflow/
stream.rs1use crate::progress::{Source, Target, Timestamp};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::Message;
13use std::fmt::{self, Debug};
14
15pub struct Stream<'scope, T: Timestamp, C> {
20 name: Source,
22 scope: Scope<'scope, T>,
24 ports: TeeHelper<T, C>,
26}
27
28impl<'scope, T: Timestamp, C: Clone+'static> Clone for Stream<'scope, T, C> {
29 fn clone(&self) -> Self {
30 Self {
31 name: self.name,
32 scope: self.scope,
33 ports: self.ports.clone(),
34 }
35 }
36
37 fn clone_from(&mut self, source: &Self) {
38 self.name.clone_from(&source.name);
39 self.scope.clone_from(&source.scope);
40 self.ports.clone_from(&source.ports);
41 }
42}
43
/// A [`Stream`] whose container type is `Vec<D>`.
pub type StreamVec<'scope, T, D> = Stream<'scope, T, Vec<D>>;
46
47impl<'scope, T: Timestamp, C> Stream<'scope, T, C> {
48 pub fn connect_to<P: Push<Message<T, C>>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static {
53
54 let mut logging: Option<crate::logging::TimelyLogger> = self.scope().worker().logging();
55 logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
56 id: identifier,
57 scope_addr: self.scope.addr().to_vec(),
58 source: (self.name.node, self.name.port),
59 target: (target.node, target.port),
60 typ: std::any::type_name::<C>().to_string(),
61 }));
62
63 self.scope.add_edge(self.name, target);
64 self.ports.add_pusher(pusher);
65 }
66 pub fn new(source: Source, output: TeeHelper<T, C>, scope: Scope<'scope, T>) -> Self {
68 Self { name: source, ports: output, scope }
69 }
70 pub fn name(&self) -> &Source { &self.name }
72 pub fn scope(&self) -> Scope<'scope, T> { self.scope }
74
75 pub fn container<C2>(self) -> Stream<'scope, T, C2> where Self: AsStream<'scope, T, C2> { self.as_stream() }
91}
92
93pub trait AsStream<'scope, T: Timestamp, C> {
95 fn as_stream(self) -> Stream<'scope, T, C>;
97}
98
99impl<'scope, T: Timestamp, C> AsStream<'scope, T, C> for Stream<'scope, T, C> {
100 fn as_stream(self) -> Self { self }
101}
102
103impl<'scope, T: Timestamp, C> Debug for Stream<'scope, T, C> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("Stream")
106 .field("source", &self.name)
107 .finish_non_exhaustive()
108 }
109}
110
#[cfg(test)]
mod tests {
    use crate::dataflow::channels::pact::Pipeline;
    use crate::dataflow::operators::{Operator, ToStream};

    /// A record type that deliberately does not implement `Clone`.
    #[derive(Debug, Eq, PartialEq)]
    struct NotClone;

    /// A stream of non-`Clone` records can be built and consumed by a sink.
    #[test]
    fn test_non_clone_stream() {
        crate::example(|scope| {
            // Pin the container type explicitly so inference has an anchor.
            let stream = [NotClone].to_stream(scope).container::<Vec<_>>();

            let _ = stream.sink(Pipeline, "check non-clone", |(input, _frontier)| {
                input.for_each(|_cap, data| {
                    data.drain(..)
                        .for_each(|datum| assert_eq!(datum, NotClone));
                });
            });
        });
    }
}