use timely_container::{Container, ContainerBuilder, PushInto};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::operators::InputCapability;
use crate::dataflow::{Scope, StreamCore};
use crate::Data;

11pub trait Partition<G: Scope, C: Container> {
13 fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
32 where
33 CB: ContainerBuilder + PushInto<D2>,
34 CB::Container: Data,
35 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
36}
37
38impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
39 fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
40 where
41 CB: ContainerBuilder + PushInto<D2>,
42 CB::Container: Data,
43 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
44 {
45 let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
46 builder.set_notify(false);
47
48 let mut input = builder.new_input(self, Pipeline);
49 let mut outputs = Vec::with_capacity(parts as usize);
50 let mut streams = Vec::with_capacity(parts as usize);
51
52 for _ in 0..parts {
53 let (output, stream) = builder.new_output::<CB>();
54 outputs.push(output);
55 streams.push(stream);
56 }
57
58 builder.build(move |_| {
59 let mut todo = vec![];
60 move |_frontiers| {
61 let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
62
63 let mut sessions_cap: Option<InputCapability<G::Timestamp>> = None;
65 let mut sessions = vec![];
66
67 while let Some((cap, data)) = input.next() {
68 todo.push((cap, std::mem::take(data)));
69 }
70 todo.sort_unstable_by(|a, b| a.0.cmp(&b.0));
71
72 for (cap, mut data) in todo.drain(..) {
73 if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) {
74 sessions = handles.iter_mut().map(|h| (None, Some(h))).collect();
75 sessions_cap = Some(cap);
76 }
77 for datum in data.drain() {
78 let (part, datum2) = route(datum);
79
80 let session = match sessions[part as usize] {
81 (Some(ref mut s), _) => s,
82 (ref mut session_slot, ref mut handle) => {
83 let handle = handle.take().unwrap();
84 let session = handle.session_with_builder(sessions_cap.as_ref().unwrap());
85 session_slot.insert(session)
86 }
87 };
88 session.give(datum2);
89 }
90 }
91 }
92 });
93
94 streams
95 }
96}