timely/dataflow/operators/generic/operator.rs
1
//! Methods to construct generic streaming and blocking operators.
3
4use crate::progress::frontier::MutableAntichain;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputSession, OutputBuilderSession, OutputBuilder};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, ContainerBuilder};
16use crate::container::CapacityContainerBuilder;
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1> {
20 /// Creates a new dataflow operator that partitions its input stream by a parallelization
21 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23 ///
24 /// # Examples
25 /// ```
26 /// use std::collections::HashMap;
27 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28 /// use timely::dataflow::operators::generic::Operator;
29 /// use timely::dataflow::channels::pact::Pipeline;
30 ///
31 /// timely::example(|scope| {
32 /// (0u64..10).to_stream(scope)
33 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
34 /// let mut cap = Some(default_cap.delayed(&12));
35 /// let mut notificator = FrontierNotificator::default();
36 /// let mut stash = HashMap::new();
37 /// move |(input, frontier), output| {
38 /// if let Some(ref c) = cap.take() {
39 /// output.session(&c).give(12);
40 /// }
41 /// input.for_each_time(|time, data| {
42 /// stash.entry(time.time().clone())
43 /// .or_insert(Vec::new())
44 /// .extend(data.flat_map(|d| d.drain(..)));
45 /// });
46 /// notificator.for_each(&[frontier], |time, _not| {
47 /// if let Some(mut vec) = stash.remove(time.time()) {
48 /// output.session(&time).give_iterator(vec.drain(..));
49 /// }
50 /// });
51 /// }
52 /// })
53 /// .container::<Vec<_>>();
54 /// });
55 /// ```
56 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57 where
58 CB: ContainerBuilder,
59 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60 L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
61 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
62 P: ParallelizationContract<G::Timestamp, C1>;
63
64 /// Creates a new dataflow operator that partitions its input stream by a parallelization
65 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67 ///
68 /// # Examples
69 /// ```
70 /// use std::collections::HashMap;
71 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72 /// use timely::dataflow::operators::generic::Operator;
73 /// use timely::dataflow::channels::pact::Pipeline;
74 ///
75 /// timely::example(|scope| {
76 /// (0u64..10)
77 /// .to_stream(scope)
78 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79 /// input.for_each_time(|time, data| {
80 /// output.session(&time).give_containers(data);
81 /// notificator.notify_at(time.retain());
82 /// });
83 /// notificator.for_each(|time, _cnt, _not| {
84 /// println!("notified at {:?}", time);
85 /// });
86 /// });
87 /// });
88 /// ```
89 fn unary_notify<CB: ContainerBuilder,
90 L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
91 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
92 &mut Notificator<G::Timestamp>)+'static,
93 P: ParallelizationContract<G::Timestamp, C1>>
94 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96 /// Creates a new dataflow operator that partitions its input stream by a parallelization
97 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98 /// `logic` can read from the input stream, and write to the output stream.
99 ///
100 /// # Examples
101 /// ```
102 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103 /// use timely::dataflow::operators::generic::operator::Operator;
104 /// use timely::dataflow::channels::pact::Pipeline;
105 /// use timely::dataflow::Scope;
106 ///
107 /// timely::example(|scope| {
108 /// (0u64..10).to_stream(scope)
109 /// .unary(Pipeline, "example", |default_cap, _info| {
110 /// let mut cap = Some(default_cap.delayed(&12));
111 /// move |input, output| {
112 /// if let Some(ref c) = cap.take() {
113 /// output.session(&c).give(100);
114 /// }
115 /// input.for_each_time(|time, data| {
116 /// output.session(&time).give_containers(data);
117 /// });
118 /// }
119 /// });
120 /// });
121 /// ```
122 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123 where
124 CB: ContainerBuilder,
125 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126 L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
127 &mut OutputBuilderSession<G::Timestamp, CB>)+'static,
128 P: ParallelizationContract<G::Timestamp, C1>;
129
130 /// Creates a new dataflow operator that partitions its input streams by a parallelization
131 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133 ///
134 /// # Examples
135 /// ```
136 /// use std::collections::HashMap;
137 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138 /// use timely::dataflow::operators::generic::operator::Operator;
139 /// use timely::dataflow::channels::pact::Pipeline;
140 ///
141 /// timely::execute(timely::Config::thread(), |worker| {
142 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143 /// let (in1_handle, in1) = scope.new_input();
144 /// let (in2_handle, in2) = scope.new_input();
145 /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146 /// let mut notificator = FrontierNotificator::default();
147 /// let mut stash = HashMap::new();
148 /// move |(input1, frontier1), (input2, frontier2), output| {
149 /// input1.for_each_time(|time, data| {
150 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
151 /// notificator.notify_at(time.retain());
152 /// });
153 /// input2.for_each_time(|time, data| {
154 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
155 /// notificator.notify_at(time.retain());
156 /// });
157 /// notificator.for_each(&[frontier1, frontier2], |time, _not| {
158 /// if let Some(mut vec) = stash.remove(time.time()) {
159 /// output.session(&time).give_iterator(vec.drain(..));
160 /// }
161 /// });
162 /// }
163 /// })
164 /// .container::<Vec<_>>()
165 /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166 ///
167 /// (in1_handle, in2_handle)
168 /// });
169 ///
170 /// for i in 1..10 {
171 /// in1.send(i - 1);
172 /// in1.advance_to(i);
173 /// in2.send(i - 1);
174 /// in2.advance_to(i);
175 /// }
176 /// }).unwrap();
177 /// ```
178 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179 where
180 C2: Container,
181 CB: ContainerBuilder,
182 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183 L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
184 (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
185 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
186 P1: ParallelizationContract<G::Timestamp, C1>,
187 P2: ParallelizationContract<G::Timestamp, C2>;
188
189 /// Creates a new dataflow operator that partitions its input streams by a parallelization
190 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192 ///
193 /// # Examples
194 /// ```
195 /// use std::collections::HashMap;
196 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197 /// use timely::dataflow::operators::generic::operator::Operator;
198 /// use timely::dataflow::channels::pact::Pipeline;
199 ///
200 /// timely::execute(timely::Config::thread(), |worker| {
201 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202 /// let (in1_handle, in1) = scope.new_input();
203 /// let (in2_handle, in2) = scope.new_input();
204 ///
205 /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206 /// input1.for_each_time(|time, data| {
207 /// output.session(&time).give_containers(data);
208 /// notificator.notify_at(time.retain());
209 /// });
210 /// input2.for_each_time(|time, data| {
211 /// output.session(&time).give_containers(data);
212 /// notificator.notify_at(time.retain());
213 /// });
214 /// notificator.for_each(|time, _cnt, _not| {
215 /// println!("notified at {:?}", time);
216 /// });
217 /// });
218 ///
219 /// (in1_handle, in2_handle)
220 /// });
221 ///
222 /// for i in 1..10 {
223 /// in1.send(i - 1);
224 /// in1.advance_to(i);
225 /// in2.send(i - 1);
226 /// in2.advance_to(i);
227 /// }
228 /// }).unwrap();
229 /// ```
230 fn binary_notify<C2: Container,
231 CB: ContainerBuilder,
232 L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
233 InputSession<'_, G::Timestamp, C2, P2::Puller>,
234 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
235 &mut Notificator<G::Timestamp>)+'static,
236 P1: ParallelizationContract<G::Timestamp, C1>,
237 P2: ParallelizationContract<G::Timestamp, C2>>
238 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240 /// Creates a new dataflow operator that partitions its input streams by a parallelization
241 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243 ///
244 /// # Examples
245 /// ```
246 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247 /// use timely::dataflow::operators::generic::operator::Operator;
248 /// use timely::dataflow::channels::pact::Pipeline;
249 /// use timely::dataflow::Scope;
250 ///
251 /// timely::example(|scope| {
252 /// let stream2 = (0u64..10).to_stream(scope);
253 /// (0u64..10).to_stream(scope)
254 /// .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255 /// let mut cap = Some(default_cap.delayed(&12));
256 /// move |input1, input2, output| {
257 /// if let Some(ref c) = cap.take() {
258 /// output.session(&c).give(100);
259 /// }
260 /// input1.for_each_time(|time, data| output.session(&time).give_containers(data));
261 /// input2.for_each_time(|time, data| output.session(&time).give_containers(data));
262 /// }
263 /// }).inspect(|x| println!("{:?}", x));
264 /// });
265 /// ```
266 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
267 where
268 C2: Container,
269 CB: ContainerBuilder,
270 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
271 L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
272 InputSession<'_, G::Timestamp, C2, P2::Puller>,
273 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
274 P1: ParallelizationContract<G::Timestamp, C1>,
275 P2: ParallelizationContract<G::Timestamp, C2>;
276
277 /// Creates a new dataflow operator that partitions its input stream by a parallelization
278 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
279 /// and inspect the frontier at the input.
280 ///
281 /// # Examples
282 /// ```
283 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
284 /// use timely::dataflow::operators::generic::operator::Operator;
285 /// use timely::dataflow::channels::pact::Pipeline;
286 /// use timely::dataflow::Scope;
287 ///
288 /// timely::example(|scope| {
289 /// (0u64..10)
290 /// .to_stream(scope)
291 /// .sink(Pipeline, "example", |(input, frontier)| {
292 /// input.for_each_time(|time, data| {
293 /// for datum in data.flatten() {
294 /// println!("{:?}:\t{:?}", time, datum);
295 /// }
296 /// });
297 /// });
298 /// });
299 /// ```
300 fn sink<L, P>(&self, pact: P, name: &str, logic: L)
301 where
302 L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
303 P: ParallelizationContract<G::Timestamp, C1>;
304}
305
306impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
307
308 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
309 where
310 CB: ContainerBuilder,
311 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
312 L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
313 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
314 P: ParallelizationContract<G::Timestamp, C1> {
315
316 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
317 let operator_info = builder.operator_info();
318
319 let mut input = builder.new_input(self, pact);
320 let (output, stream) = builder.new_output();
321 let mut output = OutputBuilder::from(output);
322
323 builder.build(move |mut capabilities| {
324 // `capabilities` should be a single-element vector.
325 let capability = capabilities.pop().unwrap();
326 let mut logic = constructor(capability, operator_info);
327 move |frontiers| {
328 let mut output_handle = output.activate();
329 logic((input.activate(), &frontiers[0]), &mut output_handle);
330 }
331 });
332
333 stream
334 }
335
336 fn unary_notify<CB: ContainerBuilder,
337 L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
338 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
339 &mut Notificator<G::Timestamp>)+'static,
340 P: ParallelizationContract<G::Timestamp, C1>>
341 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
342
343 self.unary_frontier(pact, name, move |capability, _info| {
344 let mut notificator = FrontierNotificator::default();
345 for time in init {
346 notificator.notify_at(capability.delayed(&time));
347 }
348
349 move |(input, frontier), output| {
350 let frontiers = &[frontier];
351 let notificator = &mut Notificator::new(frontiers, &mut notificator);
352 logic(input, output, notificator);
353 }
354 })
355 }
356
357 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
358 where
359 CB: ContainerBuilder,
360 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
361 L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>,
362 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
363 P: ParallelizationContract<G::Timestamp, C1> {
364
365 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
366 let operator_info = builder.operator_info();
367
368 let mut input = builder.new_input(self, pact);
369 let (output, stream) = builder.new_output();
370 let mut output = OutputBuilder::from(output);
371 builder.set_notify(false);
372
373 builder.build(move |mut capabilities| {
374 // `capabilities` should be a single-element vector.
375 let capability = capabilities.pop().unwrap();
376 let mut logic = constructor(capability, operator_info);
377 move |_frontiers| logic(input.activate(), &mut output.activate())
378 });
379
380 stream
381 }
382
383 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
384 where
385 C2: Container,
386 CB: ContainerBuilder,
387 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
388 L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
389 (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
390 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
391 P1: ParallelizationContract<G::Timestamp, C1>,
392 P2: ParallelizationContract<G::Timestamp, C2> {
393
394 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
395 let operator_info = builder.operator_info();
396
397 let mut input1 = builder.new_input(self, pact1);
398 let mut input2 = builder.new_input(other, pact2);
399 let (output, stream) = builder.new_output();
400 let mut output = OutputBuilder::from(output);
401
402 builder.build(move |mut capabilities| {
403 // `capabilities` should be a single-element vector.
404 let capability = capabilities.pop().unwrap();
405 let mut logic = constructor(capability, operator_info);
406 move |frontiers| {
407 let mut output_handle = output.activate();
408 logic((input1.activate(), &frontiers[0]), (input2.activate(), &frontiers[1]), &mut output_handle);
409 }
410 });
411
412 stream
413 }
414
415 fn binary_notify<C2: Container,
416 CB: ContainerBuilder,
417 L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
418 InputSession<'_, G::Timestamp, C2, P2::Puller>,
419 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
420 &mut Notificator<G::Timestamp>)+'static,
421 P1: ParallelizationContract<G::Timestamp, C1>,
422 P2: ParallelizationContract<G::Timestamp, C2>>
423 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
424
425 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
426 let mut notificator = FrontierNotificator::default();
427 for time in init {
428 notificator.notify_at(capability.delayed(&time));
429 }
430
431 move |(input1, frontier1), (input2, frontier2), output| {
432 let frontiers = &[frontier1, frontier2];
433 let notificator = &mut Notificator::new(frontiers, &mut notificator);
434 logic(input1, input2, output, notificator);
435 }
436 })
437
438 }
439
440
441 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
442 where
443 C2: Container,
444 CB: ContainerBuilder,
445 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
446 L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>,
447 InputSession<'_, G::Timestamp, C2, P2::Puller>,
448 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
449 P1: ParallelizationContract<G::Timestamp, C1>,
450 P2: ParallelizationContract<G::Timestamp, C2> {
451
452 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
453 let operator_info = builder.operator_info();
454
455 let mut input1 = builder.new_input(self, pact1);
456 let mut input2 = builder.new_input(other, pact2);
457 let (output, stream) = builder.new_output();
458 let mut output = OutputBuilder::from(output);
459 builder.set_notify(false);
460
461 builder.build(move |mut capabilities| {
462 // `capabilities` should be a single-element vector.
463 let capability = capabilities.pop().unwrap();
464 let mut logic = constructor(capability, operator_info);
465 move |_frontiers| {
466 let mut output_handle = output.activate();
467 logic(input1.activate(), input2.activate(), &mut output_handle);
468 }
469 });
470
471 stream
472 }
473
474 fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
475 where
476 L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
477 P: ParallelizationContract<G::Timestamp, C1> {
478
479 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
480 let mut input = builder.new_input(self, pact);
481
482 builder.build(|_capabilities| {
483 move |frontiers| {
484 logic((input.activate(), &frontiers[0]));
485 }
486 });
487 }
488}
489
490/// Creates a new data stream source for a scope.
491///
492/// The source is defined by a name, and a constructor which takes a default capability to
493/// a method that can be repeatedly called on a output handle. The method is then repeatedly
494/// invoked, and is expected to eventually send data and downgrade and release capabilities.
495///
496/// # Examples
497/// ```
498/// use timely::scheduling::Scheduler;
499/// use timely::dataflow::operators::Inspect;
500/// use timely::dataflow::operators::generic::operator::source;
501/// use timely::dataflow::Scope;
502///
503/// timely::example(|scope| {
504///
505/// source(scope, "Source", |capability, info| {
506///
507/// let activator = scope.activator_for(info.address);
508///
509/// let mut cap = Some(capability);
510/// move |output| {
511///
512/// let mut done = false;
513/// if let Some(cap) = cap.as_mut() {
514/// // get some data and send it.
515/// let time = cap.time().clone();
516/// output.session(&cap)
517/// .give(*cap.time());
518///
519/// // downgrade capability.
520/// cap.downgrade(&(time + 1));
521/// done = time > 20;
522/// }
523///
524/// if done { cap = None; }
525/// else { activator.activate(); }
526/// }
527/// })
528/// .container::<Vec<_>>()
529/// .inspect(|x| println!("number: {:?}", x));
530/// });
531/// ```
532pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
533where
534 CB: ContainerBuilder,
535 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
536 L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static {
537
538 let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
539 let operator_info = builder.operator_info();
540
541 let (output, stream) = builder.new_output();
542 let mut output = OutputBuilder::from(output);
543 builder.set_notify(false);
544
545 builder.build(move |mut capabilities| {
546 // `capabilities` should be a single-element vector.
547 let capability = capabilities.pop().unwrap();
548 let mut logic = constructor(capability, operator_info);
549 move |_frontier| {
550 logic(&mut output.activate());
551 }
552 });
553
554 stream
555}
556
557/// Constructs an empty stream.
558///
559/// This method is useful in patterns where an input is required, but there is no
560/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
561/// which are just silly.
562///
563/// # Examples
564/// ```
565/// use timely::dataflow::operators::Inspect;
566/// use timely::dataflow::operators::generic::operator::empty;
567/// use timely::dataflow::Scope;
568///
569/// timely::example(|scope| {
570///
571///
572/// empty::<_, Vec<_>>(scope) // type required in this example
573/// .inspect(|()| panic!("never called"));
574///
575/// });
576/// ```
577pub fn empty<G: Scope, C: Container>(scope: &G) -> StreamCore<G, C> {
578 source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
579 // drop capability, do nothing
580 })
581}