timely/dataflow/operators/generic/operator.rs
1
//! Methods to construct generic streaming and blocking operators (unary, binary, sink, and source).
3
4use crate::progress::frontier::MutableAntichain;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, Stream};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, ContainerBuilder};
16use crate::container::CapacityContainerBuilder;
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1> {
20 /// Creates a new dataflow operator that partitions its input stream by a parallelization
21 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23 ///
24 /// # Examples
25 /// ```
26 /// use std::collections::HashMap;
27 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28 /// use timely::dataflow::operators::generic::Operator;
29 /// use timely::dataflow::channels::pact::Pipeline;
30 ///
31 /// timely::example(|scope| {
32 /// (0u64..10)
33 /// .to_stream(scope)
34 /// .container::<Vec<_>>()
35 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
36 /// let mut cap = Some(default_cap.delayed(&12));
37 /// let mut notificator = FrontierNotificator::default();
38 /// let mut stash = HashMap::new();
39 /// move |(input, frontier), output| {
40 /// if let Some(ref c) = cap.take() {
41 /// output.session(&c).give(12);
42 /// }
43 /// input.for_each_time(|time, data| {
44 /// stash.entry(time.time().clone())
45 /// .or_insert(Vec::new())
46 /// .extend(data.flat_map(|d| d.drain(..)));
47 /// });
48 /// notificator.for_each(&[frontier], |time, _not| {
49 /// if let Some(mut vec) = stash.remove(time.time()) {
50 /// output.session(&time).give_iterator(vec.drain(..));
51 /// }
52 /// });
53 /// }
54 /// })
55 /// .container::<Vec<_>>();
56 /// });
57 /// ```
58 fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
59 where
60 CB: ContainerBuilder,
61 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
62 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
63 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
64 P: ParallelizationContract<G::Timestamp, C1>;
65
66 /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
67 /// and repeatedly invokes the closure supplied as `logic`, which can read from the input stream, write to
68 /// the output stream, and inspect the frontier at the input.
69 ///
70 /// # Examples
71 /// ```
72 /// use std::collections::HashMap;
73 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
74 /// use timely::dataflow::operators::generic::Operator;
75 /// use timely::dataflow::channels::pact::Pipeline;
76 ///
77 /// timely::example(|scope| {
78 /// (0u64..10)
79 /// .to_stream(scope)
80 /// .container::<Vec<_>>()
81 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
82 /// input.for_each_time(|time, data| {
83 /// output.session(&time).give_containers(data);
84 /// notificator.notify_at(time.retain(output.output_index()));
85 /// });
86 /// notificator.for_each(|time, _cnt, _not| {
87 /// println!("notified at {:?}", time);
88 /// });
89 /// });
90 /// });
91 /// ```
92 fn unary_notify<CB: ContainerBuilder,
93 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
94 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
95 &mut Notificator<G::Timestamp>)+'static,
96 P: ParallelizationContract<G::Timestamp, C1>>
97 (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;
98
99 /// Creates a new dataflow operator that partitions its input stream by a parallelization
100 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
101 /// `logic` can read from the input stream, and write to the output stream.
102 ///
103 /// # Examples
104 /// ```
105 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
106 /// use timely::dataflow::operators::generic::operator::Operator;
107 /// use timely::dataflow::channels::pact::Pipeline;
108 /// use timely::dataflow::Scope;
109 ///
110 /// timely::example(|scope| {
111 /// (0u64..10)
112 /// .to_stream(scope)
113 /// .container::<Vec<_>>()
114 /// .unary(Pipeline, "example", |default_cap, _info| {
115 /// let mut cap = Some(default_cap.delayed(&12));
116 /// move |input, output| {
117 /// if let Some(ref c) = cap.take() {
118 /// output.session(&c).give(100);
119 /// }
120 /// input.for_each_time(|time, data| {
121 /// output.session(&time).give_containers(data);
122 /// });
123 /// }
124 /// });
125 /// });
126 /// ```
127 fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
128 where
129 CB: ContainerBuilder,
130 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
131 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
132 &mut OutputBuilderSession<G::Timestamp, CB>)+'static,
133 P: ParallelizationContract<G::Timestamp, C1>;
134
135 /// Creates a new dataflow operator that partitions its input streams by a parallelization
136 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
137 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
138 ///
139 /// # Examples
140 /// ```
141 /// use std::collections::HashMap;
142 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
143 /// use timely::dataflow::operators::generic::operator::Operator;
144 /// use timely::dataflow::channels::pact::Pipeline;
145 ///
146 /// timely::execute(timely::Config::thread(), |worker| {
147 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
148 /// let (in1_handle, in1) = scope.new_input::<Vec<_>>();
149 /// let (in2_handle, in2) = scope.new_input::<Vec<_>>();
150 /// in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
151 /// let mut notificator = FrontierNotificator::default();
152 /// let mut stash = HashMap::new();
153 /// move |(input1, frontier1), (input2, frontier2), output| {
154 /// input1.for_each_time(|time, data| {
155 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
156 /// notificator.notify_at(time.retain(output.output_index()));
157 /// });
158 /// input2.for_each_time(|time, data| {
159 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
160 /// notificator.notify_at(time.retain(output.output_index()));
161 /// });
162 /// notificator.for_each(&[frontier1, frontier2], |time, _not| {
163 /// if let Some(mut vec) = stash.remove(time.time()) {
164 /// output.session(&time).give_iterator(vec.drain(..));
165 /// }
166 /// });
167 /// }
168 /// })
169 /// .container::<Vec<_>>()
170 /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
171 ///
172 /// (in1_handle, in2_handle)
173 /// });
174 ///
175 /// for i in 1..10 {
176 /// in1.send(i - 1);
177 /// in1.advance_to(i);
178 /// in2.send(i - 1);
179 /// in2.advance_to(i);
180 /// }
181 /// }).unwrap();
182 /// ```
183 fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
184 where
185 C2: Container,
186 CB: ContainerBuilder,
187 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
188 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
189 (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
190 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
191 P1: ParallelizationContract<G::Timestamp, C1>,
192 P2: ParallelizationContract<G::Timestamp, C2>;
193
194 /// Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`,
195 /// and repeatedly invokes the closure supplied as `logic`, which can read from the input streams, write to
196 /// the output stream, and inspect the frontier at the inputs.
197 ///
198 /// # Examples
199 /// ```
200 /// use std::collections::HashMap;
201 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
202 /// use timely::dataflow::operators::generic::operator::Operator;
203 /// use timely::dataflow::channels::pact::Pipeline;
204 ///
205 /// timely::execute(timely::Config::thread(), |worker| {
206 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
207 /// let (in1_handle, in1) = scope.new_input::<Vec<_>>();
208 /// let (in2_handle, in2) = scope.new_input::<Vec<_>>();
209 ///
210 /// in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
211 /// input1.for_each_time(|time, data| {
212 /// output.session(&time).give_containers(data);
213 /// notificator.notify_at(time.retain(output.output_index()));
214 /// });
215 /// input2.for_each_time(|time, data| {
216 /// output.session(&time).give_containers(data);
217 /// notificator.notify_at(time.retain(output.output_index()));
218 /// });
219 /// notificator.for_each(|time, _cnt, _not| {
220 /// println!("notified at {:?}", time);
221 /// });
222 /// });
223 ///
224 /// (in1_handle, in2_handle)
225 /// });
226 ///
227 /// for i in 1..10 {
228 /// in1.send(i - 1);
229 /// in1.advance_to(i);
230 /// in2.send(i - 1);
231 /// in2.advance_to(i);
232 /// }
233 /// }).unwrap();
234 /// ```
235 fn binary_notify<C2: Container,
236 CB: ContainerBuilder,
237 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
238 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
239 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
240 &mut Notificator<G::Timestamp>)+'static,
241 P1: ParallelizationContract<G::Timestamp, C1>,
242 P2: ParallelizationContract<G::Timestamp, C2>>
243 (self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;
244
245 /// Creates a new dataflow operator that partitions its input streams by a parallelization
246 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
247 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
248 ///
249 /// # Examples
250 /// ```
251 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
252 /// use timely::dataflow::operators::generic::operator::Operator;
253 /// use timely::dataflow::channels::pact::Pipeline;
254 /// use timely::dataflow::Scope;
255 ///
256 /// timely::example(|scope| {
257 /// let stream1 = (0u64..10).to_stream(scope).container::<Vec<_>>();
258 /// let stream2 = (0u64..10).to_stream(scope).container::<Vec<_>>();
259 /// stream1
260 /// .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
261 /// let mut cap = Some(default_cap.delayed(&12));
262 /// move |input1, input2, output| {
263 /// if let Some(ref c) = cap.take() {
264 /// output.session(&c).give(100);
265 /// }
266 /// input1.for_each_time(|time, data| output.session(&time).give_containers(data));
267 /// input2.for_each_time(|time, data| output.session(&time).give_containers(data));
268 /// }
269 /// }).inspect(|x| println!("{:?}", x));
270 /// });
271 /// ```
272 fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
273 where
274 C2: Container,
275 CB: ContainerBuilder,
276 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
277 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
278 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
279 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
280 P1: ParallelizationContract<G::Timestamp, C1>,
281 P2: ParallelizationContract<G::Timestamp, C2>;
282
283 /// Creates a new dataflow operator that partitions its input stream by a parallelization
284 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
285 /// and inspect the frontier at the input.
286 ///
287 /// # Examples
288 /// ```
289 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
290 /// use timely::dataflow::operators::generic::operator::Operator;
291 /// use timely::dataflow::channels::pact::Pipeline;
292 /// use timely::dataflow::Scope;
293 ///
294 /// timely::example(|scope| {
295 /// (0u64..10)
296 /// .to_stream(scope)
297 /// .container::<Vec<_>>()
298 /// .sink(Pipeline, "example", |(input, frontier)| {
299 /// input.for_each_time(|time, data| {
300 /// for datum in data.flatten() {
301 /// println!("{:?}:\t{:?}", time, datum);
302 /// }
303 /// });
304 /// });
305 /// });
306 /// ```
307 fn sink<L, P>(self, pact: P, name: &str, logic: L)
308 where
309 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
310 P: ParallelizationContract<G::Timestamp, C1>;
311}
312
313impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {
314
315 fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
316 where
317 CB: ContainerBuilder,
318 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
319 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
320 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
321 P: ParallelizationContract<G::Timestamp, C1> {
322
323 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
324 let operator_info = builder.operator_info();
325
326 let mut input = builder.new_input(self, pact);
327 let (output, stream) = builder.new_output();
328 let mut output = OutputBuilder::from(output);
329
330 builder.build(move |mut capabilities| {
331 // `capabilities` should be a single-element vector.
332 let capability = capabilities.pop().unwrap();
333 let mut logic = constructor(capability, operator_info);
334 move |frontiers| {
335 let mut output_handle = output.activate();
336 logic((&mut input, &frontiers[0]), &mut output_handle);
337 }
338 });
339
340 stream
341 }
342
343 fn unary_notify<CB: ContainerBuilder,
344 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
345 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
346 &mut Notificator<G::Timestamp>)+'static,
347 P: ParallelizationContract<G::Timestamp, C1>>
348 (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> Stream<G, CB::Container> {
349
350 self.unary_frontier(pact, name, move |capability, _info| {
351 let mut notificator = FrontierNotificator::default();
352 for time in init {
353 notificator.notify_at(capability.delayed(&time));
354 }
355
356 move |(input, frontier), output| {
357 let frontiers = &[frontier];
358 let notificator = &mut Notificator::new(frontiers, &mut notificator);
359 logic(input, output, notificator);
360 }
361 })
362 }
363
364 fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
365 where
366 CB: ContainerBuilder,
367 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
368 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
369 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
370 P: ParallelizationContract<G::Timestamp, C1> {
371
372 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
373 let operator_info = builder.operator_info();
374
375 let mut input = builder.new_input(self, pact);
376 let (output, stream) = builder.new_output();
377 let mut output = OutputBuilder::from(output);
378 builder.set_notify(false);
379
380 builder.build(move |mut capabilities| {
381 // `capabilities` should be a single-element vector.
382 let capability = capabilities.pop().unwrap();
383 let mut logic = constructor(capability, operator_info);
384 move |_frontiers| logic(&mut input, &mut output.activate())
385 });
386
387 stream
388 }
389
390 fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
391 where
392 C2: Container,
393 CB: ContainerBuilder,
394 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
395 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
396 (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
397 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
398 P1: ParallelizationContract<G::Timestamp, C1>,
399 P2: ParallelizationContract<G::Timestamp, C2> {
400
401 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
402 let operator_info = builder.operator_info();
403
404 let mut input1 = builder.new_input(self, pact1);
405 let mut input2 = builder.new_input(other, pact2);
406 let (output, stream) = builder.new_output();
407 let mut output = OutputBuilder::from(output);
408
409 builder.build(move |mut capabilities| {
410 // `capabilities` should be a single-element vector.
411 let capability = capabilities.pop().unwrap();
412 let mut logic = constructor(capability, operator_info);
413 move |frontiers| {
414 let mut output_handle = output.activate();
415 logic((&mut input1, &frontiers[0]), (&mut input2, &frontiers[1]), &mut output_handle);
416 }
417 });
418
419 stream
420 }
421
422 fn binary_notify<C2: Container,
423 CB: ContainerBuilder,
424 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
425 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
426 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
427 &mut Notificator<G::Timestamp>)+'static,
428 P1: ParallelizationContract<G::Timestamp, C1>,
429 P2: ParallelizationContract<G::Timestamp, C2>>
430 (self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> Stream<G, CB::Container> {
431
432 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
433 let mut notificator = FrontierNotificator::default();
434 for time in init {
435 notificator.notify_at(capability.delayed(&time));
436 }
437
438 move |(input1, frontier1), (input2, frontier2), output| {
439 let frontiers = &[frontier1, frontier2];
440 let notificator = &mut Notificator::new(frontiers, &mut notificator);
441 logic(input1, input2, output, notificator);
442 }
443 })
444
445 }
446
447
448 fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
449 where
450 C2: Container,
451 CB: ContainerBuilder,
452 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
453 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
454 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
455 &mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static,
456 P1: ParallelizationContract<G::Timestamp, C1>,
457 P2: ParallelizationContract<G::Timestamp, C2> {
458
459 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
460 let operator_info = builder.operator_info();
461
462 let mut input1 = builder.new_input(self, pact1);
463 let mut input2 = builder.new_input(other, pact2);
464 let (output, stream) = builder.new_output();
465 let mut output = OutputBuilder::from(output);
466 builder.set_notify(false);
467
468 builder.build(move |mut capabilities| {
469 // `capabilities` should be a single-element vector.
470 let capability = capabilities.pop().unwrap();
471 let mut logic = constructor(capability, operator_info);
472 move |_frontiers| {
473 let mut output_handle = output.activate();
474 logic(&mut input1, &mut input2, &mut output_handle);
475 }
476 });
477
478 stream
479 }
480
481 fn sink<L, P>(self, pact: P, name: &str, mut logic: L)
482 where
483 L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>))+'static,
484 P: ParallelizationContract<G::Timestamp, C1> {
485
486 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
487 let mut input = builder.new_input(self, pact);
488
489 builder.build(|_capabilities| {
490 move |frontiers| {
491 logic((&mut input, &frontiers[0]));
492 }
493 });
494 }
495}
496
497/// Creates a new data stream source for a scope.
498///
499/// The source is defined by a name, and a constructor which takes a default capability to
500/// a method that can be repeatedly called on a output handle. The method is then repeatedly
501/// invoked, and is expected to eventually send data and downgrade and release capabilities.
502///
503/// # Examples
504/// ```
505/// use timely::scheduling::Scheduler;
506/// use timely::dataflow::operators::Inspect;
507/// use timely::dataflow::operators::generic::operator::source;
508/// use timely::dataflow::Scope;
509///
510/// timely::example(|scope| {
511///
512/// source(scope, "Source", |capability, info| {
513///
514/// let activator = scope.activator_for(info.address);
515///
516/// let mut cap = Some(capability);
517/// move |output| {
518///
519/// let mut done = false;
520/// if let Some(cap) = cap.as_mut() {
521/// // get some data and send it.
522/// let time = cap.time().clone();
523/// output.session(&cap)
524/// .give(*cap.time());
525///
526/// // downgrade capability.
527/// cap.downgrade(&(time + 1));
528/// done = time > 20;
529/// }
530///
531/// if done { cap = None; }
532/// else { activator.activate(); }
533/// }
534/// })
535/// .container::<Vec<_>>()
536/// .inspect(|x| println!("number: {:?}", x));
537/// });
538/// ```
539pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> Stream<G, CB::Container>
540where
541 CB: ContainerBuilder,
542 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
543 L: FnMut(&mut OutputBuilderSession<'_, G::Timestamp, CB>)+'static {
544
545 let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
546 let operator_info = builder.operator_info();
547
548 let (output, stream) = builder.new_output();
549 let mut output = OutputBuilder::from(output);
550 builder.set_notify(false);
551
552 builder.build(move |mut capabilities| {
553 // `capabilities` should be a single-element vector.
554 let capability = capabilities.pop().unwrap();
555 let mut logic = constructor(capability, operator_info);
556 move |_frontier| {
557 logic(&mut output.activate());
558 }
559 });
560
561 stream
562}
563
564/// Constructs an empty stream.
565///
566/// This method is useful in patterns where an input is required, but there is no
567/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
568/// which are just silly.
569///
570/// # Examples
571/// ```
572/// use timely::dataflow::operators::Inspect;
573/// use timely::dataflow::operators::generic::operator::empty;
574/// use timely::dataflow::Scope;
575///
576/// timely::example(|scope| {
577///
578///
579/// empty::<_, Vec<_>>(scope) // type required in this example
580/// .inspect(|()| panic!("never called"));
581///
582/// });
583/// ```
584pub fn empty<G: Scope, C: Container>(scope: &G) -> Stream<G, C> {
585 source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
586 // drop capability, do nothing
587 })
588}