Skip to main content

timely/dataflow/operators/core/
partition.rs

1//! Partition a stream of records into multiple streams.
2use std::collections::BTreeMap;
3
4use crate::container::{DrainContainer, PushInto};
5use crate::progress::Timestamp;
6use crate::dataflow::channels::pact::Pipeline;
7use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
8use crate::dataflow::Stream;
9use crate::{Container, ContainerBuilder};
10
11/// Partition a stream of records into multiple streams.
12pub trait Partition<'scope, T: Timestamp, C: DrainContainer> {
13    /// Produces `parts` output streams, containing records produced and assigned by `route`.
14    ///
15    /// # Examples
16    /// ```
17    /// use timely::dataflow::operators::{ToStream, Inspect};
18    /// use timely::dataflow::operators::core::Partition;
19    /// use timely_container::CapacityContainerBuilder;
20    ///
21    /// timely::example(|scope| {
22    ///     let streams = (0..10).to_stream(scope)
23    ///                          .container::<Vec<_>>()
24    ///                          .partition::<CapacityContainerBuilder<Vec<_>>, _, _>(3, |x| (x % 3, x));
25    ///
26    ///     for (idx, stream) in streams.into_iter().enumerate() {
27    ///         stream
28    ///             .inspect(move |x| println!("seen {idx}: {x:?}"));
29    ///     }
30    /// });
31    /// ```
32    fn partition<CB, D2, F>(self, parts: u64, route: F) -> Vec<Stream<'scope, T, CB::Container>>
33    where
34        CB: ContainerBuilder + PushInto<D2>,
35        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
36}
37
38impl<'scope, T: Timestamp, C: Container + DrainContainer> Partition<'scope, T, C> for Stream<'scope, T, C> {
39    fn partition<CB, D2, F>(self, parts: u64, mut route: F) -> Vec<Stream<'scope, T, CB::Container>>
40    where
41        CB: ContainerBuilder + PushInto<D2>,
42        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
43    {
44        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
45
46        let mut input = builder.new_input(self, Pipeline);
47        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
48        let mut outputs = Vec::with_capacity(parts as usize);
49        let mut streams = Vec::with_capacity(parts as usize);
50
51        let mut c_build = CB::default();
52
53        for _ in 0..parts {
54            let (output, stream) = builder.new_output::<CB::Container>();
55            outputs.push(output);
56            streams.push(stream);
57        }
58
59        builder.build(move |_| {
60            move |_frontiers| {
61                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
62                let mut targets = BTreeMap::<u64,Vec<_>>::default();
63                input.for_each_time(|time, data| {
64                    // Sort data by intended output.
65                    for datum in data.flat_map(|d| d.drain()) {
66                        let (part, datum) = route(datum);
67                        targets.entry(part).or_default().push(datum);
68                    }
69                    // Form each intended output into a container and ship.
70                    while let Some((part, data)) = targets.pop_first() {
71                        for datum in data.into_iter() {
72                            c_build.push_into(datum);
73                            while let Some(container) = c_build.extract() {
74                                handles[part as usize].give(&time, container);
75                            }
76                        }
77                        while let Some(container) = c_build.finish() {
78                            handles[part as usize].give(&time, container);
79                        }
80                    }
81                });
82            }
83        });
84
85        streams
86    }
87}