Skip to main content

timely/dataflow/operators/vec/
broadcast.rs

1//! Broadcast records to all workers.
2
3use crate::ExchangeData;
4use crate::progress::Timestamp;
5use crate::dataflow::StreamVec;
6use crate::dataflow::operators::{vec::Map, Exchange};
7
8/// Broadcast records to all workers.
9pub trait Broadcast<D: ExchangeData> {
10    /// Broadcast records to all workers.
11    ///
12    /// # Examples
13    /// ```
14    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Broadcast};
15    ///
16    /// timely::example(|scope| {
17    ///     (0..10).to_stream(scope)
18    ///            .broadcast()
19    ///            .inspect(|x| println!("seen: {:?}", x));
20    /// });
21    /// ```
22    fn broadcast(self) -> Self;
23}
24
25impl<T: Timestamp, D: ExchangeData + Clone> Broadcast<D> for StreamVec<'_, T, D> {
26    fn broadcast(self) -> Self {
27
28        // NOTE: Simplified implementation due to underlying motion
29        // in timely dataflow internals. Optimize once they have
30        // settled down.
31        let peers = self.scope().peers() as u64;
32        self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone())))
33            .exchange(|ix| ix.0)
34            .map(|(_i,x)| x)
35    }
36}