timely/dataflow/operators/core/
partition.rs1use std::collections::BTreeMap;
3
4use crate::container::{DrainContainer, PushInto};
5use crate::progress::Timestamp;
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
8use crate::dataflow::Stream;
9use crate::{Container, ContainerBuilder};
10
11pub trait Partition<'scope, T: Timestamp, C: DrainContainer> {
13 fn partition<CB, D2, F>(self, parts: u64, route: F) -> Vec<Stream<'scope, T, CB::Container>>
33 where
34 CB: ContainerBuilder + PushInto<D2>,
35 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
36}
37
38impl<'scope, T: Timestamp, C: Container + DrainContainer> Partition<'scope, T, C> for Stream<'scope, T, C> {
39 fn partition<CB, D2, F>(self, parts: u64, mut route: F) -> Vec<Stream<'scope, T, CB::Container>>
40 where
41 CB: ContainerBuilder + PushInto<D2>,
42 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
43 {
44 let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45
46 let mut input = builder.new_input(self, Pipeline);
47 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
48 let mut outputs = Vec::with_capacity(parts as usize);
49 let mut streams = Vec::with_capacity(parts as usize);
50
51 let mut c_build = CB::default();
52
53 for _ in 0..parts {
54 let (output, stream) = builder.new_output::<CB::Container>();
55 outputs.push(output);
56 streams.push(stream);
57 }
58
59 builder.build(move |_| {
60 move |_frontiers| {
61 let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
62 let mut targets = BTreeMap::<u64,Vec<_>>::default();
63 input.for_each_time(|time, data| {
64 for datum in data.flat_map(|d| d.drain()) {
66 let (part, datum) = route(datum);
67 targets.entry(part).or_default().push(datum);
68 }
69 while let Some((part, data)) = targets.pop_first() {
71 for datum in data.into_iter() {
72 c_build.push_into(datum);
73 while let Some(container) = c_build.extract() {
74 handles[part as usize].give(&time, container);
75 }
76 }
77 while let Some(container) = c_build.finish() {
78 handles[part as usize].give(&time, container);
79 }
80 }
81 });
82 }
83 });
84
85 streams
86 }
87}