timely/dataflow/operators/core/
enterleave.rs1use std::marker::PhantomData;
23use std::rc::Rc;
24
25use crate::logging::{TimelyLogger, MessagesEvent};
26use crate::progress::Timestamp;
27use crate::progress::timestamp::Refines;
28use crate::progress::{Source, Target};
29use crate::{Accountable, Container};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{Counter, Tee};
32use crate::dataflow::channels::Message;
33use crate::dataflow::{Stream, Scope};
34
35pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C> {
37 fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
54}
55
56impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
57where
58 TOuter: Timestamp,
59 TInner: Timestamp + Refines<TOuter>,
60 C: Container,
61{
62 fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {
63
64 let inner_addr = inner.addr();
66 let outer_addr = self.scope().addr();
67 assert!(
68 inner_addr.len() == outer_addr.len() + 1
69 && inner_addr[..outer_addr.len()] == outer_addr[..],
70 "Enter::enter: `inner` is not a child of the stream's scope \
71 (inner addr: {:?}, outer addr: {:?})",
72 inner_addr,
73 outer_addr,
74 );
75
76 let (targets, registrar) = Tee::<TInner, C>::new();
77 let ingress = IngressNub {
78 targets: Counter::new(targets),
79 phantom: PhantomData,
80 activator: inner.activator_for(inner_addr),
81 active: false,
82 };
83 let produced = Rc::clone(ingress.targets.produced());
84 let input = inner.subgraph.borrow_mut().new_input(produced);
85 let channel_id = inner.worker().new_identifier();
86
87 if let Some(logger) = inner.worker().logging() {
88 let pusher = LogPusher::new(ingress, channel_id, inner.index(), logger);
89 self.connect_to(input, pusher, channel_id);
90 } else {
91 self.connect_to(input, ingress, channel_id);
92 }
93
94 Stream::new(Source::new(0, input.port), registrar, inner)
95 }
96}
97
98pub trait Leave<'inner, TInner: Timestamp, C> {
100 fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
119}
120
121impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C>
122where
123 TInner: Timestamp,
124 C: Container,
125{
126 fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
127
128 let scope = self.scope();
129
130 let inner_addr = scope.addr();
132 let outer_addr = outer.addr();
133 assert!(
134 inner_addr.len() == outer_addr.len() + 1
135 && inner_addr[..outer_addr.len()] == outer_addr[..],
136 "Leave::leave: `outer` is not the parent of the stream's scope \
137 (stream addr: {:?}, outer addr: {:?})",
138 inner_addr,
139 outer_addr,
140 );
141
142 let output = scope.subgraph.borrow_mut().new_output();
143 let target = Target::new(0, output.port);
144 let (targets, registrar) = Tee::<TOuter, C>::new();
145 let egress = EgressNub { targets, phantom: PhantomData };
146 let channel_id = scope.worker().new_identifier();
147
148 if let Some(logger) = scope.worker().logging() {
149 let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
150 self.connect_to(target, pusher, channel_id);
151 } else {
152 self.connect_to(target, egress, channel_id);
153 }
154
155 Stream::new(output, registrar, outer)
156 }
157}
158
159
160struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> {
161 targets: Counter<TInner, Tee<TInner, TContainer>>,
162 phantom: PhantomData<TOuter>,
163 activator: crate::scheduling::Activator,
164 active: bool,
165}
166
167impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
168 fn push(&mut self, element: &mut Option<Message<TOuter, TContainer>>) {
169 if let Some(outer_message) = element {
170 let data = ::std::mem::take(&mut outer_message.data);
171 let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data));
172 self.targets.push(&mut inner_message);
173 if let Some(inner_message) = inner_message {
174 outer_message.data = inner_message.data;
175 }
176 self.active = true;
177 }
178 else {
179 if self.active {
180 self.activator.activate();
181 self.active = false;
182 }
183 self.targets.done();
184 }
185 }
186}
187
188
189struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer> {
190 targets: Tee<TOuter, TContainer>,
191 phantom: PhantomData<TInner>,
192}
193
194impl<TOuter, TInner, TContainer: Container> Push<Message<TInner, TContainer>> for EgressNub<TOuter, TInner, TContainer>
195where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, {
196 fn push(&mut self, message: &mut Option<Message<TInner, TContainer>>) {
197 if let Some(inner_message) = message {
198 let data = ::std::mem::take(&mut inner_message.data);
199 let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data));
200 self.targets.push(&mut outer_message);
201 if let Some(outer_message) = outer_message {
202 inner_message.data = outer_message.data;
203 }
204 }
205 else { self.targets.done(); }
206 }
207}
208
209struct LogPusher<P> {
216 pusher: P,
217 channel: usize,
218 counter: usize,
219 index: usize,
220 logger: TimelyLogger,
221}
222
223impl<P> LogPusher<P> {
224 fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
225 Self {
226 pusher,
227 channel,
228 counter: 0,
229 index,
230 logger,
231 }
232 }
233}
234
235impl<T, C, P> Push<Message<T, C>> for LogPusher<P>
236where
237 C: Accountable,
238 P: Push<Message<T, C>>,
239{
240 fn push(&mut self, element: &mut Option<Message<T, C>>) {
241 if let Some(bundle) = element {
242 let send_event = MessagesEvent {
243 is_send: true,
244 channel: self.channel,
245 source: self.index,
246 target: self.index,
247 seq_no: self.counter,
248 record_count: bundle.data.record_count(),
249 };
250 let recv_event = MessagesEvent {
251 is_send: false,
252 ..send_event
253 };
254
255 self.logger.log(send_event);
256 self.logger.log(recv_event);
257 self.counter += 1;
258 }
259
260 self.pusher.push(element);
261 }
262}
263
#[cfg(test)]
mod test {
    /// Smoke test: sends ten rounds of input through two levels of nested
    /// regions (enter, enter, leave, leave) on three workers and steps each
    /// worker until the probe has caught up with the input after every round.
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{vec::Input, Inspect, Probe};
        use crate::dataflow::operators::{Enter, Leave};
        use crate::dataflow::{InputHandle, ProbeHandle};

        crate::execute(crate::Config::process(3), |worker| {
            let index = worker.index();
            let mut input = InputHandle::new();
            let probe = ProbeHandle::new();

            worker.dataflow(|scope| {
                let stream = scope.input_from(&mut input);

                // Take the stream down two levels and bring it back up again.
                let round_tripped = scope.region(|region| {
                    let entered = stream.enter(region);
                    let from_nested = region.region(|nested| entered.enter(nested).leave(region));
                    from_nested.leave(scope)
                });

                round_tripped
                    .inspect(move |x| println!("worker {}:\thello {}", index, x))
                    .probe_with(&probe);
            });

            input.advance_to(0);
            for round in 0..10 {
                // Only the first worker introduces data.
                if index == 0 {
                    input.send(round);
                }
                input.advance_to(round + 1);
                while probe.less_than(input.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}