timely/dataflow/operators/broadcast.rs
1//! Broadcast records to all workers.
2
3use crate::ExchangeData;
4use crate::dataflow::{Stream, Scope};
5use crate::dataflow::operators::{Map, Exchange};
6
7/// Broadcast records to all workers.
8pub trait Broadcast<D: ExchangeData> {
9 /// Broadcast records to all workers.
10 ///
11 /// # Examples
12 /// ```
13 /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
14 ///
15 /// timely::example(|scope| {
16 /// (0..10).to_stream(scope)
17 /// .broadcast()
18 /// .inspect(|x| println!("seen: {:?}", x));
19 /// });
20 /// ```
21 fn broadcast(&self) -> Self;
22}
23
24impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
25 fn broadcast(&self) -> Stream<G, D> {
26
27 // NOTE: Simplified implementation due to underlying motion
28 // in timely dataflow internals. Optimize once they have
29 // settled down.
30 let peers = self.scope().peers() as u64;
31 self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone())))
32 .exchange(|ix| ix.0)
33 .map(|(_i,x)| x)
34 }
35}