timely/dataflow/operators/generic/operator.rs
1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::progress::frontier::MutableAntichain;
5use crate::progress::Timestamp;
6use crate::dataflow::channels::pact::ParallelizationContract;
7
8use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
9use crate::dataflow::operators::capability::Capability;
10
11use crate::dataflow::{Scope, Stream};
12
13use super::builder_rc::OperatorBuilder;
14use crate::dataflow::operators::generic::OperatorInfo;
15use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
16use crate::{Container, ContainerBuilder};
17use crate::container::CapacityContainerBuilder;
18
19/// Methods to construct generic streaming and blocking operators.
20pub trait Operator<'scope, T: Timestamp, C1> {
21 /// Creates a new dataflow operator that partitions its input stream by a parallelization
22 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
23 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
24 ///
25 /// # Examples
26 /// ```
27 /// use std::collections::HashMap;
28 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
29 /// use timely::dataflow::operators::generic::Operator;
30 /// use timely::dataflow::channels::pact::Pipeline;
31 ///
32 /// timely::example(|scope| {
33 /// (0u64..10)
34 /// .to_stream(scope)
35 /// .container::<Vec<_>>()
36 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
37 /// let mut cap = Some(default_cap.delayed(&12));
38 /// let mut notificator = FrontierNotificator::default();
39 /// let mut stash = HashMap::new();
40 /// move |(input, frontier), output| {
41 /// if let Some(ref c) = cap.take() {
42 /// output.session(&c).give(12);
43 /// }
44 /// input.for_each_time(|time, data| {
45 /// stash.entry(time.time().clone())
46 /// .or_insert(Vec::new())
47 /// .extend(data.flat_map(|d| d.drain(..)));
48 /// });
49 /// notificator.for_each(&[frontier], |time, _not| {
50 /// if let Some(mut vec) = stash.remove(time.time()) {
51 /// output.session(&time).give_iterator(vec.drain(..));
52 /// }
53 /// });
54 /// }
55 /// })
56 /// .container::<Vec<_>>();
57 /// });
58 /// ```
59 fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
60 where
61 CB: ContainerBuilder,
62 B: FnOnce(Capability<T>, OperatorInfo) -> L,
63 L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>),
64 &mut OutputBuilderSession<'_, T, CB>)+'static,
65 P: ParallelizationContract<T, C1>;
66
67 /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
68 /// and repeatedly invokes the closure supplied as `logic`, which can read from the input stream, write to
69 /// the output stream, and inspect the frontier at the input.
70 ///
71 /// # Examples
72 /// ```
73 /// use std::collections::HashMap;
74 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
75 /// use timely::dataflow::operators::generic::Operator;
76 /// use timely::dataflow::channels::pact::Pipeline;
77 ///
78 /// timely::example(|scope| {
79 /// (0u64..10)
80 /// .to_stream(scope)
81 /// .container::<Vec<_>>()
82 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
83 /// input.for_each_time(|time, data| {
84 /// output.session(&time).give_containers(data);
85 /// notificator.notify_at(time.retain(output.output_index()));
86 /// });
87 /// notificator.for_each(|time, _cnt, _not| {
88 /// println!("notified at {:?}", time);
89 /// });
90 /// });
91 /// });
92 /// ```
93 fn unary_notify<CB: ContainerBuilder,
94 L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
95 &mut OutputBuilderSession<'_, T, CB>,
96 &mut Notificator<T>)+'static,
97 P: ParallelizationContract<T, C1>>
98 (self, pact: P, name: &str, init: impl IntoIterator<Item=T>, logic: L) -> Stream<'scope, T, CB::Container>;
99
100 /// Creates a new dataflow operator that partitions its input stream by a parallelization
101 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
102 /// `logic` can read from the input stream, and write to the output stream.
103 ///
104 /// # Examples
105 /// ```
106 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
107 /// use timely::dataflow::operators::generic::operator::Operator;
108 /// use timely::dataflow::channels::pact::Pipeline;
109 /// use timely::dataflow::Scope;
110 ///
111 /// timely::example(|scope| {
112 /// (0u64..10)
113 /// .to_stream(scope)
114 /// .container::<Vec<_>>()
115 /// .unary(Pipeline, "example", |default_cap, _info| {
116 /// let mut cap = Some(default_cap.delayed(&12));
117 /// move |input, output| {
118 /// if let Some(ref c) = cap.take() {
119 /// output.session(&c).give(100);
120 /// }
121 /// input.for_each_time(|time, data| {
122 /// output.session(&time).give_containers(data);
123 /// });
124 /// }
125 /// });
126 /// });
127 /// ```
128 fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
129 where
130 CB: ContainerBuilder,
131 B: FnOnce(Capability<T>, OperatorInfo) -> L,
132 L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
133 &mut OutputBuilderSession<T, CB>)+'static,
134 P: ParallelizationContract<T, C1>;
135
136 /// Creates a new dataflow operator that partitions its input streams by a parallelization
137 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
138 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
139 ///
140 /// # Examples
141 /// ```
142 /// use std::collections::HashMap;
143 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
144 /// use timely::dataflow::operators::generic::operator::Operator;
145 /// use timely::dataflow::channels::pact::Pipeline;
146 ///
147 /// timely::execute(timely::Config::thread(), |worker| {
148 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
149 /// let (in1_handle, in1) = scope.new_input::<Vec<_>>();
150 /// let (in2_handle, in2) = scope.new_input::<Vec<_>>();
151 /// in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
152 /// let mut notificator = FrontierNotificator::default();
153 /// let mut stash = HashMap::new();
154 /// move |(input1, frontier1), (input2, frontier2), output| {
155 /// input1.for_each_time(|time, data| {
156 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
157 /// notificator.notify_at(time.retain(output.output_index()));
158 /// });
159 /// input2.for_each_time(|time, data| {
160 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
161 /// notificator.notify_at(time.retain(output.output_index()));
162 /// });
163 /// notificator.for_each(&[frontier1, frontier2], |time, _not| {
164 /// if let Some(mut vec) = stash.remove(time.time()) {
165 /// output.session(&time).give_iterator(vec.drain(..));
166 /// }
167 /// });
168 /// }
169 /// })
170 /// .container::<Vec<_>>()
171 /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
172 ///
173 /// (in1_handle, in2_handle)
174 /// });
175 ///
176 /// for i in 1..10 {
177 /// in1.send(i - 1);
178 /// in1.advance_to(i);
179 /// in2.send(i - 1);
180 /// in2.advance_to(i);
181 /// }
182 /// }).unwrap();
183 /// ```
184 fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
185 where
186 C2: Container,
187 CB: ContainerBuilder,
188 B: FnOnce(Capability<T>, OperatorInfo) -> L,
189 L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>),
190 (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>),
191 &mut OutputBuilderSession<'_, T, CB>)+'static,
192 P1: ParallelizationContract<T, C1>,
193 P2: ParallelizationContract<T, C2>;
194
195 /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
196 /// and repeatedly invokes the closure supplied as `logic`, which can read from the input streams, write to
197 /// the output stream, and inspect the frontier at the inputs.
198 ///
199 /// # Examples
200 /// ```
201 /// use std::collections::HashMap;
202 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
203 /// use timely::dataflow::operators::generic::operator::Operator;
204 /// use timely::dataflow::channels::pact::Pipeline;
205 ///
206 /// timely::execute(timely::Config::thread(), |worker| {
207 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
208 /// let (in1_handle, in1) = scope.new_input::<Vec<_>>();
209 /// let (in2_handle, in2) = scope.new_input::<Vec<_>>();
210 ///
211 /// in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
212 /// input1.for_each_time(|time, data| {
213 /// output.session(&time).give_containers(data);
214 /// notificator.notify_at(time.retain(output.output_index()));
215 /// });
216 /// input2.for_each_time(|time, data| {
217 /// output.session(&time).give_containers(data);
218 /// notificator.notify_at(time.retain(output.output_index()));
219 /// });
220 /// notificator.for_each(|time, _cnt, _not| {
221 /// println!("notified at {:?}", time);
222 /// });
223 /// });
224 ///
225 /// (in1_handle, in2_handle)
226 /// });
227 ///
228 /// for i in 1..10 {
229 /// in1.send(i - 1);
230 /// in1.advance_to(i);
231 /// in2.send(i - 1);
232 /// in2.advance_to(i);
233 /// }
234 /// }).unwrap();
235 /// ```
236 fn binary_notify<C2: Container,
237 CB: ContainerBuilder,
238 L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
239 &mut InputHandleCore<T, C2, P2::Puller>,
240 &mut OutputBuilderSession<'_, T, CB>,
241 &mut Notificator<T>)+'static,
242 P1: ParallelizationContract<T, C1>,
243 P2: ParallelizationContract<T, C2>>
244 (self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=T>, logic: L) -> Stream<'scope, T, CB::Container>;
245
246 /// Creates a new dataflow operator that partitions its input streams by a parallelization
247 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
248 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
249 ///
250 /// # Examples
251 /// ```
252 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
253 /// use timely::dataflow::operators::generic::operator::Operator;
254 /// use timely::dataflow::channels::pact::Pipeline;
255 /// use timely::dataflow::Scope;
256 ///
257 /// timely::example(|scope| {
258 /// let stream1 = (0u64..10).to_stream(scope).container::<Vec<_>>();
259 /// let stream2 = (0u64..10).to_stream(scope).container::<Vec<_>>();
260 /// stream1
261 /// .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
262 /// let mut cap = Some(default_cap.delayed(&12));
263 /// move |input1, input2, output| {
264 /// if let Some(ref c) = cap.take() {
265 /// output.session(&c).give(100);
266 /// }
267 /// input1.for_each_time(|time, data| output.session(&time).give_containers(data));
268 /// input2.for_each_time(|time, data| output.session(&time).give_containers(data));
269 /// }
270 /// }).inspect(|x| println!("{:?}", x));
271 /// });
272 /// ```
273 fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
274 where
275 C2: Container,
276 CB: ContainerBuilder,
277 B: FnOnce(Capability<T>, OperatorInfo) -> L,
278 L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
279 &mut InputHandleCore<T, C2, P2::Puller>,
280 &mut OutputBuilderSession<'_, T, CB>)+'static,
281 P1: ParallelizationContract<T, C1>,
282 P2: ParallelizationContract<T, C2>;
283
284 /// Creates a new dataflow operator that partitions its input stream by a parallelization
285 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
286 /// and inspect the frontier at the input.
287 ///
288 /// # Examples
289 /// ```
290 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
291 /// use timely::dataflow::operators::generic::operator::Operator;
292 /// use timely::dataflow::channels::pact::Pipeline;
293 /// use timely::dataflow::Scope;
294 ///
295 /// timely::example(|scope| {
296 /// (0u64..10)
297 /// .to_stream(scope)
298 /// .container::<Vec<_>>()
299 /// .sink(Pipeline, "example", |(input, frontier)| {
300 /// input.for_each_time(|time, data| {
301 /// for datum in data.flatten() {
302 /// println!("{:?}:\t{:?}", time, datum);
303 /// }
304 /// });
305 /// });
306 /// });
307 /// ```
308 fn sink<L, P>(self, pact: P, name: &str, logic: L)
309 where
310 L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>))+'static,
311 P: ParallelizationContract<T, C1>;
312}
313
314impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'scope, T, C1> {
315
316 fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
317 where
318 CB: ContainerBuilder,
319 B: FnOnce(Capability<T>, OperatorInfo) -> L,
320 L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>),
321 &mut OutputBuilderSession<'_, T, CB>)+'static,
322 P: ParallelizationContract<T, C1> {
323
324 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
325 let operator_info = builder.operator_info();
326
327 let mut input = builder.new_input(self, pact);
328 let (output, stream) = builder.new_output();
329 let mut output = OutputBuilder::from(output);
330
331 builder.build(move |mut capabilities| {
332 // `capabilities` should be a single-element vector.
333 let capability = capabilities.pop().unwrap();
334 let mut logic = constructor(capability, operator_info);
335 move |frontiers| {
336 let mut output_handle = output.activate();
337 logic((&mut input, &frontiers[0]), &mut output_handle);
338 }
339 });
340
341 stream
342 }
343
344 fn unary_notify<CB: ContainerBuilder,
345 L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
346 &mut OutputBuilderSession<'_, T, CB>,
347 &mut Notificator<T>)+'static,
348 P: ParallelizationContract<T, C1>>
349 (self, pact: P, name: &str, init: impl IntoIterator<Item=T>, mut logic: L) -> Stream<'scope, T, CB::Container> {
350
351 self.unary_frontier(pact, name, move |capability, _info| {
352 let mut notificator = FrontierNotificator::default();
353 for time in init {
354 notificator.notify_at(capability.delayed(&time));
355 }
356
357 move |(input, frontier), output| {
358 let frontiers = &[frontier];
359 let notificator = &mut Notificator::new(frontiers, &mut notificator);
360 logic(input, output, notificator);
361 }
362 })
363 }
364
365 fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
366 where
367 CB: ContainerBuilder,
368 B: FnOnce(Capability<T>, OperatorInfo) -> L,
369 L: FnMut(&mut InputHandleCore<T, C1, P::Puller>,
370 &mut OutputBuilderSession<'_, T, CB>)+'static,
371 P: ParallelizationContract<T, C1> {
372
373 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
374 let operator_info = builder.operator_info();
375
376 let mut input = builder.new_input(self, pact);
377 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
378 let (output, stream) = builder.new_output();
379 let mut output = OutputBuilder::from(output);
380
381 builder.build(move |mut capabilities| {
382 // `capabilities` should be a single-element vector.
383 let capability = capabilities.pop().unwrap();
384 let mut logic = constructor(capability, operator_info);
385 move |_frontiers| logic(&mut input, &mut output.activate())
386 });
387
388 stream
389 }
390
391 fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
392 where
393 C2: Container,
394 CB: ContainerBuilder,
395 B: FnOnce(Capability<T>, OperatorInfo) -> L,
396 L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>),
397 (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>),
398 &mut OutputBuilderSession<'_, T, CB>)+'static,
399 P1: ParallelizationContract<T, C1>,
400 P2: ParallelizationContract<T, C2> {
401
402 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
403 let operator_info = builder.operator_info();
404
405 let mut input1 = builder.new_input(self, pact1);
406 let mut input2 = builder.new_input(other, pact2);
407 let (output, stream) = builder.new_output();
408 let mut output = OutputBuilder::from(output);
409
410 builder.build(move |mut capabilities| {
411 // `capabilities` should be a single-element vector.
412 let capability = capabilities.pop().unwrap();
413 let mut logic = constructor(capability, operator_info);
414 move |frontiers| {
415 let mut output_handle = output.activate();
416 logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle);
417 }
418 });
419
420 stream
421 }
422
423 fn binary_notify<C2: Container,
424 CB: ContainerBuilder,
425 L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
426 &mut InputHandleCore<T, C2, P2::Puller>,
427 &mut OutputBuilderSession<'_, T, CB>,
428 &mut Notificator<T>)+'static,
429 P1: ParallelizationContract<T, C1>,
430 P2: ParallelizationContract<T, C2>>
431 (self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=T>, mut logic: L) -> Stream<'scope, T, CB::Container> {
432
433 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
434 let mut notificator = FrontierNotificator::default();
435 for time in init {
436 notificator.notify_at(capability.delayed(&time));
437 }
438
439 move |(input1, frontier1), (input2, frontier2), output| {
440 let frontiers = &[frontier1, frontier2];
441 let notificator = &mut Notificator::new(frontiers, &mut notificator);
442 logic(input1, input2, output, notificator);
443 }
444 })
445
446 }
447
448
449 fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
450 where
451 C2: Container,
452 CB: ContainerBuilder,
453 B: FnOnce(Capability<T>, OperatorInfo) -> L,
454 L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>,
455 &mut InputHandleCore<T, C2, P2::Puller>,
456 &mut OutputBuilderSession<'_, T, CB>)+'static,
457 P1: ParallelizationContract<T, C1>,
458 P2: ParallelizationContract<T, C2> {
459
460 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
461 let operator_info = builder.operator_info();
462
463 let mut input1 = builder.new_input(self, pact1);
464 let mut input2 = builder.new_input(other, pact2);
465 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
466 builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never);
467 let (output, stream) = builder.new_output();
468 let mut output = OutputBuilder::from(output);
469
470 builder.build(move |mut capabilities| {
471 // `capabilities` should be a single-element vector.
472 let capability = capabilities.pop().unwrap();
473 let mut logic = constructor(capability, operator_info);
474 move |_frontiers| {
475 let mut output_handle = output.activate();
476 logic(&mut input1, &mut input2, &mut output_handle);
477 }
478 });
479
480 stream
481 }
482
483 fn sink<L, P>(self, pact: P, name: &str, mut logic: L)
484 where
485 L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>))+'static,
486 P: ParallelizationContract<T, C1> {
487
488 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
489 let mut input = builder.new_input(self, pact);
490
491 builder.build(|_capabilities| {
492 move |frontiers| {
493 logic((&mut input, &frontiers[0]));
494 }
495 });
496 }
497}
498
499/// Creates a new data stream source for a scope.
500///
501/// The source is defined by a name, and a constructor which takes a default capability to
502/// a method that can be repeatedly called on a output handle. The method is then repeatedly
503/// invoked, and is expected to eventually send data and downgrade and release capabilities.
504///
505/// # Examples
506/// ```
507/// use timely::dataflow::operators::Inspect;
508/// use timely::dataflow::operators::generic::operator::source;
509/// use timely::dataflow::Scope;
510///
511/// timely::example(|scope| {
512///
513/// source(scope, "Source", |capability, info| {
514///
515/// let activator = scope.activator_for(info.address);
516///
517/// let mut cap = Some(capability);
518/// move |output| {
519///
520/// let mut done = false;
521/// if let Some(cap) = cap.as_mut() {
522/// // get some data and send it.
523/// let time = cap.time().clone();
524/// output.session(&cap)
525/// .give(*cap.time());
526///
527/// // downgrade capability.
528/// cap.downgrade(&(time + 1));
529/// done = time > 20;
530/// }
531///
532/// if done { cap = None; }
533/// else { activator.activate(); }
534/// }
535/// })
536/// .container::<Vec<_>>()
537/// .inspect(|x| println!("number: {:?}", x));
538/// });
539/// ```
540pub fn source<'scope, T: Timestamp, CB, B, L>(scope: Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container>
541where
542 CB: ContainerBuilder,
543 B: FnOnce(Capability<T>, OperatorInfo) -> L,
544 L: FnMut(&mut OutputBuilderSession<'_, T, CB>)+'static {
545
546 let mut builder = OperatorBuilder::new(name.to_owned(), scope);
547 let operator_info = builder.operator_info();
548
549 let (output, stream) = builder.new_output();
550 let mut output = OutputBuilder::from(output);
551
552 builder.build(move |mut capabilities| {
553 // `capabilities` should be a single-element vector.
554 let capability = capabilities.pop().unwrap();
555 let mut logic = constructor(capability, operator_info);
556 move |_frontier| {
557 logic(&mut output.activate());
558 }
559 });
560
561 stream
562}
563
564/// Constructs an empty stream.
565///
566/// This method is useful in patterns where an input is required, but there is no
567/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
568/// which are just silly.
569///
570/// # Examples
571/// ```
572/// use timely::dataflow::operators::Inspect;
573/// use timely::dataflow::operators::generic::operator::empty;
574/// use timely::dataflow::Scope;
575///
576/// timely::example(|scope| {
577///
578///
579/// empty::<_, Vec<_>>(scope) // type required in this example
580/// .inspect(|()| panic!("never called"));
581///
582/// });
583/// ```
584pub fn empty<'scope, T: Timestamp, C: Container>(scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
585 source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
586 // drop capability, do nothing
587 })
588}