1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
//! Merges the contents of multiple streams.
use crate::{Container, Data};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{StreamCore, Scope};
/// Merge the contents of two streams.
pub trait Concat<G: Scope, C: Container> {
/// Merge the contents of two streams.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Concat, Inspect};
///
/// timely::example(|scope| {
///
/// let stream = (0..10).to_stream(scope);
/// stream.concat(&stream)
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
}
impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C> {
fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
self.scope().concatenate([self.clone(), other.clone()])
}
}
/// Merge the contents of multiple streams.
pub trait Concatenate<G: Scope, C: Container> {
/// Merge the contents of multiple streams.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
///
/// timely::example(|scope| {
///
/// let streams = vec![(0..10).to_stream(scope),
/// (0..10).to_stream(scope),
/// (0..10).to_stream(scope)];
///
/// scope.concatenate(streams)
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=StreamCore<G, C>>;
}
impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C> {
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=StreamCore<G, C>>
{
let clone = self.clone();
self.scope().concatenate(Some(clone).into_iter().chain(sources))
}
}
impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
where
I: IntoIterator<Item=StreamCore<G, C>>
{
// create an operator builder.
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
// create new input handles for each input stream.
let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
// create one output handle for the concatenated results.
let (mut output, result) = builder.new_output();
// build an operator that plays out all input data.
builder.build(move |_capability| {
move |_frontier| {
let mut output = output.activate();
for handle in handles.iter_mut() {
handle.for_each(|time, data| {
output.session(&time).give_container(data);
})
}
}
});
result
}
}