use std::marker::PhantomData;
use crate::logging::{TimelyLogger, MessagesEvent};
use crate::progress::Timestamp;
use crate::progress::timestamp::Refines;
use crate::progress::{Source, Target};
use crate::order::Product;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::channels::pushers::{CounterCore, TeeCore};
use crate::dataflow::channels::{BundleCore, Message};
use crate::worker::AsWorker;
use crate::dataflow::{StreamCore, Scope, Stream};
use crate::dataflow::scopes::{Child, ScopeParent};
use crate::dataflow::operators::delay::Delay;
/// Extension trait for bringing a stream into a child of its current scope.
///
/// `G` is the scope the stream currently lives in, `T` is the timestamp type of the
/// child scope (which must refine `G`'s timestamp), and `C` is the container type
/// carried by the stream.
pub trait Enter<G, T, C>
where
    G: Scope,
    T: Timestamp + Refines<G::Timestamp>,
    C: Container,
{
    /// Moves `self` into the supplied child scope, returning a stream that lives
    /// in that child scope and carries the same container type.
    fn enter<'a>(&self, child: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
}
use crate::dataflow::scopes::child::Iterative;
/// Extension trait for bringing a `Stream` into an `Iterative` child scope while
/// choosing the inner timestamp coordinate for each element.
pub trait EnterAt<G, T, D>
where
    G: Scope,
    T: Timestamp,
    D: Data,
{
    /// Moves `self` into `scope`; `initial` is a caller-supplied function from a
    /// record to a `T` value for that record.
    ///
    /// Returns a stream in the iterative child scope.
    fn enter_at<'a, F>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream<Iterative<'a, G, T>, D>
    where
        F: FnMut(&D) -> T + 'static;
}
/// Blanket implementation: anything that can `enter` an iterative scope with
/// `Vec<D>` containers can also `enter_at`.
impl<G, T, D, E> EnterAt<G, T, D> for E
where
    G: Scope,
    T: Timestamp,
    D: Data,
    E: Enter<G, Product<<G as ScopeParent>::Timestamp, T>, Vec<D>>,
{
    /// Enters `scope` and then delays every record to the timestamp built from
    /// the outer part of its current time paired with `initial(record)`.
    fn enter_at<'a, F>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> Stream<Iterative<'a, G, T>, D>
    where
        F: FnMut(&D) -> T + 'static,
    {
        let entered = self.enter(scope);
        entered.delay(move |datum, time| {
            // Keep the outer coordinate of the time; replace the inner coordinate
            // with whatever the caller's function chooses for this record.
            Product::new(time.clone().to_outer(), initial(datum))
        })
    }
}
impl<G, T, C> Enter<G, T, C> for StreamCore<G, C>
where
    G: Scope,
    T: Timestamp + Refines<G::Timestamp>,
    C: Data + Container,
{
    /// Connects this stream to a fresh input of `scope`'s subgraph through an
    /// `IngressNub`, and returns the stream seen from inside the child scope.
    ///
    /// If the scope provides a logger, the nub is wrapped in a `LogPusher` so that
    /// messages crossing the boundary are logged.
    fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
        // Trait import kept local to this method, as in the original; presumably
        // it is what provides `activator_for` — confirm.
        use crate::scheduling::Scheduler;

        let (tee, registrar) = TeeCore::<T, C>::new();
        let ingress = IngressNub {
            targets: CounterCore::new(tee),
            phantom: PhantomData,
            activator: scope.activator_for(&scope.addr()),
            active: false,
        };

        // Hand the counter's `produced` handle to the subgraph when registering
        // the new input.
        let produced = ingress.targets.produced().clone();
        let input = scope.subgraph.borrow_mut().new_input(produced);
        let channel_id = scope.clone().new_identifier();

        match scope.logging() {
            Some(logger) => {
                let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
                self.connect_to(input, pusher, channel_id);
            }
            None => self.connect_to(input, ingress, channel_id),
        }

        let source = Source::new(0, input.port);
        StreamCore::new(source, registrar, scope.clone())
    }
}
/// Extension trait for moving a stream out into the scope `G`.
///
/// `D` is the container type carried by the stream.
pub trait Leave<G, D>
where
    G: Scope,
    D: Container,
{
    /// Returns a stream in scope `G` with the same container type as `self`.
    fn leave(&self) -> StreamCore<G, D>;
}
impl<'a, G, D, T> Leave<G, D> for StreamCore<Child<'a, G, T>, D>
where
    G: Scope,
    D: Clone + Container,
    T: Timestamp + Refines<G::Timestamp>,
{
    /// Connects this stream to a fresh output of its child scope's subgraph
    /// through an `EgressNub`, and returns the stream seen from the parent scope.
    ///
    /// If the scope provides a logger, the nub is wrapped in a `LogPusher` so that
    /// messages crossing the boundary are logged.
    fn leave(&self) -> StreamCore<G, D> {
        let child = self.scope();

        let output = child.subgraph.borrow_mut().new_output();
        let target = Target::new(0, output.port);

        let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
        let egress = EgressNub { targets, phantom: PhantomData };
        let channel_id = child.clone().new_identifier();

        match child.logging() {
            Some(logger) => {
                let pusher = LogPusher::new(egress, channel_id, child.index(), logger);
                self.connect_to(target, pusher, channel_id);
            }
            None => self.connect_to(target, egress, channel_id),
        }

        // The resulting stream belongs to the parent of the scope we just left.
        StreamCore::new(output, registrar, child.parent)
    }
}
struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
targets: CounterCore<TInner, TData, TeeCore<TInner, TData>>,
phantom: ::std::marker::PhantomData<TOuter>,
activator: crate::scheduling::Activator,
active: bool,
}
impl<TOuter, TInner, TData> Push<BundleCore<TOuter, TData>> for IngressNub<TOuter, TInner, TData>
where
    TOuter: Timestamp,
    TInner: Timestamp + Refines<TOuter>,
    TData: Container,
{
    /// Forwards a bundle into the child scope, or flushes when given `None`.
    ///
    /// * `Some(bundle)`: the data is moved out of the bundle, re-stamped with
    ///   `TInner::to_inner` of the outer time, and pushed to `targets`. If the
    ///   downstream hands a typed message back, its container is returned to the
    ///   caller's bundle.
    /// * `None`: if data was pushed since the last flush the activator is
    ///   triggered, and then `targets` is told it is done.
    fn push(&mut self, element: &mut Option<BundleCore<TOuter, TData>>) {
        match element {
            Some(bundle) => {
                let outer = bundle.as_mut();
                let payload = ::std::mem::take(&mut outer.data);
                let inner_time: TInner = TInner::to_inner(outer.time.clone());
                // NOTE(review): the trailing `0, 0` are passed through unchanged from
                // the original; their meaning is defined by `Message::new`.
                let mut forwarded = Some(BundleCore::from_typed(Message::new(inner_time, payload, 0, 0)));
                self.targets.push(&mut forwarded);

                // Reclaim whatever container the downstream left behind, if any.
                if let Some(returned) = forwarded.and_then(|sent| sent.if_typed()) {
                    outer.data = returned.data;
                }
                self.active = true;
            }
            None => {
                if self.active {
                    self.activator.activate();
                    self.active = false;
                }
                self.targets.done();
            }
        }
    }
}
/// Pusher placed on the edge leaving a child scope: accepts bundles stamped with
/// `TInner` and forwards them stamped with `TOuter`.
struct EgressNub<TOuter, TInner, TData>
where
    TOuter: Timestamp,
    TInner: Timestamp + Refines<TOuter>,
    TData: Data,
{
    /// Fan-out to the consumers in the parent scope.
    targets: TeeCore<TOuter, TData>,
    /// Ties the otherwise unused `TInner` parameter to the type.
    phantom: PhantomData<TInner>,
}
impl<TOuter, TInner, TData> Push<BundleCore<TInner, TData>> for EgressNub<TOuter, TInner, TData>
where
    TOuter: Timestamp,
    TInner: Timestamp + Refines<TOuter>,
    TData: Container + Data,
{
    /// Forwards a bundle out of the child scope, or flushes when given `None`.
    ///
    /// * `Some(bundle)`: the data is moved out of the bundle, re-stamped with the
    ///   `to_outer` projection of the inner time, and pushed to `targets`. If the
    ///   downstream hands a typed message back, its container is returned to the
    ///   caller's bundle.
    /// * `None`: `targets` is told it is done.
    fn push(&mut self, message: &mut Option<BundleCore<TInner, TData>>) {
        match message {
            Some(bundle) => {
                let inner = bundle.as_mut();
                let payload = ::std::mem::take(&mut inner.data);
                let outer_time: TOuter = inner.time.clone().to_outer();
                // NOTE(review): the trailing `0, 0` are passed through unchanged from
                // the original; their meaning is defined by `Message::new`.
                let mut forwarded = Some(BundleCore::from_typed(Message::new(outer_time, payload, 0, 0)));
                self.targets.push(&mut forwarded);

                // Reclaim whatever container the downstream left behind, if any.
                if let Some(returned) = forwarded.and_then(|sent| sent.if_typed()) {
                    inner.data = returned.data;
                }
            }
            None => self.targets.done(),
        }
    }
}
/// A pusher wrapper that logs a pair of `MessagesEvent`s (send and receive) for
/// every bundle passing through it before forwarding to the wrapped pusher.
///
/// Used by `enter` and `leave` when the scope provides a logger.
struct LogPusher<P> {
/// The wrapped pusher that receives every element after logging.
pusher: P,
/// Channel identifier reported in each logged event.
channel: usize,
/// Sequence number of the next bundle; incremented once per logged bundle.
counter: usize,
/// Index reported as both `source` and `target` in each logged event.
index: usize,
/// Destination for the logged events.
logger: TimelyLogger,
}
impl<P> LogPusher<P> {
fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
Self {
pusher,
channel,
counter: 0,
index,
logger,
}
}
}
impl<T, D, P> Push<BundleCore<T, D>> for LogPusher<P>
where
    D: Container,
    P: Push<BundleCore<T, D>>,
{
    /// Logs a send event and a matching receive event for `element` when it holds
    /// a bundle, then forwards `element` (bundle or `None`) to the wrapped pusher.
    fn push(&mut self, element: &mut Option<BundleCore<T, D>>) {
        if let Some(bundle) = element {
            let length = bundle.data.len();
            let channel = self.channel;
            let index = self.index;
            let seq_no = self.counter;

            // Both events carry identical metadata and differ only in `is_send`.
            self.logger.log(MessagesEvent {
                is_send: true,
                channel,
                source: index,
                target: index,
                seq_no,
                length,
            });
            self.logger.log(MessagesEvent {
                is_send: false,
                channel,
                source: index,
                target: index,
                seq_no,
                length,
            });
            self.counter += 1;
        }

        self.pusher.push(element);
    }
}
#[cfg(test)]
mod test {
    /// Sends data through two nested regions connected only by `enter`/`leave`
    /// (no operators inside), and checks that the probe catches up with the input
    /// after every round, i.e. the loop below terminates.
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{Enter, Leave};
        use crate::dataflow::operators::{Input, Inspect, Probe};
        use crate::dataflow::{InputHandle, ProbeHandle, Scope};

        crate::execute(crate::Config::process(3), |worker| {
            let index = worker.index();
            let mut input = InputHandle::new();
            let mut probe = ProbeHandle::new();

            worker.dataflow(|scope| {
                let data = scope.input_from(&mut input);
                let passed_through = scope.region(|inner| {
                    let entered = data.enter(inner);
                    inner.region(|inner2| entered.enter(inner2).leave()).leave()
                });
                passed_through
                    .inspect(move |x| println!("worker {}:\thello {}", index, x))
                    .probe_with(&mut probe);
            });

            input.advance_to(0);
            for epoch in 0..10 {
                // Only the first worker introduces data; every worker advances time.
                if index == 0 {
                    input.send(epoch);
                }
                input.advance_to(epoch + 1);
                // Run the worker until the probe has caught up with the input time.
                while probe.less_than(input.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}