// timely/dataflow/operators/core/partition.rs

//! Partition a stream of records into multiple streams.
2use std::collections::BTreeMap;
3
4use crate::container::{DrainContainer, PushInto};
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{Scope, StreamCore};
8use crate::{Container, ContainerBuilder};
9
10/// Partition a stream of records into multiple streams.
11pub trait Partition<G: Scope, C: DrainContainer> {
12    /// Produces `parts` output streams, containing records produced and assigned by `route`.
13    ///
14    /// # Examples
15    /// ```
16    /// use timely::dataflow::operators::ToStream;
17    /// use timely::dataflow::operators::core::{Partition, Inspect};
18    /// use timely_container::CapacityContainerBuilder;
19    ///
20    /// timely::example(|scope| {
21    ///     let streams = (0..10).to_stream(scope)
22    ///                          .partition::<CapacityContainerBuilder<Vec<_>>, _, _>(3, |x| (x % 3, x));
23    ///
24    ///     for (idx, stream) in streams.into_iter().enumerate() {
25    ///         stream
26    ///             .inspect(move |x| println!("seen {idx}: {x:?}"));
27    ///     }
28    /// });
29    /// ```
30    fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
31    where
32        CB: ContainerBuilder + PushInto<D2>,
33        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
34}
35
36impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for StreamCore<G, C> {
37    fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
38    where
39        CB: ContainerBuilder + PushInto<D2>,
40        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
41    {
42        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
43        builder.set_notify(false);
44
45        let mut input = builder.new_input(self, Pipeline);
46        let mut outputs = Vec::with_capacity(parts as usize);
47        let mut streams = Vec::with_capacity(parts as usize);
48
49        let mut c_build = CB::default();
50
51        for _ in 0..parts {
52            let (output, stream) = builder.new_output::<CB::Container>();
53            outputs.push(output);
54            streams.push(stream);
55        }
56
57        builder.build(move |_| {
58            move |_frontiers| {
59                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
60                let mut targets = BTreeMap::<u64,Vec<_>>::default();
61                input.for_each_time(|time, data| {
62                    // Sort data by intended output.
63                    for datum in data.flat_map(|d| d.drain()) {
64                        let (part, datum) = route(datum);
65                        targets.entry(part).or_default().push(datum);
66                    }
67                    // Form each intended output into a container and ship.
68                    while let Some((part, data)) = targets.pop_first() {
69                        for datum in data.into_iter() {
70                            c_build.push_into(datum);
71                            while let Some(container) = c_build.extract() {
72                                handles[part as usize].give(&time, container);
73                            }
74                        }
75                        while let Some(container) = c_build.finish() {
76                            handles[part as usize].give(&time, container);
77                        }
78                    }
79                });
80            }
81        });
82
83        streams
84    }
85}