timely/dataflow/operators/partition.rs
1//! Partition a stream of records into multiple streams.
2
3use crate::container::CapacityContainerBuilder;
4use crate::dataflow::operators::core::Partition as PartitionCore;
5use crate::dataflow::{Scope, Stream};
6use crate::Data;
7
8/// Partition a stream of records into multiple streams.
9pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
10 /// Produces `parts` output streams, containing records produced and assigned by `route`.
11 ///
12 /// # Examples
13 /// ```
14 /// use timely::dataflow::operators::{ToStream, Partition, Inspect};
15 ///
16 /// timely::example(|scope| {
17 /// let streams = (0..10).to_stream(scope)
18 /// .partition(3, |x| (x % 3, x));
19 ///
20 /// streams[0].inspect(|x| println!("seen 0: {:?}", x));
21 /// streams[1].inspect(|x| println!("seen 1: {:?}", x));
22 /// streams[2].inspect(|x| println!("seen 2: {:?}", x));
23 /// });
24 /// ```
25 fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>;
26}
27
28impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> {
29 fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
30 PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
31 }
32}