timely/dataflow/operators/generic/operator.rs
1
//! Methods to construct generic streaming and blocking operators (unary, binary, sinks, and sources).
3
4use crate::dataflow::channels::pushers::Tee;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, Data};
16use crate::container::{ContainerBuilder, CapacityContainerBuilder};
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1: Container> {
20 /// Creates a new dataflow operator that partitions its input stream by a parallelization
21 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23 ///
24 /// # Examples
25 /// ```
26 /// use std::collections::HashMap;
27 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28 /// use timely::dataflow::operators::generic::Operator;
29 /// use timely::dataflow::channels::pact::Pipeline;
30 ///
31 /// timely::example(|scope| {
32 /// (0u64..10).to_stream(scope)
33 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
34 /// let mut cap = Some(default_cap.delayed(&12));
35 /// let mut notificator = FrontierNotificator::default();
36 /// let mut stash = HashMap::new();
37 /// move |input, output| {
38 /// if let Some(ref c) = cap.take() {
39 /// output.session(&c).give(12);
40 /// }
41 /// while let Some((time, data)) = input.next() {
42 /// stash.entry(time.time().clone())
43 /// .or_insert(Vec::new())
44 /// .extend(data.drain(..));
45 /// }
46 /// notificator.for_each(&[input.frontier()], |time, _not| {
47 /// if let Some(mut vec) = stash.remove(time.time()) {
48 /// output.session(&time).give_iterator(vec.drain(..));
49 /// }
50 /// });
51 /// }
52 /// })
53 /// .container::<Vec<_>>();
54 /// });
55 /// ```
56 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57 where
58 CB: ContainerBuilder,
59 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
61 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
62 P: ParallelizationContract<G::Timestamp, C1>;
63
64 /// Creates a new dataflow operator that partitions its input stream by a parallelization
65 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67 ///
68 /// # Examples
69 /// ```
70 /// use std::collections::HashMap;
71 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72 /// use timely::dataflow::operators::generic::Operator;
73 /// use timely::dataflow::channels::pact::Pipeline;
74 ///
75 /// timely::example(|scope| {
76 /// (0u64..10)
77 /// .to_stream(scope)
78 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79 /// input.for_each(|time, data| {
80 /// output.session(&time).give_container(data);
81 /// notificator.notify_at(time.retain());
82 /// });
83 /// notificator.for_each(|time, _cnt, _not| {
84 /// println!("notified at {:?}", time);
85 /// });
86 /// });
87 /// });
88 /// ```
89 fn unary_notify<CB: ContainerBuilder,
90 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
91 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
92 &mut Notificator<G::Timestamp>)+'static,
93 P: ParallelizationContract<G::Timestamp, C1>>
94 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96 /// Creates a new dataflow operator that partitions its input stream by a parallelization
97 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98 /// `logic` can read from the input stream, and write to the output stream.
99 ///
100 /// # Examples
101 /// ```
102 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103 /// use timely::dataflow::operators::generic::operator::Operator;
104 /// use timely::dataflow::channels::pact::Pipeline;
105 /// use timely::dataflow::Scope;
106 ///
107 /// timely::example(|scope| {
108 /// (0u64..10).to_stream(scope)
109 /// .unary(Pipeline, "example", |default_cap, _info| {
110 /// let mut cap = Some(default_cap.delayed(&12));
111 /// move |input, output| {
112 /// if let Some(ref c) = cap.take() {
113 /// output.session(&c).give(100);
114 /// }
115 /// while let Some((time, data)) = input.next() {
116 /// output.session(&time).give_container(data);
117 /// }
118 /// }
119 /// });
120 /// });
121 /// ```
122 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123 where
124 CB: ContainerBuilder,
125 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
127 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
128 P: ParallelizationContract<G::Timestamp, C1>;
129
130 /// Creates a new dataflow operator that partitions its input streams by a parallelization
131 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133 ///
134 /// # Examples
135 /// ```
136 /// use std::collections::HashMap;
137 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138 /// use timely::dataflow::operators::generic::operator::Operator;
139 /// use timely::dataflow::channels::pact::Pipeline;
140 ///
141 /// timely::execute(timely::Config::thread(), |worker| {
142 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143 /// let (in1_handle, in1) = scope.new_input();
144 /// let (in2_handle, in2) = scope.new_input();
145 /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146 /// let mut notificator = FrontierNotificator::default();
147 /// let mut stash = HashMap::new();
148 /// move |input1, input2, output| {
149 /// while let Some((time, data)) = input1.next() {
150 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
151 /// notificator.notify_at(time.retain());
152 /// }
153 /// while let Some((time, data)) = input2.next() {
154 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
155 /// notificator.notify_at(time.retain());
156 /// }
157 /// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
158 /// if let Some(mut vec) = stash.remove(time.time()) {
159 /// output.session(&time).give_iterator(vec.drain(..));
160 /// }
161 /// });
162 /// }
163 /// })
164 /// .container::<Vec<_>>()
165 /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166 ///
167 /// (in1_handle, in2_handle)
168 /// });
169 ///
170 /// for i in 1..10 {
171 /// in1.send(i - 1);
172 /// in1.advance_to(i);
173 /// in2.send(i - 1);
174 /// in2.advance_to(i);
175 /// }
176 /// }).unwrap();
177 /// ```
178 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179 where
180 C2: Container + Data,
181 CB: ContainerBuilder,
182 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
184 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
185 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
186 P1: ParallelizationContract<G::Timestamp, C1>,
187 P2: ParallelizationContract<G::Timestamp, C2>;
188
189 /// Creates a new dataflow operator that partitions its input streams by a parallelization
190 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192 ///
193 /// # Examples
194 /// ```
195 /// use std::collections::HashMap;
196 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197 /// use timely::dataflow::operators::generic::operator::Operator;
198 /// use timely::dataflow::channels::pact::Pipeline;
199 ///
200 /// timely::execute(timely::Config::thread(), |worker| {
201 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202 /// let (in1_handle, in1) = scope.new_input();
203 /// let (in2_handle, in2) = scope.new_input();
204 ///
205 /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206 /// input1.for_each(|time, data| {
207 /// output.session(&time).give_container(data);
208 /// notificator.notify_at(time.retain());
209 /// });
210 /// input2.for_each(|time, data| {
211 /// output.session(&time).give_container(data);
212 /// notificator.notify_at(time.retain());
213 /// });
214 /// notificator.for_each(|time, _cnt, _not| {
215 /// println!("notified at {:?}", time);
216 /// });
217 /// });
218 ///
219 /// (in1_handle, in2_handle)
220 /// });
221 ///
222 /// for i in 1..10 {
223 /// in1.send(i - 1);
224 /// in1.advance_to(i);
225 /// in2.send(i - 1);
226 /// in2.advance_to(i);
227 /// }
228 /// }).unwrap();
229 /// ```
230 fn binary_notify<C2: Container + Data,
231 CB: ContainerBuilder,
232 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
233 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
234 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
235 &mut Notificator<G::Timestamp>)+'static,
236 P1: ParallelizationContract<G::Timestamp, C1>,
237 P2: ParallelizationContract<G::Timestamp, C2>>
238 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240 /// Creates a new dataflow operator that partitions its input streams by a parallelization
241 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243 ///
244 /// # Examples
245 /// ```
246 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247 /// use timely::dataflow::operators::generic::operator::Operator;
248 /// use timely::dataflow::channels::pact::Pipeline;
249 /// use timely::dataflow::Scope;
250 ///
251 /// timely::example(|scope| {
252 /// let stream2 = (0u64..10).to_stream(scope);
253 /// (0u64..10).to_stream(scope)
254 /// .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255 /// let mut cap = Some(default_cap.delayed(&12));
256 /// move |input1, input2, output| {
257 /// if let Some(ref c) = cap.take() {
258 /// output.session(&c).give(100);
259 /// }
260 /// while let Some((time, data)) = input1.next() {
261 /// output.session(&time).give_container(data);
262 /// }
263 /// while let Some((time, data)) = input2.next() {
264 /// output.session(&time).give_container(data);
265 /// }
266 /// }
267 /// }).inspect(|x| println!("{:?}", x));
268 /// });
269 /// ```
270 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
271 where
272 C2: Container + Data,
273 CB: ContainerBuilder,
274 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
275 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
276 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
277 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
278 P1: ParallelizationContract<G::Timestamp, C1>,
279 P2: ParallelizationContract<G::Timestamp, C2>;
280
281 /// Creates a new dataflow operator that partitions its input stream by a parallelization
282 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
283 /// and inspect the frontier at the input.
284 ///
285 /// # Examples
286 /// ```
287 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
288 /// use timely::dataflow::operators::generic::operator::Operator;
289 /// use timely::dataflow::channels::pact::Pipeline;
290 /// use timely::dataflow::Scope;
291 ///
292 /// timely::example(|scope| {
293 /// (0u64..10)
294 /// .to_stream(scope)
295 /// .sink(Pipeline, "example", |input| {
296 /// while let Some((time, data)) = input.next() {
297 /// for datum in data.iter() {
298 /// println!("{:?}:\t{:?}", time, datum);
299 /// }
300 /// }
301 /// });
302 /// });
303 /// ```
304 fn sink<L, P>(&self, pact: P, name: &str, logic: L)
305 where
306 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
307 P: ParallelizationContract<G::Timestamp, C1>;
308}
309
310impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
311
312 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
313 where
314 CB: ContainerBuilder,
315 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
316 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
317 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
318 P: ParallelizationContract<G::Timestamp, C1> {
319
320 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
321 let operator_info = builder.operator_info();
322
323 let mut input = builder.new_input(self, pact);
324 let (mut output, stream) = builder.new_output();
325
326 builder.build(move |mut capabilities| {
327 // `capabilities` should be a single-element vector.
328 let capability = capabilities.pop().unwrap();
329 let mut logic = constructor(capability, operator_info);
330 move |frontiers| {
331 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
332 let mut output_handle = output.activate();
333 logic(&mut input_handle, &mut output_handle);
334 }
335 });
336
337 stream
338 }
339
340 fn unary_notify<CB: ContainerBuilder,
341 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
342 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
343 &mut Notificator<G::Timestamp>)+'static,
344 P: ParallelizationContract<G::Timestamp, C1>>
345 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
346
347 self.unary_frontier(pact, name, move |capability, _info| {
348 let mut notificator = FrontierNotificator::default();
349 for time in init {
350 notificator.notify_at(capability.delayed(&time));
351 }
352
353 let logging = self.scope().logging();
354 move |input, output| {
355 let frontier = &[input.frontier()];
356 let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
357 logic(input.handle, output, notificator);
358 }
359 })
360 }
361
362 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
363 where
364 CB: ContainerBuilder,
365 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
366 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
367 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
368 P: ParallelizationContract<G::Timestamp, C1> {
369
370 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
371 let operator_info = builder.operator_info();
372
373 let mut input = builder.new_input(self, pact);
374 let (mut output, stream) = builder.new_output();
375 builder.set_notify(false);
376
377 builder.build(move |mut capabilities| {
378 // `capabilities` should be a single-element vector.
379 let capability = capabilities.pop().unwrap();
380 let mut logic = constructor(capability, operator_info);
381 move |_frontiers| {
382 let mut output_handle = output.activate();
383 logic(&mut input, &mut output_handle);
384 }
385 });
386
387 stream
388 }
389
390 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
391 where
392 C2: Container + Data,
393 CB: ContainerBuilder,
394 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
395 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
396 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
397 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
398 P1: ParallelizationContract<G::Timestamp, C1>,
399 P2: ParallelizationContract<G::Timestamp, C2> {
400
401 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
402 let operator_info = builder.operator_info();
403
404 let mut input1 = builder.new_input(self, pact1);
405 let mut input2 = builder.new_input(other, pact2);
406 let (mut output, stream) = builder.new_output();
407
408 builder.build(move |mut capabilities| {
409 // `capabilities` should be a single-element vector.
410 let capability = capabilities.pop().unwrap();
411 let mut logic = constructor(capability, operator_info);
412 move |frontiers| {
413 let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
414 let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
415 let mut output_handle = output.activate();
416 logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
417 }
418 });
419
420 stream
421 }
422
423 fn binary_notify<C2: Container + Data,
424 CB: ContainerBuilder,
425 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
426 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
427 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
428 &mut Notificator<G::Timestamp>)+'static,
429 P1: ParallelizationContract<G::Timestamp, C1>,
430 P2: ParallelizationContract<G::Timestamp, C2>>
431 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
432
433 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
434 let mut notificator = FrontierNotificator::default();
435 for time in init {
436 notificator.notify_at(capability.delayed(&time));
437 }
438
439 let logging = self.scope().logging();
440 move |input1, input2, output| {
441 let frontiers = &[input1.frontier(), input2.frontier()];
442 let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
443 logic(input1.handle, input2.handle, output, notificator);
444 }
445 })
446
447 }
448
449
450 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
451 where
452 C2: Container + Data,
453 CB: ContainerBuilder,
454 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
455 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
456 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
457 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
458 P1: ParallelizationContract<G::Timestamp, C1>,
459 P2: ParallelizationContract<G::Timestamp, C2> {
460
461 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
462 let operator_info = builder.operator_info();
463
464 let mut input1 = builder.new_input(self, pact1);
465 let mut input2 = builder.new_input(other, pact2);
466 let (mut output, stream) = builder.new_output();
467 builder.set_notify(false);
468
469 builder.build(move |mut capabilities| {
470 // `capabilities` should be a single-element vector.
471 let capability = capabilities.pop().unwrap();
472 let mut logic = constructor(capability, operator_info);
473 move |_frontiers| {
474 let mut output_handle = output.activate();
475 logic(&mut input1, &mut input2, &mut output_handle);
476 }
477 });
478
479 stream
480 }
481
482 fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
483 where
484 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
485 P: ParallelizationContract<G::Timestamp, C1> {
486
487 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
488 let mut input = builder.new_input(self, pact);
489
490 builder.build(|_capabilities| {
491 move |frontiers| {
492 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
493 logic(&mut input_handle);
494 }
495 });
496 }
497}
498
499/// Creates a new data stream source for a scope.
500///
501/// The source is defined by a name, and a constructor which takes a default capability to
502/// a method that can be repeatedly called on a output handle. The method is then repeatedly
503/// invoked, and is expected to eventually send data and downgrade and release capabilities.
504///
505/// # Examples
506/// ```
507/// use timely::scheduling::Scheduler;
508/// use timely::dataflow::operators::Inspect;
509/// use timely::dataflow::operators::generic::operator::source;
510/// use timely::dataflow::Scope;
511///
512/// timely::example(|scope| {
513///
514/// source(scope, "Source", |capability, info| {
515///
516/// let activator = scope.activator_for(info.address);
517///
518/// let mut cap = Some(capability);
519/// move |output| {
520///
521/// let mut done = false;
522/// if let Some(cap) = cap.as_mut() {
523/// // get some data and send it.
524/// let time = cap.time().clone();
525/// output.session(&cap)
526/// .give(*cap.time());
527///
528/// // downgrade capability.
529/// cap.downgrade(&(time + 1));
530/// done = time > 20;
531/// }
532///
533/// if done { cap = None; }
534/// else { activator.activate(); }
535/// }
536/// })
537/// .container::<Vec<_>>()
538/// .inspect(|x| println!("number: {:?}", x));
539/// });
540/// ```
541pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
542where
543 CB: ContainerBuilder,
544 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
545 L: FnMut(&mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static {
546
547 let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
548 let operator_info = builder.operator_info();
549
550 let (mut output, stream) = builder.new_output();
551 builder.set_notify(false);
552
553 builder.build(move |mut capabilities| {
554 // `capabilities` should be a single-element vector.
555 let capability = capabilities.pop().unwrap();
556 let mut logic = constructor(capability, operator_info);
557 move |_frontier| {
558 logic(&mut output.activate());
559 }
560 });
561
562 stream
563}
564
565/// Constructs an empty stream.
566///
567/// This method is useful in patterns where an input is required, but there is no
568/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
569/// which are just silly.
570///
571/// # Examples
572/// ```
573/// use timely::dataflow::operators::Inspect;
574/// use timely::dataflow::operators::generic::operator::empty;
575/// use timely::dataflow::Scope;
576///
577/// timely::example(|scope| {
578///
579///
580/// empty::<_, Vec<_>>(scope) // type required in this example
581/// .inspect(|()| panic!("never called"));
582///
583/// });
584/// ```
585pub fn empty<G: Scope, C: Container + Data>(scope: &G) -> StreamCore<G, C> {
586 source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
587 // drop capability, do nothing
588 })
589}