timely/dataflow/operators/vec/broadcast.rs
1//! Broadcast records to all workers.
2
3use crate::ExchangeData;
4use crate::progress::Timestamp;
5use crate::dataflow::StreamVec;
6use crate::dataflow::operators::{vec::Map, Exchange};
7
8/// Broadcast records to all workers.
9pub trait Broadcast<D: ExchangeData> {
10 /// Broadcast records to all workers.
11 ///
12 /// # Examples
13 /// ```
14 /// use timely::dataflow::operators::{ToStream, Inspect, vec::Broadcast};
15 ///
16 /// timely::example(|scope| {
17 /// (0..10).to_stream(scope)
18 /// .broadcast()
19 /// .inspect(|x| println!("seen: {:?}", x));
20 /// });
21 /// ```
22 fn broadcast(self) -> Self;
23}
24
25impl<T: Timestamp, D: ExchangeData + Clone> Broadcast<D> for StreamVec<'_, T, D> {
26 fn broadcast(self) -> Self {
27
28 // NOTE: Simplified implementation due to underlying motion
29 // in timely dataflow internals. Optimize once they have
30 // settled down.
31 let peers = self.scope().peers() as u64;
32 self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone())))
33 .exchange(|ix| ix.0)
34 .map(|(_i,x)| x)
35 }
36}