use std::rc::Rc;
use timely::communication::{Pull, Push};
use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
use timely::dataflow::channels::Bundle;
use timely::logging::TimelyLogger;
use timely::progress::Timestamp;
use timely::worker::AsWorker;
use timely::{Container, ExchangeData};
#[derive(Debug)]
pub struct Distribute;
impl<T: Timestamp, C: Container + ExchangeData> ParallelizationContract<T, C> for Distribute {
type Pusher = DistributePusher<LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;
fn connect<A: AsWorker>(
self,
allocator: &mut A,
identifier: usize,
address: Rc<[usize]>,
logging: Option<TimelyLogger>,
) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Bundle<T, C>>(identifier, address);
let senders = senders
.into_iter()
.enumerate()
.map(|(i, x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone()))
.collect::<Vec<_>>();
(
DistributePusher::new(senders),
LogPuller::new(receiver, allocator.index(), identifier, logging.clone()),
)
}
}
pub struct DistributePusher<P> {
pushers: Vec<P>,
next: usize,
}
impl<P> DistributePusher<P> {
pub fn new(pushers: Vec<P>) -> DistributePusher<P> {
Self { pushers, next: 0 }
}
}
impl<T, C, P> Push<Bundle<T, C>> for DistributePusher<P>
where
T: Eq + ExchangeData,
C: Container,
P: Push<Bundle<T, C>>,
{
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
let worker_idx = self.next;
self.next = (self.next + 1) % self.pushers.len();
self.pushers[worker_idx].push(message);
}
}