Skip to main content

timely/dataflow/operators/generic/
operator.rs

1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::progress::frontier::MutableAntichain;
5use crate::progress::Timestamp;
6use crate::dataflow::channels::pact::ParallelizationContract;
7
8use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
9use crate::dataflow::operators::capability::Capability;
10
11use crate::dataflow::{Scope, Stream};
12
13use super::builder_rc::OperatorBuilder;
14use crate::dataflow::operators::generic::OperatorInfo;
15use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
16use crate::{Container, ContainerBuilder};
17use crate::container::CapacityContainerBuilder;
18
19/// Methods to construct generic streaming and blocking operators.
20pub trait Operator<'scope, T: Timestamp, C1> {
21    /// Creates a new dataflow operator that partitions its input stream by a parallelization
22    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
23    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
24    ///
25    /// # Examples
26    /// ```
27    /// use std::collections::HashMap;
28    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
29    /// use timely::dataflow::operators::generic::Operator;
30    /// use timely::dataflow::channels::pact::Pipeline;
31    ///
32    /// timely::example(|scope| {
33    ///     (0u64..10)
34    ///         .to_stream(scope)
35    ///         .container::<Vec<_>>()
36    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
37    ///             let mut cap = Some(default_cap.delayed(&12));
38    ///             let mut notificator = FrontierNotificator::default();
39    ///             let mut stash = HashMap::new();
40    ///             move |(input, frontier), output| {
41    ///                 if let Some(ref c) = cap.take() {
42    ///                     output.session(&c).give(12);
43    ///                 }
44    ///                 input.for_each_time(|time, data| {
45    ///                     stash.entry(time.time().clone())
46    ///                          .or_insert(Vec::new())
47    ///                          .extend(data.flat_map(|d| d.drain(..)));
48    ///                 });
49    ///                 notificator.for_each(&[frontier], |time, _not| {
50    ///                     if let Some(mut vec) = stash.remove(time.time()) {
51    ///                         output.session(&time).give_iterator(vec.drain(..));
52    ///                     }
53    ///                 });
54    ///             }
55    ///         })
56    ///         .container::<Vec<_>>();
57    /// });
58    /// ```
59    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
60    where
61        CB: ContainerBuilder,
62        B: FnOnce(Capability<T>, OperatorInfo) -> L,
63        L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>),
64                 &mut OutputBuilderSession<'_, T, CB>)+'static,
65        P: ParallelizationContract<T, C1>;
66
67    /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
68    /// and repeatedly invokes the closure supplied as `logic`, which can read from the input stream, write to
69    /// the output stream, and inspect the frontier at the input.
70    ///
71    /// # Examples
72    /// ```
73    /// use std::collections::HashMap;
74    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
75    /// use timely::dataflow::operators::generic::Operator;
76    /// use timely::dataflow::channels::pact::Pipeline;
77    ///
78    /// timely::example(|scope| {
79    ///     (0u64..10)
80    ///         .to_stream(scope)
81    ///         .container::<Vec<_>>()
82    ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
83    ///             input.for_each_time(|time, data| {
84    ///                 output.session(&time).give_containers(data);
85    ///                 notificator.notify_at(time.retain(output.output_index()));
86    ///             });
87    ///             notificator.for_each(|time, _cnt, _not| {
88    ///                 println!("notified at {:?}", time);
89    ///             });
90    ///         });
91    /// });
92    /// ```
93    fn unary_notify<CB: ContainerBuilder,
94            L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
95                     &mut OutputBuilderSession<'_, T, CB>,
96                     &mut Notificator<T>)+'static,
97             P: ParallelizationContract<T, C1>>
98             (self, pact: P, name: &str, init: impl IntoIterator<Item=T>, logic: L) -> Stream<'scope, T, CB::Container>;
99
100    /// Creates a new dataflow operator that partitions its input stream by a parallelization
101    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
102    /// `logic` can read from the input stream, and write to the output stream.
103    ///
104    /// # Examples
105    /// ```
106    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
107    /// use timely::dataflow::operators::generic::operator::Operator;
108    /// use timely::dataflow::channels::pact::Pipeline;
109    /// use timely::dataflow::Scope;
110    ///
111    /// timely::example(|scope| {
112    ///     (0u64..10)
113    ///         .to_stream(scope)
114    ///         .container::<Vec<_>>()
115    ///         .unary(Pipeline, "example", |default_cap, _info| {
116    ///             let mut cap = Some(default_cap.delayed(&12));
117    ///             move |input, output| {
118    ///                 if let Some(ref c) = cap.take() {
119    ///                     output.session(&c).give(100);
120    ///                 }
121    ///                 input.for_each_time(|time, data| {
122    ///                     output.session(&time).give_containers(data);
123    ///                 });
124    ///             }
125    ///         });
126    /// });
127    /// ```
128    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
129    where
130        CB: ContainerBuilder,
131        B: FnOnce(Capability<T>, OperatorInfo) -> L,
132        L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
133                 &mut OutputBuilderSession<T, CB>)+'static,
134        P: ParallelizationContract<T, C1>;
135
    /// Creates a new dataflow operator that partitions its input streams by the parallelization
    /// strategies `pact1` and `pact2`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
139    ///
140    /// # Examples
141    /// ```
142    /// use std::collections::HashMap;
143    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
144    /// use timely::dataflow::operators::generic::operator::Operator;
145    /// use timely::dataflow::channels::pact::Pipeline;
146    ///
147    /// timely::execute(timely::Config::thread(), |worker| {
148    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
149    ///        let (in1_handle, in1) = scope.new_input::<Vec<_>>();
150    ///        let (in2_handle, in2) = scope.new_input::<Vec<_>>();
151    ///        in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
152    ///            let mut notificator = FrontierNotificator::default();
153    ///            let mut stash = HashMap::new();
154    ///            move |(input1, frontier1), (input2, frontier2), output| {
155    ///                input1.for_each_time(|time, data| {
156    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
157    ///                    notificator.notify_at(time.retain(output.output_index()));
158    ///                });
159    ///                input2.for_each_time(|time, data| {
160    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
161    ///                    notificator.notify_at(time.retain(output.output_index()));
162    ///                });
163    ///                notificator.for_each(&[frontier1, frontier2], |time, _not| {
164    ///                    if let Some(mut vec) = stash.remove(time.time()) {
165    ///                        output.session(&time).give_iterator(vec.drain(..));
166    ///                    }
167    ///                });
168    ///            }
169    ///        })
170    ///        .container::<Vec<_>>()
171    ///        .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
172    ///
173    ///        (in1_handle, in2_handle)
174    ///    });
175    ///
176    ///    for i in 1..10 {
177    ///        in1.send(i - 1);
178    ///        in1.advance_to(i);
179    ///        in2.send(i - 1);
180    ///        in2.advance_to(i);
181    ///    }
182    /// }).unwrap();
183    /// ```
184    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
185    where
186        C2: Container,
187        CB: ContainerBuilder,
188        B: FnOnce(Capability<T>, OperatorInfo) -> L,
189        L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>),
190                 (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>),
191                 &mut OutputBuilderSession<'_, T, CB>)+'static,
192        P1: ParallelizationContract<T, C1>,
193        P2: ParallelizationContract<T, C2>;
194
    /// Creates a new dataflow operator that partitions its input streams by the parallelization strategies
    /// `pact1` and `pact2`, and repeatedly invokes the closure supplied as `logic`, which can read from the
    /// input streams, write to the output stream, and inspect the frontier at the inputs.
198    ///
199    /// # Examples
200    /// ```
201    /// use std::collections::HashMap;
202    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
203    /// use timely::dataflow::operators::generic::operator::Operator;
204    /// use timely::dataflow::channels::pact::Pipeline;
205    ///
206    /// timely::execute(timely::Config::thread(), |worker| {
207    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
208    ///        let (in1_handle, in1) = scope.new_input::<Vec<_>>();
209    ///        let (in2_handle, in2) = scope.new_input::<Vec<_>>();
210    ///
211    ///        in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
212    ///            input1.for_each_time(|time, data| {
213    ///                output.session(&time).give_containers(data);
214    ///                notificator.notify_at(time.retain(output.output_index()));
215    ///            });
216    ///            input2.for_each_time(|time, data| {
217    ///                output.session(&time).give_containers(data);
218    ///                notificator.notify_at(time.retain(output.output_index()));
219    ///            });
220    ///            notificator.for_each(|time, _cnt, _not| {
221    ///                println!("notified at {:?}", time);
222    ///            });
223    ///        });
224    ///
225    ///        (in1_handle, in2_handle)
226    ///    });
227    ///
228    ///    for i in 1..10 {
229    ///        in1.send(i - 1);
230    ///        in1.advance_to(i);
231    ///        in2.send(i - 1);
232    ///        in2.advance_to(i);
233    ///    }
234    /// }).unwrap();
235    /// ```
236    fn binary_notify<C2: Container,
237              CB: ContainerBuilder,
238              L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
239                       &mut InputHandleCore<T, C2, P2::Puller>,
240                       &mut OutputBuilderSession<'_, T, CB>,
241                       &mut Notificator<T>)+'static,
242              P1: ParallelizationContract<T, C1>,
243              P2: ParallelizationContract<T, C2>>
244            (self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=T>, logic: L) -> Stream<'scope, T, CB::Container>;
245
    /// Creates a new dataflow operator that partitions its input streams by the parallelization
    /// strategies `pact1` and `pact2`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
    /// `logic` can read from the input streams, and write to the output stream; unlike `binary_frontier`, it is not handed the input frontiers.
249    ///
250    /// # Examples
251    /// ```
252    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
253    /// use timely::dataflow::operators::generic::operator::Operator;
254    /// use timely::dataflow::channels::pact::Pipeline;
255    /// use timely::dataflow::Scope;
256    ///
257    /// timely::example(|scope| {
258    ///     let stream1 = (0u64..10).to_stream(scope).container::<Vec<_>>();
259    ///     let stream2 = (0u64..10).to_stream(scope).container::<Vec<_>>();
260    ///     stream1
261    ///         .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
262    ///             let mut cap = Some(default_cap.delayed(&12));
263    ///             move |input1, input2, output| {
264    ///                 if let Some(ref c) = cap.take() {
265    ///                     output.session(&c).give(100);
266    ///                 }
267    ///                 input1.for_each_time(|time, data| output.session(&time).give_containers(data));
268    ///                 input2.for_each_time(|time, data| output.session(&time).give_containers(data));
269    ///             }
270    ///         }).inspect(|x| println!("{:?}", x));
271    /// });
272    /// ```
273    fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
274    where
275        C2: Container,
276        CB: ContainerBuilder,
277        B: FnOnce(Capability<T>, OperatorInfo) -> L,
278        L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
279                 &mut InputHandleCore<T, C2, P2::Puller>,
280                 &mut OutputBuilderSession<'_, T, CB>)+'static,
281        P1: ParallelizationContract<T, C1>,
282        P2: ParallelizationContract<T, C2>;
283
284    /// Creates a new dataflow operator that partitions its input stream by a parallelization
285    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
286    /// and inspect the frontier at the input.
287    ///
288    /// # Examples
289    /// ```
290    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
291    /// use timely::dataflow::operators::generic::operator::Operator;
292    /// use timely::dataflow::channels::pact::Pipeline;
293    /// use timely::dataflow::Scope;
294    ///
295    /// timely::example(|scope| {
296    ///     (0u64..10)
297    ///         .to_stream(scope)
298    ///         .container::<Vec<_>>()
299    ///         .sink(Pipeline, "example", |(input, frontier)| {
300    ///             input.for_each_time(|time, data| {
301    ///                 for datum in data.flatten() {
302    ///                     println!("{:?}:\t{:?}", time, datum);
303    ///                 }
304    ///             });
305    ///         });
306    /// });
307    /// ```
308    fn sink<L, P>(self, pact: P, name: &str, logic: L)
309    where
310        L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>))+'static,
311        P: ParallelizationContract<T, C1>;
312}
313
314impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'scope, T, C1> {
315
316    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
317    where
318        CB: ContainerBuilder,
319        B: FnOnce(Capability<T>, OperatorInfo) -> L,
320        L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>),
321                 &mut OutputBuilderSession<'_, T, CB>)+'static,
322        P: ParallelizationContract<T, C1> {
323
324        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
325        let operator_info = builder.operator_info();
326
327        let mut input = builder.new_input(self, pact);
328        let (output, stream) = builder.new_output();
329        let mut output = OutputBuilder::from(output);
330
331        builder.build(move |mut capabilities| {
332            // `capabilities` should be a single-element vector.
333            let capability = capabilities.pop().unwrap();
334            let mut logic = constructor(capability, operator_info);
335            move |frontiers| {
336                let mut output_handle = output.activate();
337                logic((&mut input, &frontiers[0]), &mut output_handle);
338            }
339        });
340
341        stream
342    }
343
344    fn unary_notify<CB: ContainerBuilder,
345            L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
346                     &mut OutputBuilderSession<'_, T, CB>,
347                     &mut Notificator<T>)+'static,
348             P: ParallelizationContract<T, C1>>
349             (self, pact: P, name: &str, init: impl IntoIterator<Item=T>, mut logic: L) -> Stream<'scope, T, CB::Container> {
350
351        self.unary_frontier(pact, name, move |capability, _info| {
352            let mut notificator = FrontierNotificator::default();
353            for time in init {
354                notificator.notify_at(capability.delayed(&time));
355            }
356
357            move |(input, frontier), output| {
358                let frontiers = &[frontier];
359                let notificator = &mut Notificator::new(frontiers, &mut notificator);
360                logic(input, output, notificator);
361            }
362        })
363    }
364
365    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
366    where
367        CB: ContainerBuilder,
368        B: FnOnce(Capability<T>, OperatorInfo) -> L,
369        L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
370                 &mut OutputBuilderSession<'_, T, CB>)+'static,
371        P: ParallelizationContract<T, C1> {
372
373        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
374        let operator_info = builder.operator_info();
375
376        let mut input = builder.new_input(self, pact);
377        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
378        let (output, stream) = builder.new_output();
379        let mut output = OutputBuilder::from(output);
380
381        builder.build(move |mut capabilities| {
382            // `capabilities` should be a single-element vector.
383            let capability = capabilities.pop().unwrap();
384            let mut logic = constructor(capability, operator_info);
385            move |_frontiers| logic(&mut input, &mut output.activate())
386        });
387
388        stream
389    }
390
391    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
392    where
393        C2: Container,
394        CB: ContainerBuilder,
395        B: FnOnce(Capability<T>, OperatorInfo) -> L,
396        L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>),
397                 (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>),
398                 &mut OutputBuilderSession<'_, T, CB>)+'static,
399        P1: ParallelizationContract<T, C1>,
400        P2: ParallelizationContract<T, C2> {
401
402        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
403        let operator_info = builder.operator_info();
404
405        let mut input1 = builder.new_input(self, pact1);
406        let mut input2 = builder.new_input(other, pact2);
407        let (output, stream) = builder.new_output();
408        let mut output = OutputBuilder::from(output);
409
410        builder.build(move |mut capabilities| {
411            // `capabilities` should be a single-element vector.
412            let capability = capabilities.pop().unwrap();
413            let mut logic = constructor(capability, operator_info);
414            move |frontiers| {
415                let mut output_handle = output.activate();
416                logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle);
417            }
418        });
419
420        stream
421    }
422
423    fn binary_notify<C2: Container,
424              CB: ContainerBuilder,
425              L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
426                       &mut InputHandleCore<T, C2, P2::Puller>,
427                       &mut OutputBuilderSession<'_, T, CB>,
428                       &mut Notificator<T>)+'static,
429              P1: ParallelizationContract<T, C1>,
430              P2: ParallelizationContract<T, C2>>
431            (self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=T>, mut logic: L) -> Stream<'scope, T, CB::Container> {
432
433        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
434            let mut notificator = FrontierNotificator::default();
435            for time in init {
436                notificator.notify_at(capability.delayed(&time));
437            }
438
439            move |(input1, frontier1), (input2, frontier2), output| {
440                let frontiers = &[frontier1, frontier2];
441                let notificator = &mut Notificator::new(frontiers, &mut notificator);
442                logic(input1, input2, output, notificator);
443            }
444        })
445
446    }
447
448
449    fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
450    where
451        C2: Container,
452        CB: ContainerBuilder,
453        B: FnOnce(Capability<T>, OperatorInfo) -> L,
454        L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
455                 &mut InputHandleCore<T, C2, P2::Puller>,
456                 &mut OutputBuilderSession<'_, T, CB>)+'static,
457        P1: ParallelizationContract<T, C1>,
458        P2: ParallelizationContract<T, C2> {
459
460        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
461        let operator_info = builder.operator_info();
462
463        let mut input1 = builder.new_input(self, pact1);
464        let mut input2 = builder.new_input(other, pact2);
465        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
466        builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never);
467        let (output, stream) = builder.new_output();
468        let mut output = OutputBuilder::from(output);
469
470        builder.build(move |mut capabilities| {
471            // `capabilities` should be a single-element vector.
472            let capability = capabilities.pop().unwrap();
473            let mut logic = constructor(capability, operator_info);
474            move |_frontiers| {
475                let mut output_handle = output.activate();
476                logic(&mut input1, &mut input2, &mut output_handle);
477            }
478        });
479
480        stream
481    }
482
483    fn sink<L, P>(self, pact: P, name: &str, mut logic: L)
484    where
485        L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>))+'static,
486        P: ParallelizationContract<T, C1> {
487
488        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
489        let mut input = builder.new_input(self, pact);
490
491        builder.build(|_capabilities| {
492            move |frontiers| {
493                logic((&mut input, &frontiers[0]));
494            }
495        });
496    }
497}
498
499/// Creates a new data stream source for a scope.
500///
/// The source is defined by a name, and a constructor which takes a default capability (and the
/// operator's `OperatorInfo`) and returns a method that can be repeatedly called on an output handle.
/// The method is then repeatedly invoked, and is expected to eventually send data and downgrade and
/// release capabilities.
504///
505/// # Examples
506/// ```
507/// use timely::dataflow::operators::Inspect;
508/// use timely::dataflow::operators::generic::operator::source;
509/// use timely::dataflow::Scope;
510///
511/// timely::example(|scope| {
512///
513///     source(scope, "Source", |capability, info| {
514///
515///         let activator = scope.activator_for(info.address);
516///
517///         let mut cap = Some(capability);
518///         move |output| {
519///
520///             let mut done = false;
521///             if let Some(cap) = cap.as_mut() {
522///                 // get some data and send it.
523///                 let time = cap.time().clone();
524///                 output.session(&cap)
525///                       .give(*cap.time());
526///
527///                 // downgrade capability.
528///                 cap.downgrade(&(time + 1));
529///                 done = time > 20;
530///             }
531///
532///             if done { cap = None; }
533///             else    { activator.activate(); }
534///         }
535///     })
536///     .container::<Vec<_>>()
537///     .inspect(|x| println!("number: {:?}", x));
538/// });
539/// ```
540pub fn source<'scope, T: Timestamp, CB, B, L>(scope: Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
541where
542    CB: ContainerBuilder,
543    B: FnOnce(Capability<T>, OperatorInfo) -> L,
544    L: FnMut(&mut OutputBuilderSession<'_, T, CB>)+'static {
545
546    let mut builder = OperatorBuilder::new(name.to_owned(), scope);
547    let operator_info = builder.operator_info();
548
549    let (output, stream) = builder.new_output();
550    let mut output = OutputBuilder::from(output);
551
552    builder.build(move |mut capabilities| {
553        // `capabilities` should be a single-element vector.
554        let capability = capabilities.pop().unwrap();
555        let mut logic = constructor(capability, operator_info);
556        move |_frontier| {
557            logic(&mut output.activate());
558        }
559    });
560
561    stream
562}
563
564/// Constructs an empty stream.
565///
/// This method is useful in patterns where an input is required, but there is no
/// meaningful data to provide. This replaces patterns like `stream.filter(|_| false)`
/// which are just silly.
569///
570/// # Examples
571/// ```
572/// use timely::dataflow::operators::Inspect;
573/// use timely::dataflow::operators::generic::operator::empty;
574/// use timely::dataflow::Scope;
575///
576/// timely::example(|scope| {
577///
578///
579///     empty::<_, Vec<_>>(scope)     // type required in this example
580///         .inspect(|()| panic!("never called"));
581///
582/// });
583/// ```
584pub fn empty<'scope, T: Timestamp, C: Container>(scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
585    source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
586        // drop capability, do nothing
587    })
588}