timely/dataflow/operators/generic/
operator.rs

1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::progress::frontier::MutableAntichain;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputSession, OutputBuilderSession, OutputBuilder};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, ContainerBuilder};
16use crate::container::CapacityContainerBuilder;
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1> {
20    /// Creates a new dataflow operator that partitions its input stream by a parallelization
21    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23    ///
24    /// # Examples
25    /// ```
26    /// use std::collections::HashMap;
27    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28    /// use timely::dataflow::operators::generic::Operator;
29    /// use timely::dataflow::channels::pact::Pipeline;
30    ///
31    /// timely::example(|scope| {
32    ///     (0u64..10).to_stream(scope)
33    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
34    ///             let mut cap = Some(default_cap.delayed(&12));
35    ///             let mut notificator = FrontierNotificator::default();
36    ///             let mut stash = HashMap::new();
37    ///             move |(input, frontier), output| {
38    ///                 if let Some(ref c) = cap.take() {
39    ///                     output.session(&c).give(12);
40    ///                 }
41    ///                 input.for_each_time(|time, data| {
42    ///                     stash.entry(time.time().clone())
43    ///                          .or_insert(Vec::new())
44    ///                          .extend(data.flat_map(|d| d.drain(..)));
45    ///                 });
46    ///                 notificator.for_each(&[frontier], |time, _not| {
47    ///                     if let Some(mut vec) = stash.remove(time.time()) {
48    ///                         output.session(&time).give_iterator(vec.drain(..));
49    ///                     }
50    ///                 });
51    ///             }
52    ///         })
53    ///         .container::<Vec<_>>();
54    /// });
55    /// ```
56    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57    where
58        CB: ContainerBuilder,
59        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60        L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
61                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
62        P: ParallelizationContract<G::Timestamp, C1>;
63
64    /// Creates a new dataflow operator that partitions its input stream by a parallelization
65    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67    ///
68    /// # Examples
69    /// ```
70    /// use std::collections::HashMap;
71    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72    /// use timely::dataflow::operators::generic::Operator;
73    /// use timely::dataflow::channels::pact::Pipeline;
74    ///
75    /// timely::example(|scope| {
76    ///     (0u64..10)
77    ///         .to_stream(scope)
78    ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79    ///             input.for_each_time(|time, data| {
80    ///                 output.session(&time).give_containers(data);
81    ///                 notificator.notify_at(time.retain());
82    ///             });
83    ///             notificator.for_each(|time, _cnt, _not| {
84    ///                 println!("notified at {:?}", time);
85    ///             });
86    ///         });
87    /// });
88    /// ```
89    fn unary_notify<CB: ContainerBuilder,
90            L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
91                     &mut OutputBuilderSession<'_, G::Timestamp, CB>,
92                     &mut Notificator<G::Timestamp>)+'static,
93             P: ParallelizationContract<G::Timestamp, C1>>
94             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96    /// Creates a new dataflow operator that partitions its input stream by a parallelization
97    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98    /// `logic` can read from the input stream, and write to the output stream.
99    ///
100    /// # Examples
101    /// ```
102    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103    /// use timely::dataflow::operators::generic::operator::Operator;
104    /// use timely::dataflow::channels::pact::Pipeline;
105    /// use timely::dataflow::Scope;
106    ///
107    /// timely::example(|scope| {
108    ///     (0u64..10).to_stream(scope)
109    ///         .unary(Pipeline, "example", |default_cap, _info| {
110    ///             let mut cap = Some(default_cap.delayed(&12));
111    ///             move |input, output| {
112    ///                 if let Some(ref c) = cap.take() {
113    ///                     output.session(&c).give(100);
114    ///                 }
115    ///                 input.for_each_time(|time, data| {
116    ///                     output.session(&time).give_containers(data);
117    ///                 });
118    ///             }
119    ///         });
120    /// });
121    /// ```
122    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123    where
124        CB: ContainerBuilder,
125        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126        L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
127                 &mut OutputBuilderSession<G::Timestamp, CB>)+'static,
128        P: ParallelizationContract<G::Timestamp, C1>;
129
130    /// Creates a new dataflow operator that partitions its input streams by a parallelization
131    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133    ///
134    /// # Examples
135    /// ```
136    /// use std::collections::HashMap;
137    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138    /// use timely::dataflow::operators::generic::operator::Operator;
139    /// use timely::dataflow::channels::pact::Pipeline;
140    ///
141    /// timely::execute(timely::Config::thread(), |worker| {
142    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143    ///        let (in1_handle, in1) = scope.new_input();
144    ///        let (in2_handle, in2) = scope.new_input();
145    ///        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146    ///            let mut notificator = FrontierNotificator::default();
147    ///            let mut stash = HashMap::new();
148    ///            move |(input1, frontier1), (input2, frontier2), output| {
149    ///                input1.for_each_time(|time, data| {
150    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
151    ///                    notificator.notify_at(time.retain());
152    ///                });
153    ///                input2.for_each_time(|time, data| {
154    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
155    ///                    notificator.notify_at(time.retain());
156    ///                });
157    ///                notificator.for_each(&[frontier1, frontier2], |time, _not| {
158    ///                    if let Some(mut vec) = stash.remove(time.time()) {
159    ///                        output.session(&time).give_iterator(vec.drain(..));
160    ///                    }
161    ///                });
162    ///            }
163    ///        })
164    ///        .container::<Vec<_>>()
165    ///        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166    ///
167    ///        (in1_handle, in2_handle)
168    ///    });
169    ///
170    ///    for i in 1..10 {
171    ///        in1.send(i - 1);
172    ///        in1.advance_to(i);
173    ///        in2.send(i - 1);
174    ///        in2.advance_to(i);
175    ///    }
176    /// }).unwrap();
177    /// ```
178    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179    where
180        C2: Container,
181        CB: ContainerBuilder,
182        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183        L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
184                 (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
185                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
186        P1: ParallelizationContract<G::Timestamp, C1>,
187        P2: ParallelizationContract<G::Timestamp, C2>;
188
189    /// Creates a new dataflow operator that partitions its input streams by a parallelization
190    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192    ///
193    /// # Examples
194    /// ```
195    /// use std::collections::HashMap;
196    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197    /// use timely::dataflow::operators::generic::operator::Operator;
198    /// use timely::dataflow::channels::pact::Pipeline;
199    ///
200    /// timely::execute(timely::Config::thread(), |worker| {
201    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202    ///        let (in1_handle, in1) = scope.new_input();
203    ///        let (in2_handle, in2) = scope.new_input();
204    ///
205    ///        in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206    ///            input1.for_each_time(|time, data| {
207    ///                output.session(&time).give_containers(data);
208    ///                notificator.notify_at(time.retain());
209    ///            });
210    ///            input2.for_each_time(|time, data| {
211    ///                output.session(&time).give_containers(data);
212    ///                notificator.notify_at(time.retain());
213    ///            });
214    ///            notificator.for_each(|time, _cnt, _not| {
215    ///                println!("notified at {:?}", time);
216    ///            });
217    ///        });
218    ///
219    ///        (in1_handle, in2_handle)
220    ///    });
221    ///
222    ///    for i in 1..10 {
223    ///        in1.send(i - 1);
224    ///        in1.advance_to(i);
225    ///        in2.send(i - 1);
226    ///        in2.advance_to(i);
227    ///    }
228    /// }).unwrap();
229    /// ```
230    fn binary_notify<C2: Container,
231              CB: ContainerBuilder,
232              L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
233                       InputSession<'_, G::Timestamp, C2, P2::Puller>,
234                       &mut OutputBuilderSession<'_, G::Timestamp, CB>,
235                       &mut Notificator<G::Timestamp>)+'static,
236              P1: ParallelizationContract<G::Timestamp, C1>,
237              P2: ParallelizationContract<G::Timestamp, C2>>
238            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240    /// Creates a new dataflow operator that partitions its input streams by a parallelization
241    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243    ///
244    /// # Examples
245    /// ```
246    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247    /// use timely::dataflow::operators::generic::operator::Operator;
248    /// use timely::dataflow::channels::pact::Pipeline;
249    /// use timely::dataflow::Scope;
250    ///
251    /// timely::example(|scope| {
252    ///     let stream2 = (0u64..10).to_stream(scope);
253    ///     (0u64..10).to_stream(scope)
254    ///         .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255    ///             let mut cap = Some(default_cap.delayed(&12));
256    ///             move |input1, input2, output| {
257    ///                 if let Some(ref c) = cap.take() {
258    ///                     output.session(&c).give(100);
259    ///                 }
260    ///                 input1.for_each_time(|time, data| output.session(&time).give_containers(data));
261    ///                 input2.for_each_time(|time, data| output.session(&time).give_containers(data));
262    ///             }
263    ///         }).inspect(|x| println!("{:?}", x));
264    /// });
265    /// ```
266    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
267    where
268        C2: Container,
269        CB: ContainerBuilder,
270        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
271        L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
272                 InputSession<'_, G::Timestamp, C2, P2::Puller>,
273                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
274        P1: ParallelizationContract<G::Timestamp, C1>,
275        P2: ParallelizationContract<G::Timestamp, C2>;
276
277    /// Creates a new dataflow operator that partitions its input stream by a parallelization
278    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
279    /// and inspect the frontier at the input.
280    ///
281    /// # Examples
282    /// ```
283    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
284    /// use timely::dataflow::operators::generic::operator::Operator;
285    /// use timely::dataflow::channels::pact::Pipeline;
286    /// use timely::dataflow::Scope;
287    ///
288    /// timely::example(|scope| {
289    ///     (0u64..10)
290    ///         .to_stream(scope)
291    ///         .sink(Pipeline, "example", |(input, frontier)| {
292    ///             input.for_each_time(|time, data| {
293    ///                 for datum in data.flatten() {
294    ///                     println!("{:?}:\t{:?}", time, datum);
295    ///                 }
296    ///             });
297    ///         });
298    /// });
299    /// ```
300    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
301    where
302        L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
303        P: ParallelizationContract<G::Timestamp, C1>;
304}
305
306impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
307
308    fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
309    where
310        CB: ContainerBuilder,
311        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
312        L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
313                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
314        P: ParallelizationContract<G::Timestamp, C1> {
315
316        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
317        let operator_info = builder.operator_info();
318
319        let mut input = builder.new_input(self, pact);
320        let (output, stream) = builder.new_output();
321        let mut output = OutputBuilder::from(output);
322
323        builder.build(move |mut capabilities| {
324            // `capabilities` should be a single-element vector.
325            let capability = capabilities.pop().unwrap();
326            let mut logic = constructor(capability, operator_info);
327            move |frontiers| {
328                let mut output_handle = output.activate();
329                logic((input.activate(), &frontiers[0]), &mut output_handle);
330            }
331        });
332
333        stream
334    }
335
336    fn unary_notify<CB: ContainerBuilder,
337            L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
338                     &mut OutputBuilderSession<'_, G::Timestamp, CB>,
339                     &mut Notificator<G::Timestamp>)+'static,
340             P: ParallelizationContract<G::Timestamp, C1>>
341             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
342
343        self.unary_frontier(pact, name, move |capability, _info| {
344            let mut notificator = FrontierNotificator::default();
345            for time in init {
346                notificator.notify_at(capability.delayed(&time));
347            }
348
349            move |(input, frontier), output| {
350                let frontiers = &[frontier];
351                let notificator = &mut Notificator::new(frontiers, &mut notificator);
352                logic(input, output, notificator);
353            }
354        })
355    }
356
357    fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
358    where
359        CB: ContainerBuilder,
360        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
361        L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
362                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
363        P: ParallelizationContract<G::Timestamp, C1> {
364
365        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
366        let operator_info = builder.operator_info();
367
368        let mut input = builder.new_input(self, pact);
369        let (output, stream) = builder.new_output();
370        let mut output = OutputBuilder::from(output);
371        builder.set_notify(false);
372
373        builder.build(move |mut capabilities| {
374            // `capabilities` should be a single-element vector.
375            let capability = capabilities.pop().unwrap();
376            let mut logic = constructor(capability, operator_info);
377            move |_frontiers| logic(input.activate(), &mut output.activate())
378        });
379
380        stream
381    }
382
383    fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
384    where
385        C2: Container,
386        CB: ContainerBuilder,
387        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
388        L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
389                 (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
390                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
391        P1: ParallelizationContract<G::Timestamp, C1>,
392        P2: ParallelizationContract<G::Timestamp, C2> {
393
394        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
395        let operator_info = builder.operator_info();
396
397        let mut input1 = builder.new_input(self, pact1);
398        let mut input2 = builder.new_input(other, pact2);
399        let (output, stream) = builder.new_output();
400        let mut output = OutputBuilder::from(output);
401
402        builder.build(move |mut capabilities| {
403            // `capabilities` should be a single-element vector.
404            let capability = capabilities.pop().unwrap();
405            let mut logic = constructor(capability, operator_info);
406            move |frontiers| {
407                let mut output_handle = output.activate();
408                logic((input1.activate(), &frontiers[0]), (input2.activate(), &frontiers[1]), &mut output_handle);
409            }
410        });
411
412        stream
413    }
414
415    fn binary_notify<C2: Container,
416              CB: ContainerBuilder,
417              L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
418                       InputSession<'_, G::Timestamp, C2, P2::Puller>,
419                       &mut OutputBuilderSession<'_, G::Timestamp, CB>,
420                       &mut Notificator<G::Timestamp>)+'static,
421              P1: ParallelizationContract<G::Timestamp, C1>,
422              P2: ParallelizationContract<G::Timestamp, C2>>
423            (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
424
425        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
426            let mut notificator = FrontierNotificator::default();
427            for time in init {
428                notificator.notify_at(capability.delayed(&time));
429            }
430
431            move |(input1, frontier1), (input2, frontier2), output| {
432                let frontiers = &[frontier1, frontier2];
433                let notificator = &mut Notificator::new(frontiers, &mut notificator);
434                logic(input1, input2, output, notificator);
435            }
436        })
437
438    }
439
440
441    fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
442    where
443        C2: Container,
444        CB: ContainerBuilder,
445        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
446        L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
447                 InputSession<'_, G::Timestamp, C2, P2::Puller>,
448                 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
449        P1: ParallelizationContract<G::Timestamp, C1>,
450        P2: ParallelizationContract<G::Timestamp, C2> {
451
452        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
453        let operator_info = builder.operator_info();
454
455        let mut input1 = builder.new_input(self, pact1);
456        let mut input2 = builder.new_input(other, pact2);
457        let (output, stream) = builder.new_output();
458        let mut output = OutputBuilder::from(output);
459        builder.set_notify(false);
460
461        builder.build(move |mut capabilities| {
462            // `capabilities` should be a single-element vector.
463            let capability = capabilities.pop().unwrap();
464            let mut logic = constructor(capability, operator_info);
465            move |_frontiers| {
466                let mut output_handle = output.activate();
467                logic(input1.activate(), input2.activate(), &mut output_handle);
468            }
469        });
470
471        stream
472    }
473
474    fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
475    where
476        L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
477        P: ParallelizationContract<G::Timestamp, C1> {
478
479        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
480        let mut input = builder.new_input(self, pact);
481
482        builder.build(|_capabilities| {
483            move |frontiers| {
484                logic((input.activate(), &frontiers[0]));
485            }
486        });
487    }
488}
489
490/// Creates a new data stream source for a scope.
491///
492/// The source is defined by a name, and a constructor which takes a default capability to
493/// a method that can be repeatedly called on a output handle. The method is then repeatedly
494/// invoked, and is expected to eventually send data and downgrade and release capabilities.
495///
496/// # Examples
497/// ```
498/// use timely::scheduling::Scheduler;
499/// use timely::dataflow::operators::Inspect;
500/// use timely::dataflow::operators::generic::operator::source;
501/// use timely::dataflow::Scope;
502///
503/// timely::example(|scope| {
504///
505///     source(scope, "Source", |capability, info| {
506///
507///         let activator = scope.activator_for(info.address);
508///
509///         let mut cap = Some(capability);
510///         move |output| {
511///
512///             let mut done = false;
513///             if let Some(cap) = cap.as_mut() {
514///                 // get some data and send it.
515///                 let time = cap.time().clone();
516///                 output.session(&cap)
517///                       .give(*cap.time());
518///
519///                 // downgrade capability.
520///                 cap.downgrade(&(time + 1));
521///                 done = time > 20;
522///             }
523///
524///             if done { cap = None; }
525///             else    { activator.activate(); }
526///         }
527///     })
528///     .container::<Vec<_>>()
529///     .inspect(|x| println!("number: {:?}", x));
530/// });
531/// ```
532pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
533where
534    CB: ContainerBuilder,
535    B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
536    L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static {
537
538    let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
539    let operator_info = builder.operator_info();
540
541    let (output, stream) = builder.new_output();
542    let mut output = OutputBuilder::from(output);
543    builder.set_notify(false);
544
545    builder.build(move |mut capabilities| {
546        // `capabilities` should be a single-element vector.
547        let capability = capabilities.pop().unwrap();
548        let mut logic = constructor(capability, operator_info);
549        move |_frontier| {
550            logic(&mut output.activate());
551        }
552    });
553
554    stream
555}
556
557/// Constructs an empty stream.
558///
559/// This method is useful in patterns where an input is required, but there is no
560/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
561/// which are just silly.
562///
563/// # Examples
564/// ```
565/// use timely::dataflow::operators::Inspect;
566/// use timely::dataflow::operators::generic::operator::empty;
567/// use timely::dataflow::Scope;
568///
569/// timely::example(|scope| {
570///
571///
572///     empty::<_, Vec<_>>(scope)     // type required in this example
573///         .inspect(|()| panic!("never called"));
574///
575/// });
576/// ```
577pub fn empty<G: Scope, C: Container>(scope: &G) -> StreamCore<G, C> {
578    source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
579        // drop capability, do nothing
580    })
581}