Skip to main content

timely/dataflow/operators/core/
concat.rs

1//! Merges the contents of multiple streams.
2
3use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{Stream, Scope};
6
7/// Merge the contents of two streams.
8pub trait Concat<G: Scope, C> {
9    /// Merge the contents of two streams.
10    ///
11    /// # Examples
12    /// ```
13    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
14    ///
15    /// timely::example(|scope| {
16    ///
17    ///     let stream = (0..10).to_stream(scope);
18    ///     stream.clone()
19    ///           .concat(stream)
20    ///           .container::<Vec<_>>()
21    ///           .inspect(|x| println!("seen: {:?}", x));
22    /// });
23    /// ```
24    fn concat(self, other: Stream<G, C>) -> Stream<G, C>;
25}
26
27impl<G: Scope, C: Container> Concat<G, C> for Stream<G, C> {
28    fn concat(self, other: Stream<G, C>) -> Stream<G, C> {
29        self.scope().concatenate([self, other])
30    }
31}
32
33/// Merge the contents of multiple streams.
34pub trait Concatenate<G: Scope, C> {
35    /// Merge the contents of multiple streams.
36    ///
37    /// # Examples
38    /// ```
39    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
40    ///
41    /// timely::example(|scope| {
42    ///
43    ///     let streams = vec![(0..10).to_stream(scope),
44    ///                        (0..10).to_stream(scope),
45    ///                        (0..10).to_stream(scope)];
46    ///
47    ///     scope.concatenate(streams)
48    ///          .container::<Vec<_>>()
49    ///          .inspect(|x| println!("seen: {:?}", x));
50    /// });
51    /// ```
52    fn concatenate<I>(&self, sources: I) -> Stream<G, C>
53    where
54        I: IntoIterator<Item=Stream<G, C>>;
55}
56
57impl<G: Scope, C: Container> Concatenate<G, C> for G {
58    fn concatenate<I>(&self, sources: I) -> Stream<G, C>
59    where
60        I: IntoIterator<Item=Stream<G, C>>
61    {
62
63        // create an operator builder.
64        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
65        let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
66        builder.set_notify(false);
67
68        // create new input handles for each input stream.
69        let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
70
71        // create one output handle for the concatenated results.
72        let (mut output, result) = builder.new_output();
73
74        // build an operator that plays out all input data.
75        builder.build(move |_capability| {
76
77            move |_frontier| {
78                let mut output = output.activate();
79                for handle in handles.iter_mut() {
80                    handle.for_each(|time, data| {
81                        output.give(&time, data);
82                    })
83                }
84            }
85        });
86
87        result
88    }
89}