// timely/dataflow/operators/core/concat.rs
1use crate::{Container, Data};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{StreamCore, Scope};
7
8pub trait Concat<G: Scope, C: Container> {
10 fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
24}
25
26impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C> {
27 fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
28 self.scope().concatenate([self.clone(), other.clone()])
29 }
30}
31
32pub trait Concatenate<G: Scope, C: Container> {
34 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
51 where
52 I: IntoIterator<Item=StreamCore<G, C>>;
53}
54
55impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C> {
56 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
57 where
58 I: IntoIterator<Item=StreamCore<G, C>>
59 {
60 let clone = self.clone();
61 self.scope().concatenate(Some(clone).into_iter().chain(sources))
62 }
63}
64
65impl<G: Scope, C: Container + Data> Concatenate<G, C> for G {
66 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
67 where
68 I: IntoIterator<Item=StreamCore<G, C>>
69 {
70
71 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
73 let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
74
75 let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
77
78 let (mut output, result) = builder.new_output();
80
81 builder.build(move |_capability| {
83
84 move |_frontier| {
85 let mut output = output.activate();
86 for handle in handles.iter_mut() {
87 handle.for_each(|time, data| {
88 output.session(&time).give_container(data);
89 })
90 }
91 }
92 });
93
94 result
95 }
96}