timely/dataflow/operators/core/
exchange.rs

1//! Exchange records between workers.
2
3use crate::ExchangeData;
4use crate::container::{Container, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, StreamCore};
8
9/// Exchange records between workers.
10pub trait Exchange<C: Container> {
11    /// Exchange records between workers.
12    ///
13    /// The closure supplied should map a reference to a record to a `u64`,
14    /// whose value determines to which worker the record will be routed.
15    ///
16    /// # Examples
17    /// ```
18    /// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
19    ///
20    /// timely::example(|scope| {
21    ///     (0..10).to_stream(scope)
22    ///            .exchange(|x| *x)
23    ///            .inspect(|x| println!("seen: {:?}", x));
24    /// });
25    /// ```
26    fn exchange<F>(&self, route: F) -> Self
27    where
28        for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
29}
30
31impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
32where
33    C: SizableContainer + ExchangeData + crate::dataflow::channels::ContainerBytes,
34    C: for<'a> PushInto<C::Item<'a>>,
35
36{
37    fn exchange<F>(&self, route: F) -> StreamCore<G, C>
38    where
39        for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
40    {
41        self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
42            move |input, output| {
43                input.for_each(|time, data| {
44                    output.session(&time).give_container(data);
45                });
46            }
47        })
48    }
49}