Skip to main content

timely/dataflow/operators/core/
enterleave.rs

1//! Extension traits to move a `Stream` between an outer `Scope` and inner `Scope`.
2//!
3//! Each `Stream` indicates its containing `Scope` as part of its type signature. To create a new
4//! stream with the same contents in another scope, one must explicitly use the methods `enter` and
5//! `leave`, to clearly indicate the transition to the timely dataflow progress tracking logic.
6//!
7//! # Examples
8//! ```
9//! use timely::dataflow::scope::Scope;
10//! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect};
11//!
12//! timely::example(|outer| {
13//!     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
14//!     let output = outer.region(|inner| {
15//!         stream.enter(inner)
16//!               .inspect(|x| println!("in nested scope: {:?}", x))
17//!               .leave(outer)
18//!     });
19//! });
20//! ```
21
22use std::marker::PhantomData;
23use std::rc::Rc;
24
25use crate::logging::{TimelyLogger, MessagesEvent};
26use crate::progress::Timestamp;
27use crate::progress::timestamp::Refines;
28use crate::progress::{Source, Target};
29use crate::{Accountable, Container};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{Counter, Tee};
32use crate::dataflow::channels::Message;
33use crate::dataflow::{Stream, Scope};
34
35/// Extension trait to move a `Stream` into a child of its current `Scope`.
36pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
37    /// Moves the `Stream` argument into a child of its current `Scope`.
38    ///
39    /// The destination scope must be a child of the stream's scope.
40    /// The method checks this property at runtime, and will panic if not respected.
41    ///
42    /// # Examples
43    /// ```
44    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
45    ///
46    /// timely::example(|outer| {
47    ///     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
48    ///     let output = outer.region(|inner| {
49    ///         stream.enter(inner).leave(outer)
50    ///     });
51    /// });
52    /// ```
53    fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
54}
55
56impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
57where
58    TOuter: Timestamp,
59    TInner: Timestamp + Refines<TOuter>,
60    C: Container,
61{
62    fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {
63
64        // Validate that `inner` is a child of `self`'s scope.
65        let inner_addr = inner.addr();
66        let outer_addr = self.scope().addr();
67        assert!(
68            inner_addr.len() == outer_addr.len() + 1
69                && inner_addr[..outer_addr.len()] == outer_addr[..],
70            "Enter::enter: `inner` is not a child of the stream's scope \
71            (inner addr: {:?}, outer addr: {:?})",
72            inner_addr,
73            outer_addr,
74        );
75
76        let (targets, registrar) = Tee::<TInner, C>::new();
77        let ingress = IngressNub {
78            targets: Counter::new(targets),
79            phantom: PhantomData,
80            activator: inner.activator_for(inner_addr),
81            active: false,
82        };
83        let produced = Rc::clone(ingress.targets.produced());
84        let input = inner.subgraph.borrow_mut().new_input(produced);
85        let channel_id = inner.worker().new_identifier();
86
87        if let Some(logger) = inner.worker().logging() {
88            let pusher = LogPusher::new(ingress, channel_id, inner.index(), logger);
89            self.connect_to(input, pusher, channel_id);
90        } else {
91            self.connect_to(input, ingress, channel_id);
92        }
93
94        Stream::new(Source::new(0, input.port), registrar, inner)
95    }
96}
97
98/// Extension trait to move a `Stream` to the parent of its current `Scope`.
99pub trait Leave<'inner, TInner: Timestamp, C> {
100    /// Moves a `Stream` to the parent of its current `Scope`.
101    ///
102    /// The parent scope must be supplied as an argument.
103    ///
104    /// The destination scope must be the parent of the stream's scope.
105    /// The method checks this property at runtime, and will panic if not respected.
106    ///
107    /// # Examples
108    /// ```
109    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
110    ///
111    /// timely::example(|outer| {
112    ///     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
113    ///     let output = outer.region(|inner| {
114    ///         stream.enter(inner).leave(outer)
115    ///     });
116    /// });
117    /// ```
118    fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
119}
120
121impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C>
122where
123    TInner: Timestamp,
124    C: Container,
125{
126    fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
127
128        let scope = self.scope();
129
130        // Validate that `self`'s scope is a child of `outer`.
131        let inner_addr = scope.addr();
132        let outer_addr = outer.addr();
133        assert!(
134            inner_addr.len() == outer_addr.len() + 1
135                && inner_addr[..outer_addr.len()] == outer_addr[..],
136            "Leave::leave: `outer` is not the parent of the stream's scope \
137            (stream addr: {:?}, outer addr: {:?})",
138            inner_addr,
139            outer_addr,
140        );
141
142        let output = scope.subgraph.borrow_mut().new_output();
143        let target = Target::new(0, output.port);
144        let (targets, registrar) = Tee::<TOuter, C>::new();
145        let egress = EgressNub { targets, phantom: PhantomData };
146        let channel_id = scope.worker().new_identifier();
147
148        if let Some(logger) = scope.worker().logging() {
149            let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
150            self.connect_to(target, pusher, channel_id);
151        } else {
152            self.connect_to(target, egress, channel_id);
153        }
154
155        Stream::new(output, registrar, outer)
156    }
157}
158
159
160struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
161    targets: Counter<TInner, Tee<TInner, TContainer>>,
162    phantom: PhantomData<TOuter>,
163    activator: crate::scheduling::Activator,
164    active: bool,
165}
166
167impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
168    fn push(&mut self, element: &mut Option<Message<TOuter, TContainer>>) {
169        if let Some(outer_message) = element {
170            let data = ::std::mem::take(&mut outer_message.data);
171            let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data));
172            self.targets.push(&mut inner_message);
173            if let Some(inner_message) = inner_message {
174                outer_message.data = inner_message.data;
175            }
176            self.active = true;
177        }
178        else {
179            if self.active {
180                self.activator.activate();
181                self.active = false;
182            }
183            self.targets.done();
184        }
185    }
186}
187
188
189struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer> {
190    targets: Tee<TOuter, TContainer>,
191    phantom: PhantomData<TInner>,
192}
193
194impl<TOuter, TInner, TContainer: Container> Push<Message<TInner, TContainer>> for EgressNub<TOuter, TInner, TContainer>
195where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, {
196    fn push(&mut self, message: &mut Option<Message<TInner, TContainer>>) {
197        if let Some(inner_message) = message {
198            let data = ::std::mem::take(&mut inner_message.data);
199            let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data));
200            self.targets.push(&mut outer_message);
201            if let Some(outer_message) = outer_message {
202                inner_message.data = outer_message.data;
203            }
204        }
205        else { self.targets.done(); }
206    }
207}
208
209/// A pusher that logs messages passing through it.
210///
211/// This type performs the same function as the `LogPusher` and `LogPuller` types in
212/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave`
213/// channels because those don't have a puller connected. Thus, this pusher needs to log both the
214/// send and the receive `MessageEvent`.
215struct LogPusher<P> {
216    pusher: P,
217    channel: usize,
218    counter: usize,
219    index: usize,
220    logger: TimelyLogger,
221}
222
223impl<P> LogPusher<P> {
224    fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
225        Self {
226            pusher,
227            channel,
228            counter: 0,
229            index,
230            logger,
231        }
232    }
233}
234
235impl<T, C, P> Push<Message<T, C>> for LogPusher<P>
236where
237    C: Accountable,
238    P: Push<Message<T, C>>,
239{
240    fn push(&mut self, element: &mut Option<Message<T, C>>) {
241        if let Some(bundle) = element {
242            let send_event = MessagesEvent {
243                is_send: true,
244                channel: self.channel,
245                source: self.index,
246                target: self.index,
247                seq_no: self.counter,
248                record_count: bundle.data.record_count(),
249            };
250            let recv_event = MessagesEvent {
251                is_send: false,
252                ..send_event
253            };
254
255            self.logger.log(send_event);
256            self.logger.log(recv_event);
257            self.counter += 1;
258        }
259
260        self.pusher.push(element);
261    }
262}
263
#[cfg(test)]
mod test {
    /// Checks that progress is communicated correctly through nested scopes whose edges
    /// pass straight through (no operators inside the nested scopes).
    ///
    /// This is for issue: https://github.com/TimelyDataflow/timely-dataflow/issues/377
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{vec::Input, Enter, Inspect, Leave, Probe};
        use crate::dataflow::{InputHandle, ProbeHandle};

        // Initialize and run a timely dataflow using `Config::process(3)`.
        crate::execute(crate::Config::process(3), |worker| {
            let index = worker.index();
            let mut input = InputHandle::new();
            let probe = ProbeHandle::new();

            // Build: input -> enter -> enter -> leave -> leave -> inspect -> probe.
            worker.dataflow(|scope| {
                let data = scope.input_from(&mut input);

                let round_trip = scope.region(|inner| {
                    let data = data.enter(inner);
                    let innermost = inner.region(|inner2| data.enter(inner2).leave(inner));
                    innermost.leave(scope)
                });

                round_trip
                    .inspect(move |x| println!("worker {}:\thello {}", index, x))
                    .probe_with(&probe);
            });

            // Feed one record per round from worker 0, and step until the probe catches up.
            input.advance_to(0);
            for round in 0..10 {
                if index == 0 {
                    input.send(round);
                }
                let next = round + 1;
                input.advance_to(next);
                while probe.less_than(input.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}