timely/dataflow/operators/generic/
operator.rs

1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::dataflow::channels::pushers::Tee;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, Data};
16use crate::container::{ContainerBuilder, CapacityContainerBuilder};
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1: Container> {
20    /// Creates a new dataflow operator that partitions its input stream by a parallelization
21    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23    ///
24    /// # Examples
25    /// ```
26    /// use std::collections::HashMap;
27    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28    /// use timely::dataflow::operators::generic::Operator;
29    /// use timely::dataflow::channels::pact::Pipeline;
30    ///
31    /// timely::example(|scope| {
32    ///     (0u64..10).to_stream(scope)
33    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
34    ///             let mut cap = Some(default_cap.delayed(&12));
35    ///             let mut notificator = FrontierNotificator::default();
36    ///             let mut stash = HashMap::new();
37    ///             move |input, output| {
38    ///                 if let Some(ref c) = cap.take() {
39    ///                     output.session(&c).give(12);
40    ///                 }
41    ///                 while let Some((time, data)) = input.next() {
42    ///                     stash.entry(time.time().clone())
43    ///                          .or_insert(Vec::new())
44    ///                          .extend(data.drain(..));
45    ///                 }
46    ///                 notificator.for_each(&[input.frontier()], |time, _not| {
47    ///                     if let Some(mut vec) = stash.remove(time.time()) {
48    ///                         output.session(&time).give_iterator(vec.drain(..));
49    ///                     }
50    ///                 });
51    ///             }
52    ///         })
53    ///         .container::<Vec<_>>();
54    /// });
55    /// ```
56    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57    where
58        CB: ContainerBuilder,
59        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
61                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
62        P: ParallelizationContract<G::Timestamp, C1>;
63
64    /// Creates a new dataflow operator that partitions its input stream by a parallelization
65    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67    ///
68    /// # Examples
69    /// ```
70    /// use std::collections::HashMap;
71    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72    /// use timely::dataflow::operators::generic::Operator;
73    /// use timely::dataflow::channels::pact::Pipeline;
74    ///
75    /// timely::example(|scope| {
76    ///     (0u64..10)
77    ///         .to_stream(scope)
78    ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79    ///             input.for_each(|time, data| {
80    ///                 output.session(&time).give_container(data);
81    ///                 notificator.notify_at(time.retain());
82    ///             });
83    ///             notificator.for_each(|time, _cnt, _not| {
84    ///                 println!("notified at {:?}", time);
85    ///             });
86    ///         });
87    /// });
88    /// ```
89    fn unary_notify<CB: ContainerBuilder,
90            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
91                     &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
92                     &mut Notificator<G::Timestamp>)+'static,
93             P: ParallelizationContract<G::Timestamp, C1>>
94             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96    /// Creates a new dataflow operator that partitions its input stream by a parallelization
97    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98    /// `logic` can read from the input stream, and write to the output stream.
99    ///
100    /// # Examples
101    /// ```
102    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103    /// use timely::dataflow::operators::generic::operator::Operator;
104    /// use timely::dataflow::channels::pact::Pipeline;
105    /// use timely::dataflow::Scope;
106    ///
107    /// timely::example(|scope| {
108    ///     (0u64..10).to_stream(scope)
109    ///         .unary(Pipeline, "example", |default_cap, _info| {
110    ///             let mut cap = Some(default_cap.delayed(&12));
111    ///             move |input, output| {
112    ///                 if let Some(ref c) = cap.take() {
113    ///                     output.session(&c).give(100);
114    ///                 }
115    ///                 while let Some((time, data)) = input.next() {
116    ///                     output.session(&time).give_container(data);
117    ///                 }
118    ///             }
119    ///         });
120    /// });
121    /// ```
122    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123    where
124        CB: ContainerBuilder,
125        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
127                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
128        P: ParallelizationContract<G::Timestamp, C1>;
129
130    /// Creates a new dataflow operator that partitions its input streams by a parallelization
131    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133    ///
134    /// # Examples
135    /// ```
136    /// use std::collections::HashMap;
137    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138    /// use timely::dataflow::operators::generic::operator::Operator;
139    /// use timely::dataflow::channels::pact::Pipeline;
140    ///
141    /// timely::execute(timely::Config::thread(), |worker| {
142    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143    ///        let (in1_handle, in1) = scope.new_input();
144    ///        let (in2_handle, in2) = scope.new_input();
145    ///        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146    ///            let mut notificator = FrontierNotificator::default();
147    ///            let mut stash = HashMap::new();
148    ///            move |input1, input2, output| {
149    ///                while let Some((time, data)) = input1.next() {
150    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
151    ///                    notificator.notify_at(time.retain());
152    ///                }
153    ///                while let Some((time, data)) = input2.next() {
154    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
155    ///                    notificator.notify_at(time.retain());
156    ///                }
157    ///                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
158    ///                    if let Some(mut vec) = stash.remove(time.time()) {
159    ///                        output.session(&time).give_iterator(vec.drain(..));
160    ///                    }
161    ///                });
162    ///            }
163    ///        })
164    ///        .container::<Vec<_>>()
165    ///        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166    ///
167    ///        (in1_handle, in2_handle)
168    ///    });
169    ///
170    ///    for i in 1..10 {
171    ///        in1.send(i - 1);
172    ///        in1.advance_to(i);
173    ///        in2.send(i - 1);
174    ///        in2.advance_to(i);
175    ///    }
176    /// }).unwrap();
177    /// ```
178    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179    where
180        C2: Container + Data,
181        CB: ContainerBuilder,
182        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
184                 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
185                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
186        P1: ParallelizationContract<G::Timestamp, C1>,
187        P2: ParallelizationContract<G::Timestamp, C2>;
188
189    /// Creates a new dataflow operator that partitions its input streams by a parallelization
190    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192    ///
193    /// # Examples
194    /// ```
195    /// use std::collections::HashMap;
196    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197    /// use timely::dataflow::operators::generic::operator::Operator;
198    /// use timely::dataflow::channels::pact::Pipeline;
199    ///
200    /// timely::execute(timely::Config::thread(), |worker| {
201    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202    ///        let (in1_handle, in1) = scope.new_input();
203    ///        let (in2_handle, in2) = scope.new_input();
204    ///
205    ///        in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206    ///            input1.for_each(|time, data| {
207    ///                output.session(&time).give_container(data);
208    ///                notificator.notify_at(time.retain());
209    ///            });
210    ///            input2.for_each(|time, data| {
211    ///                output.session(&time).give_container(data);
212    ///                notificator.notify_at(time.retain());
213    ///            });
214    ///            notificator.for_each(|time, _cnt, _not| {
215    ///                println!("notified at {:?}", time);
216    ///            });
217    ///        });
218    ///
219    ///        (in1_handle, in2_handle)
220    ///    });
221    ///
222    ///    for i in 1..10 {
223    ///        in1.send(i - 1);
224    ///        in1.advance_to(i);
225    ///        in2.send(i - 1);
226    ///        in2.advance_to(i);
227    ///    }
228    /// }).unwrap();
229    /// ```
230    fn binary_notify<C2: Container + Data,
231              CB: ContainerBuilder,
232              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
233                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
234                       &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
235                       &mut Notificator<G::Timestamp>)+'static,
236              P1: ParallelizationContract<G::Timestamp, C1>,
237              P2: ParallelizationContract<G::Timestamp, C2>>
238            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240    /// Creates a new dataflow operator that partitions its input streams by a parallelization
241    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243    ///
244    /// # Examples
245    /// ```
246    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247    /// use timely::dataflow::operators::generic::operator::Operator;
248    /// use timely::dataflow::channels::pact::Pipeline;
249    /// use timely::dataflow::Scope;
250    ///
251    /// timely::example(|scope| {
252    ///     let stream2 = (0u64..10).to_stream(scope);
253    ///     (0u64..10).to_stream(scope)
254    ///         .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255    ///             let mut cap = Some(default_cap.delayed(&12));
256    ///             move |input1, input2, output| {
257    ///                 if let Some(ref c) = cap.take() {
258    ///                     output.session(&c).give(100);
259    ///                 }
260    ///                 while let Some((time, data)) = input1.next() {
261    ///                     output.session(&time).give_container(data);
262    ///                 }
263    ///                 while let Some((time, data)) = input2.next() {
264    ///                     output.session(&time).give_container(data);
265    ///                 }
266    ///             }
267    ///         }).inspect(|x| println!("{:?}", x));
268    /// });
269    /// ```
270    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
271    where
272        C2: Container + Data,
273        CB: ContainerBuilder,
274        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
275        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
276                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
277                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
278        P1: ParallelizationContract<G::Timestamp, C1>,
279        P2: ParallelizationContract<G::Timestamp, C2>;
280
281    /// Creates a new dataflow operator that partitions its input stream by a parallelization
282    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
283    /// and inspect the frontier at the input.
284    ///
285    /// # Examples
286    /// ```
287    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
288    /// use timely::dataflow::operators::generic::operator::Operator;
289    /// use timely::dataflow::channels::pact::Pipeline;
290    /// use timely::dataflow::Scope;
291    ///
292    /// timely::example(|scope| {
293    ///     (0u64..10)
294    ///         .to_stream(scope)
295    ///         .sink(Pipeline, "example", |input| {
296    ///             while let Some((time, data)) = input.next() {
297    ///                 for datum in data.iter() {
298    ///                     println!("{:?}:\t{:?}", time, datum);
299    ///                 }
300    ///             }
301    ///         });
302    /// });
303    /// ```
304    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
305    where
306        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
307        P: ParallelizationContract<G::Timestamp, C1>;
308}
309
310impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
311
312    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
313    where
314        CB: ContainerBuilder,
315        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
316        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
317                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
318        P: ParallelizationContract<G::Timestamp, C1> {
319
320        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
321        let operator_info = builder.operator_info();
322
323        let mut input = builder.new_input(self, pact);
324        let (mut output, stream) = builder.new_output();
325
326        builder.build(move |mut capabilities| {
327            // `capabilities` should be a single-element vector.
328            let capability = capabilities.pop().unwrap();
329            let mut logic = constructor(capability, operator_info);
330            move |frontiers| {
331                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
332                let mut output_handle = output.activate();
333                logic(&mut input_handle, &mut output_handle);
334            }
335        });
336
337        stream
338    }
339
340    fn unary_notify<CB: ContainerBuilder,
341            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
342                     &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
343                     &mut Notificator<G::Timestamp>)+'static,
344             P: ParallelizationContract<G::Timestamp, C1>>
345             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
346
347        self.unary_frontier(pact, name, move |capability, _info| {
348            let mut notificator = FrontierNotificator::default();
349            for time in init {
350                notificator.notify_at(capability.delayed(&time));
351            }
352
353            move |input, output| {
354                let frontier = &[input.frontier()];
355                let notificator = &mut Notificator::new(frontier, &mut notificator);
356                logic(input.handle, output, notificator);
357            }
358        })
359    }
360
361    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
362    where
363        CB: ContainerBuilder,
364        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
365        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
366                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
367        P: ParallelizationContract<G::Timestamp, C1> {
368
369        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
370        let operator_info = builder.operator_info();
371
372        let mut input = builder.new_input(self, pact);
373        let (mut output, stream) = builder.new_output();
374        builder.set_notify(false);
375
376        builder.build(move |mut capabilities| {
377            // `capabilities` should be a single-element vector.
378            let capability = capabilities.pop().unwrap();
379            let mut logic = constructor(capability, operator_info);
380            move |_frontiers| {
381                let mut output_handle = output.activate();
382                logic(&mut input, &mut output_handle);
383            }
384        });
385
386        stream
387    }
388
389    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
390    where
391        C2: Container + Data,
392        CB: ContainerBuilder,
393        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
394        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
395                 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
396                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
397        P1: ParallelizationContract<G::Timestamp, C1>,
398        P2: ParallelizationContract<G::Timestamp, C2> {
399
400        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
401        let operator_info = builder.operator_info();
402
403        let mut input1 = builder.new_input(self, pact1);
404        let mut input2 = builder.new_input(other, pact2);
405        let (mut output, stream) = builder.new_output();
406
407        builder.build(move |mut capabilities| {
408            // `capabilities` should be a single-element vector.
409            let capability = capabilities.pop().unwrap();
410            let mut logic = constructor(capability, operator_info);
411            move |frontiers| {
412                let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
413                let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
414                let mut output_handle = output.activate();
415                logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
416            }
417        });
418
419        stream
420    }
421
422    fn binary_notify<C2: Container + Data,
423              CB: ContainerBuilder,
424              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
425                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
426                       &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
427                       &mut Notificator<G::Timestamp>)+'static,
428              P1: ParallelizationContract<G::Timestamp, C1>,
429              P2: ParallelizationContract<G::Timestamp, C2>>
430            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
431
432        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
433            let mut notificator = FrontierNotificator::default();
434            for time in init {
435                notificator.notify_at(capability.delayed(&time));
436            }
437
438            move |input1, input2, output| {
439                let frontiers = &[input1.frontier(), input2.frontier()];
440                let notificator = &mut Notificator::new(frontiers, &mut notificator);
441                logic(input1.handle, input2.handle, output, notificator);
442            }
443        })
444
445    }
446
447
448    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
449    where
450        C2: Container + Data,
451        CB: ContainerBuilder,
452        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
453        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
454                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
455                 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
456        P1: ParallelizationContract<G::Timestamp, C1>,
457        P2: ParallelizationContract<G::Timestamp, C2> {
458
459        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
460        let operator_info = builder.operator_info();
461
462        let mut input1 = builder.new_input(self, pact1);
463        let mut input2 = builder.new_input(other, pact2);
464        let (mut output, stream) = builder.new_output();
465        builder.set_notify(false);
466
467        builder.build(move |mut capabilities| {
468            // `capabilities` should be a single-element vector.
469            let capability = capabilities.pop().unwrap();
470            let mut logic = constructor(capability, operator_info);
471            move |_frontiers| {
472                let mut output_handle = output.activate();
473                logic(&mut input1, &mut input2, &mut output_handle);
474            }
475        });
476
477        stream
478    }
479
480    fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
481    where
482        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
483        P: ParallelizationContract<G::Timestamp, C1> {
484
485        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
486        let mut input = builder.new_input(self, pact);
487
488        builder.build(|_capabilities| {
489            move |frontiers| {
490                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
491                logic(&mut input_handle);
492            }
493        });
494    }
495}
496
497/// Creates a new data stream source for a scope.
498///
499/// The source is defined by a name, and a constructor which takes a default capability to
500/// a method that can be repeatedly called on a output handle. The method is then repeatedly
501/// invoked, and is expected to eventually send data and downgrade and release capabilities.
502///
503/// # Examples
504/// ```
505/// use timely::scheduling::Scheduler;
506/// use timely::dataflow::operators::Inspect;
507/// use timely::dataflow::operators::generic::operator::source;
508/// use timely::dataflow::Scope;
509///
510/// timely::example(|scope| {
511///
512///     source(scope, "Source", |capability, info| {
513///
514///         let activator = scope.activator_for(info.address);
515///
516///         let mut cap = Some(capability);
517///         move |output| {
518///
519///             let mut done = false;
520///             if let Some(cap) = cap.as_mut() {
521///                 // get some data and send it.
522///                 let time = cap.time().clone();
523///                 output.session(&cap)
524///                       .give(*cap.time());
525///
526///                 // downgrade capability.
527///                 cap.downgrade(&(time + 1));
528///                 done = time > 20;
529///             }
530///
531///             if done { cap = None; }
532///             else    { activator.activate(); }
533///         }
534///     })
535///     .container::<Vec<_>>()
536///     .inspect(|x| println!("number: {:?}", x));
537/// });
538/// ```
539pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
540where
541    CB: ContainerBuilder,
542    B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
543    L: FnMut(&mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static {
544
545    let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
546    let operator_info = builder.operator_info();
547
548    let (mut output, stream) = builder.new_output();
549    builder.set_notify(false);
550
551    builder.build(move |mut capabilities| {
552        // `capabilities` should be a single-element vector.
553        let capability = capabilities.pop().unwrap();
554        let mut logic = constructor(capability, operator_info);
555        move |_frontier| {
556            logic(&mut output.activate());
557        }
558    });
559
560    stream
561}
562
563/// Constructs an empty stream.
564///
565/// This method is useful in patterns where an input is required, but there is no
566/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
567/// which are just silly.
568///
569/// # Examples
570/// ```
571/// use timely::dataflow::operators::Inspect;
572/// use timely::dataflow::operators::generic::operator::empty;
573/// use timely::dataflow::Scope;
574///
575/// timely::example(|scope| {
576///
577///
578///     empty::<_, Vec<_>>(scope)     // type required in this example
579///         .inspect(|()| panic!("never called"));
580///
581/// });
582/// ```
583pub fn empty<G: Scope, C: Container + Data>(scope: &G) -> StreamCore<G, C> {
584    source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
585        // drop capability, do nothing
586    })
587}