1use std::rc::Rc;
19use timely::communication::{Pull, Push};
20use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
21use timely::dataflow::channels::{ContainerBytes, Message};
22use timely::logging::TimelyLogger;
23use timely::progress::Timestamp;
24use timely::worker::AsWorker;
25use timely::{Container, ExchangeData};
26
/// A parallelization contract that spreads whole containers across workers
/// in round-robin order.
///
/// Containers are forwarded as they are; no per-record routing takes place.
#[derive(Debug)]
pub struct Distribute;
30
31impl<T, C> ParallelizationContract<T, C> for Distribute
32where
33 T: Timestamp,
34 C: Container + ContainerBytes + Send + 'static,
35{
36 type Pusher = DistributePusher<LogPusher<T, C, Box<dyn Push<Message<T, C>>>>>;
37 type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;
38
39 fn connect<A: AsWorker>(
40 self,
41 allocator: &mut A,
42 identifier: usize,
43 address: Rc<[usize]>,
44 logging: Option<TimelyLogger>,
45 ) -> (Self::Pusher, Self::Puller) {
46 let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
47 let senders = senders
48 .into_iter()
49 .enumerate()
50 .map(|(i, x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone()))
51 .collect::<Vec<_>>();
52 (
53 DistributePusher::new(senders),
54 LogPuller::new(receiver, allocator.index(), identifier, logging.clone()),
55 )
56 }
57}
58
/// Forwards each pushed message to exactly one of a set of pushers, cycling
/// through them in order.
///
/// Unlike a hash-based exchange, the destination does not depend on the
/// message contents.
pub struct DistributePusher<P> {
    /// The downstream pushers, indexed by position.
    pushers: Vec<P>,
    /// Index into `pushers` of the pusher that receives the next message.
    next: usize,
}

impl<P> DistributePusher<P> {
    /// Builds a `DistributePusher` over `pushers`, starting the rotation at
    /// the first one.
    pub fn new(pushers: Vec<P>) -> Self {
        DistributePusher { pushers, next: 0 }
    }
}
73
74impl<T, C, P> Push<Message<T, C>> for DistributePusher<P>
75where
76 T: Eq + ExchangeData,
77 C: Container,
78 P: Push<Message<T, C>>,
79{
80 fn push(&mut self, message: &mut Option<Message<T, C>>) {
81 let worker_idx = self.next;
82 self.next = (self.next + 1) % self.pushers.len();
83 self.pushers[worker_idx].push(message);
84 }
85}