timely/dataflow/operators/core/
partition.rs1use std::collections::BTreeMap;
3
4use crate::container::{DrainContainer, PushInto};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8use crate::{Container, ContainerBuilder};
9
10pub trait Partition<G: Scope, C: DrainContainer> {
12 fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
31 where
32 CB: ContainerBuilder + PushInto<D2>,
33 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
34}
35
36impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for StreamCore<G, C> {
37 fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
38 where
39 CB: ContainerBuilder + PushInto<D2>,
40 F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
41 {
42 let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
43 builder.set_notify(false);
44
45 let mut input = builder.new_input(self, Pipeline);
46 let mut outputs = Vec::with_capacity(parts as usize);
47 let mut streams = Vec::with_capacity(parts as usize);
48
49 let mut c_build = CB::default();
50
51 for _ in 0..parts {
52 let (output, stream) = builder.new_output::<CB::Container>();
53 outputs.push(output);
54 streams.push(stream);
55 }
56
57 builder.build(move |_| {
58 move |_frontiers| {
59 let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
60 let mut targets = BTreeMap::<u64,Vec<_>>::default();
61 input.for_each_time(|time, data| {
62 for datum in data.flat_map(|d| d.drain()) {
64 let (part, datum) = route(datum);
65 targets.entry(part).or_default().push(datum);
66 }
67 while let Some((part, data)) = targets.pop_first() {
69 for datum in data.into_iter() {
70 c_build.push_into(datum);
71 while let Some(container) = c_build.extract() {
72 handles[part as usize].give(&time, container);
73 }
74 }
75 while let Some(container) = c_build.finish() {
76 handles[part as usize].give(&time, container);
77 }
78 }
79 });
80 }
81 });
82
83 streams
84 }
85}