timely/dataflow/operators/generic/
operator.rs

1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::dataflow::channels::pushers::Tee;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, Data};
16use crate::container::{ContainerBuilder, CapacityContainerBuilder};
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1: Container> {
20    /// Creates a new dataflow operator that partitions its input stream by a parallelization
21    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23    ///
24    /// # Examples
25    /// ```
26    /// use std::collections::HashMap;
27    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28    /// use timely::dataflow::operators::generic::Operator;
29    /// use timely::dataflow::channels::pact::Pipeline;
30    ///
31    /// timely::example(|scope| {
32    ///     (0u64..10).to_stream(scope)
33    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
34    ///             let mut cap = Some(default_cap.delayed(&12));
35    ///             let mut notificator = FrontierNotificator::default();
36    ///             let mut stash = HashMap::new();
37    ///             move |input, output| {
38    ///                 if let Some(ref c) = cap.take() {
39    ///                     output.session(&c).give(12);
40    ///                 }
41    ///                 while let Some((time, data)) = input.next() {
42    ///                     stash.entry(time.time().clone())
43    ///                          .or_insert(Vec::new())
44    ///                          .extend(data.drain(..));
45    ///                 }
46    ///                 notificator.for_each(&[input.frontier()], |time, _not| {
47    ///                     if let Some(mut vec) = stash.remove(time.time()) {
48    ///                         output.session(&time).give_iterator(vec.drain(..));
49    ///                     }
50    ///                 });
51    ///             }
52    ///         })
53    ///         .container::<Vec<_>>();
54    /// });
55    /// ```
56    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57    where
58        CB: ContainerBuilder,
59        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
61                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
62        P: ParallelizationContract<G::Timestamp, C1>;
63
64    /// Creates a new dataflow operator that partitions its input stream by a parallelization
65    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67    ///
68    /// # Examples
69    /// ```
70    /// use std::collections::HashMap;
71    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72    /// use timely::dataflow::operators::generic::Operator;
73    /// use timely::dataflow::channels::pact::Pipeline;
74    ///
75    /// timely::example(|scope| {
76    ///     (0u64..10)
77    ///         .to_stream(scope)
78    ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79    ///             input.for_each(|time, data| {
80    ///                 output.session(&time).give_container(data);
81    ///                 notificator.notify_at(time.retain());
82    ///             });
83    ///             notificator.for_each(|time, _cnt, _not| {
84    ///                 println!("notified at {:?}", time);
85    ///             });
86    ///         });
87    /// });
88    /// ```
89    fn unary_notify<CB: ContainerBuilder,
90            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
91                     &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
92                     &mut Notificator<G::Timestamp>)+'static,
93             P: ParallelizationContract<G::Timestamp, C1>>
94             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96    /// Creates a new dataflow operator that partitions its input stream by a parallelization
97    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98    /// `logic` can read from the input stream, and write to the output stream.
99    ///
100    /// # Examples
101    /// ```
102    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103    /// use timely::dataflow::operators::generic::operator::Operator;
104    /// use timely::dataflow::channels::pact::Pipeline;
105    /// use timely::dataflow::Scope;
106    ///
107    /// timely::example(|scope| {
108    ///     (0u64..10).to_stream(scope)
109    ///         .unary(Pipeline, "example", |default_cap, _info| {
110    ///             let mut cap = Some(default_cap.delayed(&12));
111    ///             move |input, output| {
112    ///                 if let Some(ref c) = cap.take() {
113    ///                     output.session(&c).give(100);
114    ///                 }
115    ///                 while let Some((time, data)) = input.next() {
116    ///                     output.session(&time).give_container(data);
117    ///                 }
118    ///             }
119    ///         });
120    /// });
121    /// ```
122    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123    where
124        CB: ContainerBuilder,
125        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
127                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
128        P: ParallelizationContract<G::Timestamp, C1>;
129
130    /// Creates a new dataflow operator that partitions its input streams by a parallelization
131    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133    ///
134    /// # Examples
135    /// ```
136    /// use std::collections::HashMap;
137    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138    /// use timely::dataflow::operators::generic::operator::Operator;
139    /// use timely::dataflow::channels::pact::Pipeline;
140    ///
141    /// timely::execute(timely::Config::thread(), |worker| {
142    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143    ///        let (in1_handle, in1) = scope.new_input();
144    ///        let (in2_handle, in2) = scope.new_input();
145    ///        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146    ///            let mut notificator = FrontierNotificator::default();
147    ///            let mut stash = HashMap::new();
148    ///            move |input1, input2, output| {
149    ///                while let Some((time, data)) = input1.next() {
150    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
151    ///                    notificator.notify_at(time.retain());
152    ///                }
153    ///                while let Some((time, data)) = input2.next() {
154    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
155    ///                    notificator.notify_at(time.retain());
156    ///                }
157    ///                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
158    ///                    if let Some(mut vec) = stash.remove(time.time()) {
159    ///                        output.session(&time).give_iterator(vec.drain(..));
160    ///                    }
161    ///                });
162    ///            }
163    ///        })
164    ///        .container::<Vec<_>>()
165    ///        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166    ///
167    ///        (in1_handle, in2_handle)
168    ///    });
169    ///
170    ///    for i in 1..10 {
171    ///        in1.send(i - 1);
172    ///        in1.advance_to(i);
173    ///        in2.send(i - 1);
174    ///        in2.advance_to(i);
175    ///    }
176    /// }).unwrap();
177    /// ```
178    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179    where
180        C2: Container + Data,
181        CB: ContainerBuilder,
182        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
184                 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
185                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
186        P1: ParallelizationContract<G::Timestamp, C1>,
187        P2: ParallelizationContract<G::Timestamp, C2>;
188
189    /// Creates a new dataflow operator that partitions its input streams by a parallelization
190    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192    ///
193    /// # Examples
194    /// ```
195    /// use std::collections::HashMap;
196    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197    /// use timely::dataflow::operators::generic::operator::Operator;
198    /// use timely::dataflow::channels::pact::Pipeline;
199    ///
200    /// timely::execute(timely::Config::thread(), |worker| {
201    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202    ///        let (in1_handle, in1) = scope.new_input();
203    ///        let (in2_handle, in2) = scope.new_input();
204    ///
205    ///        in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206    ///            input1.for_each(|time, data| {
207    ///                output.session(&time).give_container(data);
208    ///                notificator.notify_at(time.retain());
209    ///            });
210    ///            input2.for_each(|time, data| {
211    ///                output.session(&time).give_container(data);
212    ///                notificator.notify_at(time.retain());
213    ///            });
214    ///            notificator.for_each(|time, _cnt, _not| {
215    ///                println!("notified at {:?}", time);
216    ///            });
217    ///        });
218    ///
219    ///        (in1_handle, in2_handle)
220    ///    });
221    ///
222    ///    for i in 1..10 {
223    ///        in1.send(i - 1);
224    ///        in1.advance_to(i);
225    ///        in2.send(i - 1);
226    ///        in2.advance_to(i);
227    ///    }
228    /// }).unwrap();
229    /// ```
230    fn binary_notify<C2: Container + Data,
231              CB: ContainerBuilder,
232              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
233                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
234                       &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
235                       &mut Notificator<G::Timestamp>)+'static,
236              P1: ParallelizationContract<G::Timestamp, C1>,
237              P2: ParallelizationContract<G::Timestamp, C2>>
238            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240    /// Creates a new dataflow operator that partitions its input streams by a parallelization
241    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243    ///
244    /// # Examples
245    /// ```
246    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247    /// use timely::dataflow::operators::generic::operator::Operator;
248    /// use timely::dataflow::channels::pact::Pipeline;
249    /// use timely::dataflow::Scope;
250    ///
251    /// timely::example(|scope| {
252    ///     let stream2 = (0u64..10).to_stream(scope);
253    ///     (0u64..10).to_stream(scope)
254    ///         .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255    ///             let mut cap = Some(default_cap.delayed(&12));
256    ///             move |input1, input2, output| {
257    ///                 if let Some(ref c) = cap.take() {
258    ///                     output.session(&c).give(100);
259    ///                 }
260    ///                 while let Some((time, data)) = input1.next() {
261    ///                     output.session(&time).give_container(data);
262    ///                 }
263    ///                 while let Some((time, data)) = input2.next() {
264    ///                     output.session(&time).give_container(data);
265    ///                 }
266    ///             }
267    ///         }).inspect(|x| println!("{:?}", x));
268    /// });
269    /// ```
270    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
271    where
272        C2: Container + Data,
273        CB: ContainerBuilder,
274        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
275        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
276                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
277                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
278        P1: ParallelizationContract<G::Timestamp, C1>,
279        P2: ParallelizationContract<G::Timestamp, C2>;
280
281    /// Creates a new dataflow operator that partitions its input stream by a parallelization
282    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
283    /// and inspect the frontier at the input.
284    ///
285    /// # Examples
286    /// ```
287    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
288    /// use timely::dataflow::operators::generic::operator::Operator;
289    /// use timely::dataflow::channels::pact::Pipeline;
290    /// use timely::dataflow::Scope;
291    ///
292    /// timely::example(|scope| {
293    ///     (0u64..10)
294    ///         .to_stream(scope)
295    ///         .sink(Pipeline, "example", |input| {
296    ///             while let Some((time, data)) = input.next() {
297    ///                 for datum in data.iter() {
298    ///                     println!("{:?}:\t{:?}", time, datum);
299    ///                 }
300    ///             }
301    ///         });
302    /// });
303    /// ```
304    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
305    where
306        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
307        P: ParallelizationContract<G::Timestamp, C1>;
308}
309
310impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
311
312    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
313    where
314        CB: ContainerBuilder,
315        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
316        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
317                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
318        P: ParallelizationContract<G::Timestamp, C1> {
319
320        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
321        let operator_info = builder.operator_info();
322
323        let mut input = builder.new_input(self, pact);
324        let (mut output, stream) = builder.new_output();
325
326        builder.build(move |mut capabilities| {
327            // `capabilities` should be a single-element vector.
328            let capability = capabilities.pop().unwrap();
329            let mut logic = constructor(capability, operator_info);
330            move |frontiers| {
331                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
332                let mut output_handle = output.activate();
333                logic(&mut input_handle, &mut output_handle);
334            }
335        });
336
337        stream
338    }
339
340    fn unary_notify<CB: ContainerBuilder,
341            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
342                     &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
343                     &mut Notificator<G::Timestamp>)+'static,
344             P: ParallelizationContract<G::Timestamp, C1>>
345             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
346
347        self.unary_frontier(pact, name, move |capability, _info| {
348            let mut notificator = FrontierNotificator::default();
349            for time in init {
350                notificator.notify_at(capability.delayed(&time));
351            }
352
353            let logging = self.scope().logging();
354            move |input, output| {
355                let frontier = &[input.frontier()];
356                let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
357                logic(input.handle, output, notificator);
358            }
359        })
360    }
361
362    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
363    where
364        CB: ContainerBuilder,
365        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
366        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
367                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
368        P: ParallelizationContract<G::Timestamp, C1> {
369
370        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
371        let operator_info = builder.operator_info();
372
373        let mut input = builder.new_input(self, pact);
374        let (mut output, stream) = builder.new_output();
375        builder.set_notify(false);
376
377        builder.build(move |mut capabilities| {
378            // `capabilities` should be a single-element vector.
379            let capability = capabilities.pop().unwrap();
380            let mut logic = constructor(capability, operator_info);
381            move |_frontiers| {
382                let mut output_handle = output.activate();
383                logic(&mut input, &mut output_handle);
384            }
385        });
386
387        stream
388    }
389
390    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
391    where
392        C2: Container + Data,
393        CB: ContainerBuilder,
394        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
395        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
396                 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
397                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
398        P1: ParallelizationContract<G::Timestamp, C1>,
399        P2: ParallelizationContract<G::Timestamp, C2> {
400
401        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
402        let operator_info = builder.operator_info();
403
404        let mut input1 = builder.new_input(self, pact1);
405        let mut input2 = builder.new_input(other, pact2);
406        let (mut output, stream) = builder.new_output();
407
408        builder.build(move |mut capabilities| {
409            // `capabilities` should be a single-element vector.
410            let capability = capabilities.pop().unwrap();
411            let mut logic = constructor(capability, operator_info);
412            move |frontiers| {
413                let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
414                let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
415                let mut output_handle = output.activate();
416                logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
417            }
418        });
419
420        stream
421    }
422
423    fn binary_notify<C2: Container + Data,
424              CB: ContainerBuilder,
425              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
426                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
427                       &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
428                       &mut Notificator<G::Timestamp>)+'static,
429              P1: ParallelizationContract<G::Timestamp, C1>,
430              P2: ParallelizationContract<G::Timestamp, C2>>
431            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
432
433        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
434            let mut notificator = FrontierNotificator::default();
435            for time in init {
436                notificator.notify_at(capability.delayed(&time));
437            }
438
439            let logging = self.scope().logging();
440            move |input1, input2, output| {
441                let frontiers = &[input1.frontier(), input2.frontier()];
442                let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
443                logic(input1.handle, input2.handle, output, notificator);
444            }
445        })
446
447    }
448
449
450    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
451    where
452        C2: Container + Data,
453        CB: ContainerBuilder,
454        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
455        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
456                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
457                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
458        P1: ParallelizationContract<G::Timestamp, C1>,
459        P2: ParallelizationContract<G::Timestamp, C2> {
460
461        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
462        let operator_info = builder.operator_info();
463
464        let mut input1 = builder.new_input(self, pact1);
465        let mut input2 = builder.new_input(other, pact2);
466        let (mut output, stream) = builder.new_output();
467        builder.set_notify(false);
468
469        builder.build(move |mut capabilities| {
470            // `capabilities` should be a single-element vector.
471            let capability = capabilities.pop().unwrap();
472            let mut logic = constructor(capability, operator_info);
473            move |_frontiers| {
474                let mut output_handle = output.activate();
475                logic(&mut input1, &mut input2, &mut output_handle);
476            }
477        });
478
479        stream
480    }
481
482    fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
483    where
484        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
485        P: ParallelizationContract<G::Timestamp, C1> {
486
487        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
488        let mut input = builder.new_input(self, pact);
489
490        builder.build(|_capabilities| {
491            move |frontiers| {
492                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
493                logic(&mut input_handle);
494            }
495        });
496    }
497}
498
499/// Creates a new data stream source for a scope.
500///
501/// The source is defined by a name, and a constructor which takes a default capability to
502/// a method that can be repeatedly called on a output handle. The method is then repeatedly
503/// invoked, and is expected to eventually send data and downgrade and release capabilities.
504///
505/// # Examples
506/// ```
507/// use timely::scheduling::Scheduler;
508/// use timely::dataflow::operators::Inspect;
509/// use timely::dataflow::operators::generic::operator::source;
510/// use timely::dataflow::Scope;
511///
512/// timely::example(|scope| {
513///
514///     source(scope, "Source", |capability, info| {
515///
516///         let activator = scope.activator_for(info.address);
517///
518///         let mut cap = Some(capability);
519///         move |output| {
520///
521///             let mut done = false;
522///             if let Some(cap) = cap.as_mut() {
523///                 // get some data and send it.
524///                 let time = cap.time().clone();
525///                 output.session(&cap)
526///                       .give(*cap.time());
527///
528///                 // downgrade capability.
529///                 cap.downgrade(&(time + 1));
530///                 done = time > 20;
531///             }
532///
533///             if done { cap = None; }
534///             else    { activator.activate(); }
535///         }
536///     })
537///     .container::<Vec<_>>()
538///     .inspect(|x| println!("number: {:?}", x));
539/// });
540/// ```
541pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
542where
543    CB: ContainerBuilder,
544    B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
545    L: FnMut(&mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static {
546
547    let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
548    let operator_info = builder.operator_info();
549
550    let (mut output, stream) = builder.new_output();
551    builder.set_notify(false);
552
553    builder.build(move |mut capabilities| {
554        // `capabilities` should be a single-element vector.
555        let capability = capabilities.pop().unwrap();
556        let mut logic = constructor(capability, operator_info);
557        move |_frontier| {
558            logic(&mut output.activate());
559        }
560    });
561
562    stream
563}
564
565/// Constructs an empty stream.
566///
567/// This method is useful in patterns where an input is required, but there is no
568/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
569/// which are just silly.
570///
571/// # Examples
572/// ```
573/// use timely::dataflow::operators::Inspect;
574/// use timely::dataflow::operators::generic::operator::empty;
575/// use timely::dataflow::Scope;
576///
577/// timely::example(|scope| {
578///
579///
580///     empty::<_, Vec<_>>(scope)     // type required in this example
581///         .inspect(|()| panic!("never called"));
582///
583/// });
584/// ```
585pub fn empty<G: Scope, C: Container + Data>(scope: &G) -> StreamCore<G, C> {
586    source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
587        // drop capability, do nothing
588    })
589}