timely/dataflow/operators/core/
partition.rs

//! Partition a stream of records into multiple streams.
2
3use timely_container::{Container, ContainerBuilder, PushInto};
4
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::operators::InputCapability;
8use crate::dataflow::{Scope, StreamCore};
9use crate::Data;
10
11/// Partition a stream of records into multiple streams.
12pub trait Partition<G: Scope, C: Container> {
13    /// Produces `parts` output streams, containing records produced and assigned by `route`.
14    ///
15    /// # Examples
16    /// ```
17    /// use timely::dataflow::operators::ToStream;
18    /// use timely::dataflow::operators::core::{Partition, Inspect};
19    /// use timely_container::CapacityContainerBuilder;
20    ///
21    /// timely::example(|scope| {
22    ///     let streams = (0..10).to_stream(scope)
23    ///                          .partition::<CapacityContainerBuilder<Vec<_>>, _, _>(3, |x| (x % 3, x));
24    ///
25    ///     for (idx, stream) in streams.into_iter().enumerate() {
26    ///         stream
27    ///             .inspect(move |x| println!("seen {idx}: {x:?}"));
28    ///     }
29    /// });
30    /// ```
31    fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
32    where
33        CB: ContainerBuilder + PushInto<D2>,
34        CB::Container: Data,
35        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
36}
37
38impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
39    fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
40    where
41        CB: ContainerBuilder + PushInto<D2>,
42        CB::Container: Data,
43        F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
44    {
45        let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
46        builder.set_notify(false);
47
48        let mut input = builder.new_input(self, Pipeline);
49        let mut outputs = Vec::with_capacity(parts as usize);
50        let mut streams = Vec::with_capacity(parts as usize);
51
52        for _ in 0..parts {
53            let (output, stream) = builder.new_output::<CB>();
54            outputs.push(output);
55            streams.push(stream);
56        }
57
58        builder.build(move |_| {
59            let mut todo = vec![];
60            move |_frontiers| {
61                let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>();
62
63                // The capability associated with each session in `sessions`.
64                let mut sessions_cap: Option<InputCapability<G::Timestamp>> = None;
65                let mut sessions = vec![];
66
67                while let Some((cap, data)) = input.next() {
68                    todo.push((cap, std::mem::take(data)));
69                }
70                todo.sort_unstable_by(|a, b| a.0.cmp(&b.0));
71
72                for (cap, mut data) in todo.drain(..) {
73                    if sessions_cap.as_ref().map_or(true, |s_cap| s_cap.time() != cap.time()) {
74                        sessions = handles.iter_mut().map(|h| (None, Some(h))).collect();
75                        sessions_cap = Some(cap);
76                    }
77                    for datum in data.drain() {
78                        let (part, datum2) = route(datum);
79
80                        let session = match sessions[part as usize] {
81                            (Some(ref mut s), _) => s,
82                            (ref mut session_slot, ref mut handle) => {
83                                let handle = handle.take().unwrap();
84                                let session = handle.session_with_builder(sessions_cap.as_ref().unwrap());
85                                session_slot.insert(session)
86                            }
87                        };
88                        session.give(datum2);
89                    }
90                }
91            }
92        });
93
94        streams
95    }
96}