timely/dataflow/operators/core/
enterleave.rs

1//! Extension traits to move a `Stream` between an outer `Scope` and inner `Scope`.
2//!
3//! Each `Stream` indicates its containing `Scope` as part of its type signature. To create a new
4//! stream with the same contents in another scope, one must explicitly use the methods `enter` and
5//! `leave`, to clearly indicate the transition to the timely dataflow progress tracking logic.
6//!
7//! # Examples
8//! ```
9//! use timely::dataflow::scopes::Scope;
10//! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect};
11//!
12//! timely::example(|outer| {
13//!     let stream = (0..9).to_stream(outer);
14//!     let output = outer.region(|inner| {
15//!         stream.enter(inner)
16//!               .inspect(|x| println!("in nested scope: {:?}", x))
17//!               .leave()
18//!     });
19//! });
20//! ```
21
22use std::marker::PhantomData;
23use std::rc::Rc;
24
25use crate::logging::{TimelyLogger, MessagesEvent};
26use crate::progress::Timestamp;
27use crate::progress::timestamp::Refines;
28use crate::progress::{Source, Target};
29use crate::{Container, Data};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{Counter, Tee};
32use crate::dataflow::channels::Message;
33use crate::worker::AsWorker;
34use crate::dataflow::{StreamCore, Scope};
35use crate::dataflow::scopes::Child;
36
37/// Extension trait to move a `Stream` into a child of its current `Scope`.
38pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
39    /// Moves the `Stream` argument into a child of its current `Scope`.
40    ///
41    /// # Examples
42    /// ```
43    /// use timely::dataflow::scopes::Scope;
44    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
45    ///
46    /// timely::example(|outer| {
47    ///     let stream = (0..9).to_stream(outer);
48    ///     let output = outer.region(|inner| {
49    ///         stream.enter(inner).leave()
50    ///     });
51    /// });
52    /// ```
53    fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
54}
55
56impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
57    fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
58
59        use crate::scheduling::Scheduler;
60
61        let (targets, registrar) = Tee::<T, C>::new();
62        let ingress = IngressNub {
63            targets: Counter::new(targets),
64            phantom: PhantomData,
65            activator: scope.activator_for(scope.addr()),
66            active: false,
67        };
68        let produced = Rc::clone(ingress.targets.produced());
69        let input = scope.subgraph.borrow_mut().new_input(produced);
70        let channel_id = scope.clone().new_identifier();
71
72        if let Some(logger) = scope.logging() {
73            let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
74            self.connect_to(input, pusher, channel_id);
75        } else {
76            self.connect_to(input, ingress, channel_id);
77        }
78
79        StreamCore::new(
80            Source::new(0, input.port),
81            registrar,
82            scope.clone(),
83        )
84    }
85}
86
87/// Extension trait to move a `Stream` to the parent of its current `Scope`.
88pub trait Leave<G: Scope, C: Container> {
89    /// Moves a `Stream` to the parent of its current `Scope`.
90    ///
91    /// # Examples
92    /// ```
93    /// use timely::dataflow::scopes::Scope;
94    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
95    ///
96    /// timely::example(|outer| {
97    ///     let stream = (0..9).to_stream(outer);
98    ///     let output = outer.region(|inner| {
99    ///         stream.enter(inner).leave()
100    ///     });
101    /// });
102    /// ```
103    fn leave(&self) -> StreamCore<G, C>;
104}
105
106impl<G: Scope, C: Container + Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
107    fn leave(&self) -> StreamCore<G, C> {
108
109        let scope = self.scope();
110
111        let output = scope.subgraph.borrow_mut().new_output();
112        let target = Target::new(0, output.port);
113        let (targets, registrar) = Tee::<G::Timestamp, C>::new();
114        let egress = EgressNub { targets, phantom: PhantomData };
115        let channel_id = scope.clone().new_identifier();
116
117        if let Some(logger) = scope.logging() {
118            let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
119            self.connect_to(target, pusher, channel_id);
120        } else {
121            self.connect_to(target, egress, channel_id);
122        }
123
124        StreamCore::new(
125            output,
126            registrar,
127            scope.parent,
128        )
129    }
130}
131
132
133struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> {
134    targets: Counter<TInner, TContainer, Tee<TInner, TContainer>>,
135    phantom: ::std::marker::PhantomData<TOuter>,
136    activator: crate::scheduling::Activator,
137    active: bool,
138}
139
140impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
141    fn push(&mut self, element: &mut Option<Message<TOuter, TContainer>>) {
142        if let Some(outer_message) = element {
143            let data = ::std::mem::take(&mut outer_message.data);
144            let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0));
145            self.targets.push(&mut inner_message);
146            if let Some(inner_message) = inner_message {
147                outer_message.data = inner_message.data;
148            }
149            self.active = true;
150        }
151        else {
152            if self.active {
153                self.activator.activate();
154                self.active = false;
155            }
156            self.targets.done();
157        }
158    }
159}
160
161
162struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Data> {
163    targets: Tee<TOuter, TContainer>,
164    phantom: PhantomData<TInner>,
165}
166
167impl<TOuter, TInner, TContainer: Container> Push<Message<TInner, TContainer>> for EgressNub<TOuter, TInner, TContainer>
168where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Data {
169    fn push(&mut self, message: &mut Option<Message<TInner, TContainer>>) {
170        if let Some(inner_message) = message {
171            let data = ::std::mem::take(&mut inner_message.data);
172            let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data, 0, 0));
173            self.targets.push(&mut outer_message);
174            if let Some(outer_message) = outer_message {
175                inner_message.data = outer_message.data;
176            }
177        }
178        else { self.targets.done(); }
179    }
180}
181
182/// A pusher that logs messages passing through it.
183///
184/// This type performs the same function as the `LogPusher` and `LogPuller` types in
185/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave`
186/// channels because those don't have a puller connected. Thus, this pusher needs to log both the
187/// send and the receive `MessageEvent`.
188struct LogPusher<P> {
189    pusher: P,
190    channel: usize,
191    counter: usize,
192    index: usize,
193    logger: TimelyLogger,
194}
195
196impl<P> LogPusher<P> {
197    fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
198        Self {
199            pusher,
200            channel,
201            counter: 0,
202            index,
203            logger,
204        }
205    }
206}
207
208impl<T, C, P> Push<Message<T, C>> for LogPusher<P>
209where
210    C: Container,
211    P: Push<Message<T, C>>,
212{
213    fn push(&mut self, element: &mut Option<Message<T, C>>) {
214        if let Some(bundle) = element {
215            let send_event = MessagesEvent {
216                is_send: true,
217                channel: self.channel,
218                source: self.index,
219                target: self.index,
220                seq_no: self.counter,
221                length: bundle.data.len(),
222            };
223            let recv_event = MessagesEvent {
224                is_send: false,
225                ..send_event
226            };
227
228            self.logger.log(send_event);
229            self.logger.log(recv_event);
230            self.counter += 1;
231        }
232
233        self.pusher.push(element);
234    }
235}
236
#[cfg(test)]
mod test {
    /// Checks that progress is communicated correctly through nested scopes whose only
    /// edges are pass-through `enter`/`leave` pairs (no operators inside).
    ///
    /// Regression test for: https://github.com/TimelyDataflow/timely-dataflow/issues/377
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{Enter, Input, Inspect, Leave, Probe};
        use crate::dataflow::{InputHandle, ProbeHandle, Scope};

        // Run a three-worker, single-process timely computation.
        crate::execute(crate::Config::process(3), |worker| {
            let index = worker.index();
            let mut input = InputHandle::new();
            let probe = ProbeHandle::new();

            // Build the dataflow: input -> region -> nested region -> back out -> inspect.
            worker.dataflow(|scope| {
                let outer_stream = scope.input_from(&mut input);

                let round_trip = scope.region(|inner| {
                    let entered = outer_stream.enter(inner);
                    let innermost = inner.region(|inner2| entered.enter(inner2).leave());
                    innermost.leave()
                });

                round_trip
                    .inspect(move |x| println!("worker {}:\thello {}", index, x))
                    .probe_with(&probe);
            });

            // Feed ten rounds of input, stepping the worker until each round is processed.
            input.advance_to(0);
            for round in 0..10 {
                if index == 0 {
                    input.send(round);
                }
                input.advance_to(round + 1);
                while probe.less_than(input.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}