Skip to main content

timely/dataflow/operators/vec/
partition.rs

1//! Partition a stream of records into multiple streams.
2
3use crate::container::CapacityContainerBuilder;
4use crate::dataflow::operators::core::Partition as PartitionCore;
5use crate::dataflow::{Scope, StreamVec};
6
7/// Partition a stream of records into multiple streams.
8pub trait Partition<G: Scope, D: 'static, D2: 'static, F: Fn(D) -> (u64, D2)> {
9    /// Produces `parts` output streams, containing records produced and assigned by `route`.
10    ///
11    /// # Examples
12    /// ```
13    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Partition};
14    ///
15    /// timely::example(|scope| {
16    ///     let mut streams = (0..10).to_stream(scope)
17    ///                              .partition(3, |x| (x % 3, x));
18    ///
19    ///     streams.pop().unwrap().inspect(|x| println!("seen 2: {:?}", x));
20    ///     streams.pop().unwrap().inspect(|x| println!("seen 1: {:?}", x));
21    ///     streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
22    /// });
23    /// ```
24    fn partition(self, parts: u64, route: F) -> Vec<StreamVec<G, D2>>;
25}
26
27impl<G: Scope, D: 'static, D2: 'static, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for StreamVec<G, D> {
28    fn partition(self, parts: u64, route: F) -> Vec<StreamVec<G, D2>> {
29        PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
30    }
31}