timely/dataflow/operators/core/
partition.rs

//! Partition a stream of records into multiple streams.
2
3use timely_container::{Container, ContainerBuilder, PushInto};
4
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8use crate::Data;
9
10/// Partition a stream of records into multiple streams.
11pub trait Partition<G: Scope, C: Container> {
12    /// Produces `parts` output streams, containing records produced and assigned by `route`.
13    ///
14    /// # Examples
15    /// ```
16    /// use timely::dataflow::operators::ToStream;
17    /// use timely::dataflow::operators::core::{Partition, Inspect};
18    /// use timely_container::CapacityContainerBuilder;
19    ///
20    /// timely::example(|scope| {
21    ///     let streams = (0..10).to_stream(scope)
22    ///                          .partition::<CapacityContainerBuilder<Vec<_>>, _, _>(3, |x| (x % 3, x));
23    ///
24    ///     for (idx, stream) in streams.into_iter().enumerate() {
25    ///         stream
26    ///             .inspect(move |x| println!("seen {idx}: {x:?}"));
27    ///     }
28    /// });
29    /// ```
30    fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
31    where
32        CB: ContainerBuilder + PushInto<D2>,
33        CB::Container: Data,
34        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
35}
36
37impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
38    fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
39    where
40        CB: ContainerBuilder + PushInto<D2>,
41        CB::Container: Data,
42        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
43    {
44        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45
46        let mut input = builder.new_input(self, Pipeline);
47        let mut outputs = Vec::with_capacity(parts as usize);
48        let mut streams = Vec::with_capacity(parts as usize);
49
50        for _ in 0..parts {
51            let (output, stream) = builder.new_output::<CB>();
52            outputs.push(output);
53            streams.push(stream);
54        }
55
56        builder.build(move |_| {
57            move |_frontiers| {
58                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
59                input.for_each(|time, data| {
60                    let mut sessions = handles
61                        .iter_mut()
62                        .map(|h| h.session_with_builder(&time))
63                        .collect::<Vec<_>>();
64
65                    for datum in data.drain() {
66                        let (part, datum2) = route(datum);
67                        sessions[part as usize].give(datum2);
68                    }
69                });
70            }
71        });
72
73        streams
74    }
75}