timely/dataflow/operators/core/
enterleave.rs
1use std::marker::PhantomData;
23use std::rc::Rc;
24
25use crate::logging::{TimelyLogger, MessagesEvent};
26use crate::progress::Timestamp;
27use crate::progress::timestamp::Refines;
28use crate::progress::{Source, Target};
29use crate::{Container, Data};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{Counter, Tee};
32use crate::dataflow::channels::Message;
33use crate::worker::AsWorker;
34use crate::dataflow::{StreamCore, Scope};
35use crate::dataflow::scopes::Child;
36
37pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
39 fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
54}
55
56impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
57 fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
58
59 use crate::scheduling::Scheduler;
60
61 let (targets, registrar) = Tee::<T, C>::new();
62 let ingress = IngressNub {
63 targets: Counter::new(targets),
64 phantom: PhantomData,
65 activator: scope.activator_for(scope.addr()),
66 active: false,
67 };
68 let produced = Rc::clone(ingress.targets.produced());
69 let input = scope.subgraph.borrow_mut().new_input(produced);
70 let channel_id = scope.clone().new_identifier();
71
72 if let Some(logger) = scope.logging() {
73 let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
74 self.connect_to(input, pusher, channel_id);
75 } else {
76 self.connect_to(input, ingress, channel_id);
77 }
78
79 StreamCore::new(
80 Source::new(0, input.port),
81 registrar,
82 scope.clone(),
83 )
84 }
85}
86
87pub trait Leave<G: Scope, C: Container> {
89 fn leave(&self) -> StreamCore<G, C>;
104}
105
106impl<G: Scope, C: Container + Data, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
107 fn leave(&self) -> StreamCore<G, C> {
108
109 let scope = self.scope();
110
111 let output = scope.subgraph.borrow_mut().new_output();
112 let target = Target::new(0, output.port);
113 let (targets, registrar) = Tee::<G::Timestamp, C>::new();
114 let egress = EgressNub { targets, phantom: PhantomData };
115 let channel_id = scope.clone().new_identifier();
116
117 if let Some(logger) = scope.logging() {
118 let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
119 self.connect_to(target, pusher, channel_id);
120 } else {
121 self.connect_to(target, egress, channel_id);
122 }
123
124 StreamCore::new(
125 output,
126 registrar,
127 scope.parent,
128 )
129 }
130}
131
132
133struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> {
134 targets: Counter<TInner, TContainer, Tee<TInner, TContainer>>,
135 phantom: ::std::marker::PhantomData<TOuter>,
136 activator: crate::scheduling::Activator,
137 active: bool,
138}
139
140impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Container + Data> Push<Message<TOuter, TContainer>> for IngressNub<TOuter, TInner, TContainer> {
141 fn push(&mut self, element: &mut Option<Message<TOuter, TContainer>>) {
142 if let Some(outer_message) = element {
143 let data = ::std::mem::take(&mut outer_message.data);
144 let mut inner_message = Some(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0));
145 self.targets.push(&mut inner_message);
146 if let Some(inner_message) = inner_message {
147 outer_message.data = inner_message.data;
148 }
149 self.active = true;
150 }
151 else {
152 if self.active {
153 self.activator.activate();
154 self.active = false;
155 }
156 self.targets.done();
157 }
158 }
159}
160
161
162struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Data> {
163 targets: Tee<TOuter, TContainer>,
164 phantom: PhantomData<TInner>,
165}
166
167impl<TOuter, TInner, TContainer: Container> Push<Message<TInner, TContainer>> for EgressNub<TOuter, TInner, TContainer>
168where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TContainer: Data {
169 fn push(&mut self, message: &mut Option<Message<TInner, TContainer>>) {
170 if let Some(inner_message) = message {
171 let data = ::std::mem::take(&mut inner_message.data);
172 let mut outer_message = Some(Message::new(inner_message.time.clone().to_outer(), data, 0, 0));
173 self.targets.push(&mut outer_message);
174 if let Some(outer_message) = outer_message {
175 inner_message.data = outer_message.data;
176 }
177 }
178 else { self.targets.done(); }
179 }
180}
181
182struct LogPusher<P> {
189 pusher: P,
190 channel: usize,
191 counter: usize,
192 index: usize,
193 logger: TimelyLogger,
194}
195
196impl<P> LogPusher<P> {
197 fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
198 Self {
199 pusher,
200 channel,
201 counter: 0,
202 index,
203 logger,
204 }
205 }
206}
207
208impl<T, C, P> Push<Message<T, C>> for LogPusher<P>
209where
210 C: Container,
211 P: Push<Message<T, C>>,
212{
213 fn push(&mut self, element: &mut Option<Message<T, C>>) {
214 if let Some(bundle) = element {
215 let send_event = MessagesEvent {
216 is_send: true,
217 channel: self.channel,
218 source: self.index,
219 target: self.index,
220 seq_no: self.counter,
221 length: bundle.data.len(),
222 };
223 let recv_event = MessagesEvent {
224 is_send: false,
225 ..send_event
226 };
227
228 self.logger.log(send_event);
229 self.logger.log(recv_event);
230 self.counter += 1;
231 }
232
233 self.pusher.push(element);
234 }
235}
236
#[cfg(test)]
mod test {
    /// Sends ten rounds of input through two levels of nested regions
    /// (enter, enter, leave, leave) and waits for the probe after each round.
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{Enter, Leave};
        use crate::dataflow::operators::{Input, Inspect, Probe};
        use crate::dataflow::Scope;
        use crate::dataflow::{InputHandle, ProbeHandle};

        crate::execute(crate::Config::process(3), |worker| {
            let worker_index = worker.index();
            let mut input = InputHandle::new();
            let probe = ProbeHandle::new();

            worker.dataflow(|scope| {
                let source = scope.input_from(&mut input);

                // Carry the stream two regions deep and straight back out.
                let round_tripped = scope.region(|first| {
                    let entered = source.enter(first);
                    let back_in_first = first.region(|second| entered.enter(second).leave());
                    back_in_first.leave()
                });

                round_tripped
                    .inspect(move |x| println!("worker {}:\thello {}", worker_index, x))
                    .probe_with(&probe);
            });

            input.advance_to(0);
            for round in 0..10 {
                // Only worker 0 introduces data.
                if worker_index == 0 {
                    input.send(round);
                }
                input.advance_to(round + 1);
                // Run the worker until the probe has caught up with the input time.
                while probe.less_than(input.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}