timely/dataflow/operators/core/
concat.rs1use crate::Container;
4use crate::dataflow::channels::pact::Pipeline;
5use crate::dataflow::{StreamCore, Scope};
6
7pub trait Concat<G: Scope, C> {
9 fn concat(&self, _: &StreamCore<G, C>) -> StreamCore<G, C>;
23}
24
25impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C> {
26 fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C> {
27 self.scope().concatenate([self.clone(), other.clone()])
28 }
29}
30
31pub trait Concatenate<G: Scope, C> {
33 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
50 where
51 I: IntoIterator<Item=StreamCore<G, C>>;
52}
53
54impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C> {
55 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
56 where
57 I: IntoIterator<Item=StreamCore<G, C>>
58 {
59 let clone = self.clone();
60 self.scope().concatenate(Some(clone).into_iter().chain(sources))
61 }
62}
63
64impl<G: Scope, C: Container> Concatenate<G, C> for G {
65 fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
66 where
67 I: IntoIterator<Item=StreamCore<G, C>>
68 {
69
70 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
72 let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
73 builder.set_notify(false);
74
75 let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
77
78 let (mut output, result) = builder.new_output();
80
81 builder.build(move |_capability| {
83
84 move |_frontier| {
85 let mut output = output.activate();
86 for handle in handles.iter_mut() {
87 handle.for_each(|time, data| {
88 output.session(&time).give_container(data);
89 })
90 }
91 }
92 });
93
94 result
95 }
96}