Skip to main content

timely/dataflow/operators/generic/
operator.rs

1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::progress::frontier::MutableAntichain;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, Stream};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, ContainerBuilder};
16use crate::container::CapacityContainerBuilder;
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1> {
20    /// Creates a new dataflow operator that partitions its input stream by a parallelization
21    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23    ///
24    /// # Examples
25    /// ```
26    /// use std::collections::HashMap;
27    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28    /// use timely::dataflow::operators::generic::Operator;
29    /// use timely::dataflow::channels::pact::Pipeline;
30    ///
31    /// timely::example(|scope| {
32    ///     (0u64..10)
33    ///         .to_stream(scope)
34    ///         .container::<Vec<_>>()
35    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
36    ///             let mut cap = Some(default_cap.delayed(&12));
37    ///             let mut notificator = FrontierNotificator::default();
38    ///             let mut stash = HashMap::new();
39    ///             move |(input, frontier), output| {
40    ///                 if let Some(ref c) = cap.take() {
41    ///                     output.session(&c).give(12);
42    ///                 }
43    ///                 input.for_each_time(|time, data| {
44    ///                     stash.entry(time.time().clone())
45    ///                          .or_insert(Vec::new())
46    ///                          .extend(data.flat_map(|d| d.drain(..)));
47    ///                 });
48    ///                 notificator.for_each(&[frontier], |time, _not| {
49    ///                     if let Some(mut vec) = stash.remove(time.time()) {
50    ///                         output.session(&time).give_iterator(vec.drain(..));
51    ///                     }
52    ///                 });
53    ///             }
54    ///         })
55    ///         .container::<Vec<_>>();
56    /// });
57    /// ```
58    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
59    where
60        CB: ContainerBuilder,
61        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
62        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
63                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
64        P: ParallelizationContract<G::Timestamp, C1>;
65
66    /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
67    /// and repeatedly invokes the closure supplied as `logic`, which can read from the input stream, write to
68    /// the output stream, and inspect the frontier at the input.
69    ///
70    /// # Examples
71    /// ```
72    /// use std::collections::HashMap;
73    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
74    /// use timely::dataflow::operators::generic::Operator;
75    /// use timely::dataflow::channels::pact::Pipeline;
76    ///
77    /// timely::example(|scope| {
78    ///     (0u64..10)
79    ///         .to_stream(scope)
80    ///         .container::<Vec<_>>()
81    ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
82    ///             input.for_each_time(|time, data| {
83    ///                 output.session(&time).give_containers(data);
84    ///                 notificator.notify_at(time.retain(output.output_index()));
85    ///             });
86    ///             notificator.for_each(|time, _cnt, _not| {
87    ///                 println!("notified at {:?}", time);
88    ///             });
89    ///         });
90    /// });
91    /// ```
92    fn unary_notify<CB: ContainerBuilder,
93            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
94                     &mut OutputBuilderSession<'_, G::Timestamp, CB>,
95                     &mut Notificator<G::Timestamp>)+'static,
96             P: ParallelizationContract<G::Timestamp, C1>>
97             (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;
98
99    /// Creates a new dataflow operator that partitions its input stream by a parallelization
100    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
101    /// `logic` can read from the input stream, and write to the output stream.
102    ///
103    /// # Examples
104    /// ```
105    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
106    /// use timely::dataflow::operators::generic::operator::Operator;
107    /// use timely::dataflow::channels::pact::Pipeline;
108    /// use timely::dataflow::Scope;
109    ///
110    /// timely::example(|scope| {
111    ///     (0u64..10)
112    ///         .to_stream(scope)
113    ///         .container::<Vec<_>>()
114    ///         .unary(Pipeline, "example", |default_cap, _info| {
115    ///             let mut cap = Some(default_cap.delayed(&12));
116    ///             move |input, output| {
117    ///                 if let Some(ref c) = cap.take() {
118    ///                     output.session(&c).give(100);
119    ///                 }
120    ///                 input.for_each_time(|time, data| {
121    ///                     output.session(&time).give_containers(data);
122    ///                 });
123    ///             }
124    ///         });
125    /// });
126    /// ```
127    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
128    where
129        CB: ContainerBuilder,
130        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
131        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
132                 &mut OutputBuilderSession<G::Timestamp, CB>)+'static,
133        P: ParallelizationContract<G::Timestamp, C1>;
134
135    /// Creates a new dataflow operator that partitions its input streams by a parallelization
136    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
137    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
138    ///
139    /// # Examples
140    /// ```
141    /// use std::collections::HashMap;
142    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
143    /// use timely::dataflow::operators::generic::operator::Operator;
144    /// use timely::dataflow::channels::pact::Pipeline;
145    ///
146    /// timely::execute(timely::Config::thread(), |worker| {
147    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
148    ///        let (in1_handle, in1) = scope.new_input::<Vec<_>>();
149    ///        let (in2_handle, in2) = scope.new_input::<Vec<_>>();
150    ///        in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
151    ///            let mut notificator = FrontierNotificator::default();
152    ///            let mut stash = HashMap::new();
153    ///            move |(input1, frontier1), (input2, frontier2), output| {
154    ///                input1.for_each_time(|time, data| {
155    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
156    ///                    notificator.notify_at(time.retain(output.output_index()));
157    ///                });
158    ///                input2.for_each_time(|time, data| {
159    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
160    ///                    notificator.notify_at(time.retain(output.output_index()));
161    ///                });
162    ///                notificator.for_each(&[frontier1, frontier2], |time, _not| {
163    ///                    if let Some(mut vec) = stash.remove(time.time()) {
164    ///                        output.session(&time).give_iterator(vec.drain(..));
165    ///                    }
166    ///                });
167    ///            }
168    ///        })
169    ///        .container::<Vec<_>>()
170    ///        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
171    ///
172    ///        (in1_handle, in2_handle)
173    ///    });
174    ///
175    ///    for i in 1..10 {
176    ///        in1.send(i - 1);
177    ///        in1.advance_to(i);
178    ///        in2.send(i - 1);
179    ///        in2.advance_to(i);
180    ///    }
181    /// }).unwrap();
182    /// ```
183    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
184    where
185        C2: Container,
186        CB: ContainerBuilder,
187        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
188        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
189                 (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
190                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
191        P1: ParallelizationContract<G::Timestamp, C1>,
192        P2: ParallelizationContract<G::Timestamp, C2>;
193
194    /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
195    /// and repeatedly invokes the closure supplied as `logic`, which can read from the input streams, write to
196    /// the output stream, and inspect the frontier at the inputs.
197    ///
198    /// # Examples
199    /// ```
200    /// use std::collections::HashMap;
201    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
202    /// use timely::dataflow::operators::generic::operator::Operator;
203    /// use timely::dataflow::channels::pact::Pipeline;
204    ///
205    /// timely::execute(timely::Config::thread(), |worker| {
206    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
207    ///        let (in1_handle, in1) = scope.new_input::<Vec<_>>();
208    ///        let (in2_handle, in2) = scope.new_input::<Vec<_>>();
209    ///
210    ///        in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
211    ///            input1.for_each_time(|time, data| {
212    ///                output.session(&time).give_containers(data);
213    ///                notificator.notify_at(time.retain(output.output_index()));
214    ///            });
215    ///            input2.for_each_time(|time, data| {
216    ///                output.session(&time).give_containers(data);
217    ///                notificator.notify_at(time.retain(output.output_index()));
218    ///            });
219    ///            notificator.for_each(|time, _cnt, _not| {
220    ///                println!("notified at {:?}", time);
221    ///            });
222    ///        });
223    ///
224    ///        (in1_handle, in2_handle)
225    ///    });
226    ///
227    ///    for i in 1..10 {
228    ///        in1.send(i - 1);
229    ///        in1.advance_to(i);
230    ///        in2.send(i - 1);
231    ///        in2.advance_to(i);
232    ///    }
233    /// }).unwrap();
234    /// ```
235    fn binary_notify<C2: Container,
236              CB: ContainerBuilder,
237              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
238                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
239                       &mut OutputBuilderSession<'_, G::Timestamp, CB>,
240                       &mut Notificator<G::Timestamp>)+'static,
241              P1: ParallelizationContract<G::Timestamp, C1>,
242              P2: ParallelizationContract<G::Timestamp, C2>>
243            (self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;
244
245    /// Creates a new dataflow operator that partitions its input streams by a parallelization
246    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
247    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
248    ///
249    /// # Examples
250    /// ```
251    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
252    /// use timely::dataflow::operators::generic::operator::Operator;
253    /// use timely::dataflow::channels::pact::Pipeline;
254    /// use timely::dataflow::Scope;
255    ///
256    /// timely::example(|scope| {
257    ///     let stream1 = (0u64..10).to_stream(scope).container::<Vec<_>>();
258    ///     let stream2 = (0u64..10).to_stream(scope).container::<Vec<_>>();
259    ///     stream1
260    ///         .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
261    ///             let mut cap = Some(default_cap.delayed(&12));
262    ///             move |input1, input2, output| {
263    ///                 if let Some(ref c) = cap.take() {
264    ///                     output.session(&c).give(100);
265    ///                 }
266    ///                 input1.for_each_time(|time, data| output.session(&time).give_containers(data));
267    ///                 input2.for_each_time(|time, data| output.session(&time).give_containers(data));
268    ///             }
269    ///         }).inspect(|x| println!("{:?}", x));
270    /// });
271    /// ```
272    fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
273    where
274        C2: Container,
275        CB: ContainerBuilder,
276        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
277        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
278                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
279                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
280        P1: ParallelizationContract<G::Timestamp, C1>,
281        P2: ParallelizationContract<G::Timestamp, C2>;
282
283    /// Creates a new dataflow operator that partitions its input stream by a parallelization
284    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
285    /// and inspect the frontier at the input.
286    ///
287    /// # Examples
288    /// ```
289    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
290    /// use timely::dataflow::operators::generic::operator::Operator;
291    /// use timely::dataflow::channels::pact::Pipeline;
292    /// use timely::dataflow::Scope;
293    ///
294    /// timely::example(|scope| {
295    ///     (0u64..10)
296    ///         .to_stream(scope)
297    ///         .container::<Vec<_>>()
298    ///         .sink(Pipeline, "example", |(input, frontier)| {
299    ///             input.for_each_time(|time, data| {
300    ///                 for datum in data.flatten() {
301    ///                     println!("{:?}:\t{:?}", time, datum);
302    ///                 }
303    ///             });
304    ///         });
305    /// });
306    /// ```
307    fn sink<L, P>(self, pact: P, name: &str, logic: L)
308    where
309        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
310        P: ParallelizationContract<G::Timestamp, C1>;
311}
312
313impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {
314
315    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
316    where
317        CB: ContainerBuilder,
318        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
319        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
320                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
321        P: ParallelizationContract<G::Timestamp, C1> {
322
323        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
324        let operator_info = builder.operator_info();
325
326        let mut input = builder.new_input(self, pact);
327        let (output, stream) = builder.new_output();
328        let mut output = OutputBuilder::from(output);
329
330        builder.build(move |mut capabilities| {
331            // `capabilities` should be a single-element vector.
332            let capability = capabilities.pop().unwrap();
333            let mut logic = constructor(capability, operator_info);
334            move |frontiers| {
335                let mut output_handle = output.activate();
336                logic((&mut input, &frontiers[0]), &mut output_handle);
337            }
338        });
339
340        stream
341    }
342
343    fn unary_notify<CB: ContainerBuilder,
344            L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
345                     &mut OutputBuilderSession<'_, G::Timestamp, CB>,
346                     &mut Notificator<G::Timestamp>)+'static,
347             P: ParallelizationContract<G::Timestamp, C1>>
348             (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> Stream<G, CB::Container> {
349
350        self.unary_frontier(pact, name, move |capability, _info| {
351            let mut notificator = FrontierNotificator::default();
352            for time in init {
353                notificator.notify_at(capability.delayed(&time));
354            }
355
356            move |(input, frontier), output| {
357                let frontiers = &[frontier];
358                let notificator = &mut Notificator::new(frontiers, &mut notificator);
359                logic(input, output, notificator);
360            }
361        })
362    }
363
364    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
365    where
366        CB: ContainerBuilder,
367        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
368        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
369                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
370        P: ParallelizationContract<G::Timestamp, C1> {
371
372        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
373        let operator_info = builder.operator_info();
374
375        let mut input = builder.new_input(self, pact);
376        let (output, stream) = builder.new_output();
377        let mut output = OutputBuilder::from(output);
378        builder.set_notify(false);
379
380        builder.build(move |mut capabilities| {
381            // `capabilities` should be a single-element vector.
382            let capability = capabilities.pop().unwrap();
383            let mut logic = constructor(capability, operator_info);
384            move |_frontiers| logic(&mut input, &mut output.activate())
385        });
386
387        stream
388    }
389
390    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
391    where
392        C2: Container,
393        CB: ContainerBuilder,
394        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
395        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
396                 (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
397                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
398        P1: ParallelizationContract<G::Timestamp, C1>,
399        P2: ParallelizationContract<G::Timestamp, C2> {
400
401        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
402        let operator_info = builder.operator_info();
403
404        let mut input1 = builder.new_input(self, pact1);
405        let mut input2 = builder.new_input(other, pact2);
406        let (output, stream) = builder.new_output();
407        let mut output = OutputBuilder::from(output);
408
409        builder.build(move |mut capabilities| {
410            // `capabilities` should be a single-element vector.
411            let capability = capabilities.pop().unwrap();
412            let mut logic = constructor(capability, operator_info);
413            move |frontiers| {
414                let mut output_handle = output.activate();
415                logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle);
416            }
417        });
418
419        stream
420    }
421
422    fn binary_notify<C2: Container,
423              CB: ContainerBuilder,
424              L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
425                       &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
426                       &mut OutputBuilderSession<'_, G::Timestamp, CB>,
427                       &mut Notificator<G::Timestamp>)+'static,
428              P1: ParallelizationContract<G::Timestamp, C1>,
429              P2: ParallelizationContract<G::Timestamp, C2>>
430            (self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> Stream<G, CB::Container> {
431
432        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
433            let mut notificator = FrontierNotificator::default();
434            for time in init {
435                notificator.notify_at(capability.delayed(&time));
436            }
437
438            move |(input1, frontier1), (input2, frontier2), output| {
439                let frontiers = &[frontier1, frontier2];
440                let notificator = &mut Notificator::new(frontiers, &mut notificator);
441                logic(input1, input2, output, notificator);
442            }
443        })
444
445    }
446
447
448    fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
449    where
450        C2: Container,
451        CB: ContainerBuilder,
452        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
453        L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
454                 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
455                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
456        P1: ParallelizationContract<G::Timestamp, C1>,
457        P2: ParallelizationContract<G::Timestamp, C2> {
458
459        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
460        let operator_info = builder.operator_info();
461
462        let mut input1 = builder.new_input(self, pact1);
463        let mut input2 = builder.new_input(other, pact2);
464        let (output, stream) = builder.new_output();
465        let mut output = OutputBuilder::from(output);
466        builder.set_notify(false);
467
468        builder.build(move |mut capabilities| {
469            // `capabilities` should be a single-element vector.
470            let capability = capabilities.pop().unwrap();
471            let mut logic = constructor(capability, operator_info);
472            move |_frontiers| {
473                let mut output_handle = output.activate();
474                logic(&mut input1, &mut input2, &mut output_handle);
475            }
476        });
477
478        stream
479    }
480
481    fn sink<L, P>(self, pact: P, name: &str, mut logic: L)
482    where
483        L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
484        P: ParallelizationContract<G::Timestamp, C1> {
485
486        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
487        let mut input = builder.new_input(self, pact);
488
489        builder.build(|_capabilities| {
490            move |frontiers| {
491                logic((&mut input, &frontiers[0]));
492            }
493        });
494    }
495}
496
497/// Creates a new data stream source for a scope.
498///
499/// The source is defined by a name, and a constructor which takes a default capability to
500/// a method that can be repeatedly called on a output handle. The method is then repeatedly
501/// invoked, and is expected to eventually send data and downgrade and release capabilities.
502///
503/// # Examples
504/// ```
505/// use timely::scheduling::Scheduler;
506/// use timely::dataflow::operators::Inspect;
507/// use timely::dataflow::operators::generic::operator::source;
508/// use timely::dataflow::Scope;
509///
510/// timely::example(|scope| {
511///
512///     source(scope, "Source", |capability, info| {
513///
514///         let activator = scope.activator_for(info.address);
515///
516///         let mut cap = Some(capability);
517///         move |output| {
518///
519///             let mut done = false;
520///             if let Some(cap) = cap.as_mut() {
521///                 // get some data and send it.
522///                 let time = cap.time().clone();
523///                 output.session(&cap)
524///                       .give(*cap.time());
525///
526///                 // downgrade capability.
527///                 cap.downgrade(&(time + 1));
528///                 done = time > 20;
529///             }
530///
531///             if done { cap = None; }
532///             else    { activator.activate(); }
533///         }
534///     })
535///     .container::<Vec<_>>()
536///     .inspect(|x| println!("number: {:?}", x));
537/// });
538/// ```
539pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> Stream<G, CB::Container>
540where
541    CB: ContainerBuilder,
542    B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
543    L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static {
544
545    let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
546    let operator_info = builder.operator_info();
547
548    let (output, stream) = builder.new_output();
549    let mut output = OutputBuilder::from(output);
550    builder.set_notify(false);
551
552    builder.build(move |mut capabilities| {
553        // `capabilities` should be a single-element vector.
554        let capability = capabilities.pop().unwrap();
555        let mut logic = constructor(capability, operator_info);
556        move |_frontier| {
557            logic(&mut output.activate());
558        }
559    });
560
561    stream
562}
563
564/// Constructs an empty stream.
565///
566/// This method is useful in patterns where an input is required, but there is no
567/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
568/// which are just silly.
569///
570/// # Examples
571/// ```
572/// use timely::dataflow::operators::Inspect;
573/// use timely::dataflow::operators::generic::operator::empty;
574/// use timely::dataflow::Scope;
575///
576/// timely::example(|scope| {
577///
578///
579///     empty::<_, Vec<_>>(scope)     // type required in this example
580///         .inspect(|()| panic!("never called"));
581///
582/// });
583/// ```
584pub fn empty<G: Scope, C: Container>(scope: &G) -> Stream<G, C> {
585    source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
586        // drop capability, do nothing
587    })
588}