timely/dataflow/operators/core/
partition.rs
1use timely_container::{Container, ContainerBuilder, PushInto};
4
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8use crate::Data;
9
10pub trait Partition<G: Scope, C: Container> {
12 fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
31 where
32 CB: ContainerBuilder + PushInto<D2>,
33 CB::Container: Data,
34 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
35}
36
37impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
38 fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
39 where
40 CB: ContainerBuilder + PushInto<D2>,
41 CB::Container: Data,
42 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
43 {
44 let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45
46 let mut input = builder.new_input(self, Pipeline);
47 let mut outputs = Vec::with_capacity(parts as usize);
48 let mut streams = Vec::with_capacity(parts as usize);
49
50 for _ in 0..parts {
51 let (output, stream) = builder.new_output::<CB>();
52 outputs.push(output);
53 streams.push(stream);
54 }
55
56 builder.build(move |_| {
57 move |_frontiers| {
58 let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
59 input.for_each(|time, data| {
60 let mut sessions = handles
61 .iter_mut()
62 .map(|h| h.session_with_builder(&time))
63 .collect::<Vec<_>>();
64
65 for datum in data.drain() {
66 let (part, datum2) = route(datum);
67 sessions[part as usize].give(datum2);
68 }
69 });
70 }
71 });
72
73 streams
74 }
75}