Skip to main content

timely/dataflow/operators/core/
concat.rs

1//! Merges the contents of multiple streams.
2
3use crate::Container;
4use crate::progress::Timestamp;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{Stream, Scope};
7
8/// Merge the contents of two streams.
9pub trait Concat{
10    /// Merge the contents of two streams.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
15    ///
16    /// timely::example(|scope| {
17    ///
18    ///     let stream = (0..10).to_stream(scope);
19    ///     stream.clone()
20    ///           .concat(stream)
21    ///           .container::<Vec<_>>()
22    ///           .inspect(|x| println!("seen: {:?}", x));
23    /// });
24    /// ```
25    fn concat(self, other: Self) -> Self;
26}
27
28impl<'scope, T: Timestamp, C: Container> Concat for Stream<'scope, T, C> {
29    fn concat(self, other: Self) -> Self {
30        self.scope().concatenate([self, other])
31    }
32}
33
34/// Merge the contents of multiple streams.
35pub trait Concatenate<'scope, T: Timestamp> {
36    /// Merge the contents of multiple streams.
37    ///
38    /// # Examples
39    /// ```
40    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
41    ///
42    /// timely::example(|scope| {
43    ///
44    ///     let streams = vec![(0..10).to_stream(scope),
45    ///                        (0..10).to_stream(scope),
46    ///                        (0..10).to_stream(scope)];
47    ///
48    ///     scope.concatenate(streams)
49    ///          .container::<Vec<_>>()
50    ///          .inspect(|x| println!("seen: {:?}", x));
51    /// });
52    /// ```
53    fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
54    where
55        I: IntoIterator<Item=Stream<'scope, T, C>>;
56}
57
58impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {
59    fn concatenate<I, C: Container>(&self, sources: I) -> Stream<'scope, T, C>
60    where
61        I: IntoIterator<Item=Stream<'scope, T, C>>
62    {
63
64        // create an operator builder.
65        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
66        let mut builder = OperatorBuilder::new("Concatenate".to_string(), *self);
67
68        // create new input handles for each input stream.
69        let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
70        for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); }
71
72        // create one output handle for the concatenated results.
73        let (mut output, result) = builder.new_output();
74
75        // build an operator that plays out all input data.
76        builder.build(move |_capability| {
77
78            move |_frontier| {
79                let mut output = output.activate();
80                for handle in handles.iter_mut() {
81                    handle.for_each(|time, data| {
82                        output.give(&time, data);
83                    })
84                }
85            }
86        });
87
88        result
89    }
90}