timely/dataflow/operators/
broadcast.rs

1//! Broadcast records to all workers.
2
3use crate::ExchangeData;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::operators::{Map, Exchange};
6
7/// Broadcast records to all workers.
8pub trait Broadcast<D: ExchangeData> {
9    /// Broadcast records to all workers.
10    ///
11    /// # Examples
12    /// ```
13    /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
14    ///
15    /// timely::example(|scope| {
16    ///     (0..10).to_stream(scope)
17    ///            .broadcast()
18    ///            .inspect(|x| println!("seen: {:?}", x));
19    /// });
20    /// ```
21    fn broadcast(&self) -> Self;
22}
23
24impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
25    fn broadcast(&self) -> Stream<G, D> {
26
27        // NOTE: Simplified implementation due to underlying motion
28        // in timely dataflow internals. Optimize once they have
29        // settled down.
30        let peers = self.scope().peers() as u64;
31        self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone())))
32            .exchange(|ix| ix.0)
33            .map(|(_i,x)| x)
34    }
35}