timely/dataflow/operators/core/concat.rs
1//! Merges the contents of multiple streams.
2
3use crate::Container;
4use crate::progress::Timestamp;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{Stream, Scope};
7
/// Extension trait for merging one stream with another of the same type.
pub trait Concat {
    /// Combines `self` and `other` into a single stream carrying the contents of both.
    ///
    /// Both inputs are consumed; clone a stream first if it is still needed afterwards.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
    ///
    /// timely::example(|scope| {
    ///
    ///     let stream = (0..10).to_stream(scope);
    ///     stream.clone()
    ///           .concat(stream)
    ///           .container::<Vec<_>>()
    ///           .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concat(self, other: Self) -> Self;
}
27
28impl<'scope, T: Timestamp, C: Container> Concat for Stream<'scope, T, C> {
29 fn concat(self, other: Self) -> Self {
30 self.scope().concatenate([self, other])
31 }
32}
33
34/// Merge the contents of multiple streams.
35pub trait Concatenate<'scope, T: Timestamp> {
36 /// Merge the contents of multiple streams.
37 ///
38 /// # Examples
39 /// ```
40 /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
41 ///
42 /// timely::example(|scope| {
43 ///
44 /// let streams = vec![(0..10).to_stream(scope),
45 /// (0..10).to_stream(scope),
46 /// (0..10).to_stream(scope)];
47 ///
48 /// scope.concatenate(streams)
49 /// .container::<Vec<_>>()
50 /// .inspect(|x| println!("seen: {:?}", x));
51 /// });
52 /// ```
53 fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
54 where
55 I: IntoIterator<Item=Stream<'scope, T, C>>;
56}
57
58impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {
59 fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
60 where
61 I: IntoIterator<Item=Stream<'scope, T, C>>
62 {
63
64 // create an operator builder.
65 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
66 let mut builder = OperatorBuilder::new("Concatenate".to_string(), *self);
67
68 // create new input handles for each input stream.
69 let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
70 for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); }
71
72 // create one output handle for the concatenated results.
73 let (mut output, result) = builder.new_output();
74
75 // build an operator that plays out all input data.
76 builder.build(move |_capability| {
77
78 move |_frontier| {
79 let mut output = output.activate();
80 for handle in handles.iter_mut() {
81 handle.for_each(|time, data| {
82 output.give(&time, data);
83 })
84 }
85 }
86 });
87
88 result
89 }
90}