Skip to main content

timely/dataflow/operators/vec/
partition.rs

1//! Partition a stream of records into multiple streams.
2
3use crate::container::CapacityContainerBuilder;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::core::Partition as PartitionCore;
6use crate::dataflow::StreamVec;
7
8/// Partition a stream of records into multiple streams.
9pub trait Partition<'scope, T: Timestamp, D: 'static> {
10    /// Produces `parts` output streams, containing records produced and assigned by `route`.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Partition};
15    ///
16    /// timely::example(|scope| {
17    ///     let mut streams = (0..10).to_stream(scope)
18    ///                              .partition(3, |x| (x % 3, x));
19    ///
20    ///     streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
21    ///     streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
22    ///     streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
23    /// });
24    /// ```
25    fn partition<D2: 'static, F: Fn(D) -> (u64, D2)+'static>(self, parts: u64, route: F) -> Vec<StreamVec<'scope, T, D2>>;
26}
27
28impl<'scope, T: Timestamp, D: 'static> Partition<'scope, T, D> for StreamVec<'scope, T, D> {
29    fn partition<D2: 'static, F: Fn(D)->(u64, D2)+'static>(self, parts: u64, route: F) -> Vec<StreamVec<'scope, T, D2>> {
30        PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
31    }
32}