timely/dataflow/operators/core/concat.rs
1//! Merges the contents of multiple streams.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6
7/// Merge the contents of two streams.
8pub trait Concat<G: Scope, C> {
9 /// Merge the contents of two streams.
10 ///
11 /// # Examples
12 /// ```
13 /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
14 ///
15 /// timely::example(|scope| {
16 ///
17 /// let stream = (0..10).to_stream(scope);
18 /// stream.clone()
19 /// .concat(stream)
20 /// .container::<Vec<_>>()
21 /// .inspect(|x| println!("seen: {:?}", x));
22 /// });
23 /// ```
24 fn concat(self, other: Stream<G, C>) -> Stream<G, C>;
25}
26
27impl<G: Scope, C: Container> Concat<G, C> for Stream<G, C> {
28 fn concat(self, other: Stream<G, C>) -> Stream<G, C> {
29 self.scope().concatenate([self, other])
30 }
31}
32
33/// Merge the contents of multiple streams.
34pub trait Concatenate<G: Scope, C> {
35 /// Merge the contents of multiple streams.
36 ///
37 /// # Examples
38 /// ```
39 /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
40 ///
41 /// timely::example(|scope| {
42 ///
43 /// let streams = vec![(0..10).to_stream(scope),
44 /// (0..10).to_stream(scope),
45 /// (0..10).to_stream(scope)];
46 ///
47 /// scope.concatenate(streams)
48 /// .container::<Vec<_>>()
49 /// .inspect(|x| println!("seen: {:?}", x));
50 /// });
51 /// ```
52 fn concatenate<I>(&self, sources: I) -> Stream<G, C>
53 where
54 I: IntoIterator<Item=Stream<G, C>>;
55}
56
57impl<G: Scope, C: Container> Concatenate<G, C> for G {
58 fn concatenate<I>(&self, sources: I) -> Stream<G, C>
59 where
60 I: IntoIterator<Item=Stream<G, C>>
61 {
62
63 // create an operator builder.
64 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
65 let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
66 builder.set_notify(false);
67
68 // create new input handles for each input stream.
69 let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
70
71 // create one output handle for the concatenated results.
72 let (mut output, result) = builder.new_output();
73
74 // build an operator that plays out all input data.
75 builder.build(move |_capability| {
76
77 move |_frontier| {
78 let mut output = output.activate();
79 for handle in handles.iter_mut() {
80 handle.for_each(|time, data| {
81 output.give(&time, data);
82 })
83 }
84 }
85 });
86
87 result
88 }
89}