timely/dataflow/operators/core/
concat.rs

1//! Merges the contents of multiple streams.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{StreamCore, Scope};
6
7/// Merge the contents of two streams.
8pub trait Concat<G: Scope, C> {
9    /// Merge the contents of two streams.
10    ///
11    /// # Examples
12    /// ```
13    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
14    ///
15    /// timely::example(|scope| {
16    ///
17    ///     let stream = (0..10).to_stream(scope);
18    ///     stream.concat(&stream)
19    ///           .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
23}
24
25impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C> {
26    fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
27        self.scope().concatenate([self.clone(), other.clone()])
28    }
29}
30
31/// Merge the contents of multiple streams.
32pub trait Concatenate<G: Scope, C> {
33    /// Merge the contents of multiple streams.
34    ///
35    /// # Examples
36    /// ```
37    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
38    ///
39    /// timely::example(|scope| {
40    ///
41    ///     let streams = vec![(0..10).to_stream(scope),
42    ///                        (0..10).to_stream(scope),
43    ///                        (0..10).to_stream(scope)];
44    ///
45    ///     scope.concatenate(streams)
46    ///          .inspect(|x| println!("seen: {:?}", x));
47    /// });
48    /// ```
49    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
50    where
51        I: IntoIterator<Item=StreamCore<G, C>>;
52}
53
54impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
55    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
56    where
57        I: IntoIterator<Item=StreamCore<G, C>>
58    {
59        let clone = self.clone();
60        self.scope().concatenate(Some(clone).into_iter().chain(sources))
61    }
62}
63
64impl<G: Scope, C: Container> Concatenate<G, C> for G {
65    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
66    where
67        I: IntoIterator<Item=StreamCore<G, C>>
68    {
69
70        // create an operator builder.
71        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
72        let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
73        builder.set_notify(false);
74
75        // create new input handles for each input stream.
76        let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
77
78        // create one output handle for the concatenated results.
79        let (mut output, result) = builder.new_output();
80
81        // build an operator that plays out all input data.
82        builder.build(move |_capability| {
83
84            move |_frontier| {
85                let mut output = output.activate();
86                for handle in handles.iter_mut() {
87                    handle.for_each(|time, data| {
88                        output.session(&time).give_container(data);
89                    })
90                }
91            }
92        });
93
94        result
95    }
96}