1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! Merges the contents of multiple streams.


use crate::{Container, Data};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{StreamCore, Scope};

/// Merging of a stream with one other stream of the same container type.
pub trait Concat<G, C>
where
    G: Scope,
    C: Container,
{
    /// Produces a stream carrying the contents of both `self` and `other`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
    ///
    /// timely::example(|scope| {
    ///
    ///     let stream = (0..10).to_stream(scope);
    ///     stream.concat(&stream)
    ///           .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>;
}

impl<G, C> Concat<G, C> for StreamCore<G, C>
where
    G: Scope,
    C: Container + Data,
{
    /// Clones both streams and hands the pair to the scope-level `concatenate`.
    fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
        let pair = [self.clone(), other.clone()];
        self.scope().concatenate(pair)
    }
}

/// Merging of an arbitrary number of streams into a single stream.
pub trait Concatenate<G, C>
where
    G: Scope,
    C: Container,
{
    /// Produces one stream carrying the contents of every stream in `sources`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams = vec![(0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate(streams)
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
    where
        I: IntoIterator<Item = StreamCore<G, C>>;
}

impl<G, C> Concatenate<G, C> for StreamCore<G, C>
where
    G: Scope,
    C: Container + Data,
{
    /// Merges this stream with `sources` by delegating to the scope-level
    /// `concatenate`, with a clone of `self` placed ahead of the supplied streams.
    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
    {
        // This stream comes first, followed by everything the caller passed in.
        let all_inputs = std::iter::once(self.clone()).chain(sources);
        self.scope().concatenate(all_inputs)
    }
}

impl<G, C> Concatenate<G, C> for G
where
    G: Scope,
    C: Container + Data,
{
    /// Builds one operator named "Concatenate" in this scope, with a pipelined
    /// input per source stream and a single output, and returns that output.
    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
    where
        I: IntoIterator<Item = StreamCore<G, C>>,
    {
        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        // The builder collects the operator's inputs and output before it is built.
        let mut builder = OperatorBuilder::new(String::from("Concatenate"), self.clone());

        // Register each source stream as its own input using the `Pipeline` pact.
        let mut inputs: Vec<_> = sources
            .into_iter()
            .map(|stream| builder.new_input(&stream, Pipeline))
            .collect();

        // A single output receives everything read from the inputs.
        let (mut output, merged) = builder.new_output();

        // The operator logic: each time it runs, pass along whatever each input offers.
        builder.build(move |_capability| {
            move |_frontier| {
                let mut activated = output.activate();
                for input in &mut inputs {
                    input.for_each(|time, container| {
                        // Forward the container to the output at the time it arrived with.
                        activated.session(&time).give_container(container);
                    });
                }
            }
        });

        merged
    }
}