timely/dataflow/operators/core/
concat.rs

1//! Merges the contents of multiple streams.
2
3
4use crate::{Container, Data};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{StreamCore, Scope};
7
8/// Merge the contents of two streams.
9pub trait Concat<G: Scope, C: Container> {
10    /// Merge the contents of two streams.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///
18    ///     let stream = (0..10).to_stream(scope);
19    ///     stream.concat(&stream)
20    ///           .inspect(|x| println!("seen: {:?}", x));
21    /// });
22    /// ```
23    fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
24}
25
26impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C> {
27    fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
28        self.scope().concatenate([self.clone(), other.clone()])
29    }
30}
31
32/// Merge the contents of multiple streams.
33pub trait Concatenate<G: Scope, C: Container> {
34    /// Merge the contents of multiple streams.
35    ///
36    /// # Examples
37    /// ```
38    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
39    ///
40    /// timely::example(|scope| {
41    ///
42    ///     let streams = vec![(0..10).to_stream(scope),
43    ///                        (0..10).to_stream(scope),
44    ///                        (0..10).to_stream(scope)];
45    ///
46    ///     scope.concatenate(streams)
47    ///          .inspect(|x| println!("seen: {:?}", x));
48    /// });
49    /// ```
50    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
51    where
52        I: IntoIterator<Item=StreamCore<G, C>>;
53}
54
55impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C> {
56    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
57    where
58        I: IntoIterator<Item=StreamCore<G, C>>
59    {
60        let clone = self.clone();
61        self.scope().concatenate(Some(clone).into_iter().chain(sources))
62    }
63}
64
65impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
66    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
67    where
68        I: IntoIterator<Item=StreamCore<G, C>>
69    {
70
71        // create an operator builder.
72        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
73        let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
74
75        // create new input handles for each input stream.
76        let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
77
78        // create one output handle for the concatenated results.
79        let (mut output, result) = builder.new_output();
80
81        // build an operator that plays out all input data.
82        builder.build(move |_capability| {
83
84            move |_frontier| {
85                let mut output = output.activate();
86                for handle in handles.iter_mut() {
87                    handle.for_each(|time, data| {
88                        output.session(&time).give_container(data);
89                    })
90                }
91            }
92        });
93
94        result
95    }
96}