timely/dataflow/operators/generic/operator.rs
1
//! Methods to construct generic streaming and blocking operators: unary, binary, sinks, and sources.
3
4use crate::dataflow::channels::pushers::Tee;
5use crate::dataflow::channels::pact::ParallelizationContract;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::{Container, Data};
16use crate::container::{ContainerBuilder, CapacityContainerBuilder};
17
18/// Methods to construct generic streaming and blocking operators.
19pub trait Operator<G: Scope, C1: Container> {
20 /// Creates a new dataflow operator that partitions its input stream by a parallelization
21 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
22 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
23 ///
24 /// # Examples
25 /// ```
26 /// use std::collections::HashMap;
27 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
28 /// use timely::dataflow::operators::generic::Operator;
29 /// use timely::dataflow::channels::pact::Pipeline;
30 ///
31 /// timely::example(|scope| {
32 /// (0u64..10).to_stream(scope)
33 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
34 /// let mut cap = Some(default_cap.delayed(&12));
35 /// let mut notificator = FrontierNotificator::default();
36 /// let mut stash = HashMap::new();
37 /// move |input, output| {
38 /// if let Some(ref c) = cap.take() {
39 /// output.session(&c).give(12);
40 /// }
41 /// while let Some((time, data)) = input.next() {
42 /// stash.entry(time.time().clone())
43 /// .or_insert(Vec::new())
44 /// .extend(data.drain(..));
45 /// }
46 /// notificator.for_each(&[input.frontier()], |time, _not| {
47 /// if let Some(mut vec) = stash.remove(time.time()) {
48 /// output.session(&time).give_iterator(vec.drain(..));
49 /// }
50 /// });
51 /// }
52 /// })
53 /// .container::<Vec<_>>();
54 /// });
55 /// ```
56 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
57 where
58 CB: ContainerBuilder,
59 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
60 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
61 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
62 P: ParallelizationContract<G::Timestamp, C1>;
63
64 /// Creates a new dataflow operator that partitions its input stream by a parallelization
65 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
66 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
67 ///
68 /// # Examples
69 /// ```
70 /// use std::collections::HashMap;
71 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
72 /// use timely::dataflow::operators::generic::Operator;
73 /// use timely::dataflow::channels::pact::Pipeline;
74 ///
75 /// timely::example(|scope| {
76 /// (0u64..10)
77 /// .to_stream(scope)
78 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
79 /// input.for_each(|time, data| {
80 /// output.session(&time).give_container(data);
81 /// notificator.notify_at(time.retain());
82 /// });
83 /// notificator.for_each(|time, _cnt, _not| {
84 /// println!("notified at {:?}", time);
85 /// });
86 /// });
87 /// });
88 /// ```
89 fn unary_notify<CB: ContainerBuilder,
90 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
91 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
92 &mut Notificator<G::Timestamp>)+'static,
93 P: ParallelizationContract<G::Timestamp, C1>>
94 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
95
96 /// Creates a new dataflow operator that partitions its input stream by a parallelization
97 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
98 /// `logic` can read from the input stream, and write to the output stream.
99 ///
100 /// # Examples
101 /// ```
102 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
103 /// use timely::dataflow::operators::generic::operator::Operator;
104 /// use timely::dataflow::channels::pact::Pipeline;
105 /// use timely::dataflow::Scope;
106 ///
107 /// timely::example(|scope| {
108 /// (0u64..10).to_stream(scope)
109 /// .unary(Pipeline, "example", |default_cap, _info| {
110 /// let mut cap = Some(default_cap.delayed(&12));
111 /// move |input, output| {
112 /// if let Some(ref c) = cap.take() {
113 /// output.session(&c).give(100);
114 /// }
115 /// while let Some((time, data)) = input.next() {
116 /// output.session(&time).give_container(data);
117 /// }
118 /// }
119 /// });
120 /// });
121 /// ```
122 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
123 where
124 CB: ContainerBuilder,
125 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
126 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
127 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
128 P: ParallelizationContract<G::Timestamp, C1>;
129
130 /// Creates a new dataflow operator that partitions its input streams by a parallelization
131 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
132 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
133 ///
134 /// # Examples
135 /// ```
136 /// use std::collections::HashMap;
137 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
138 /// use timely::dataflow::operators::generic::operator::Operator;
139 /// use timely::dataflow::channels::pact::Pipeline;
140 ///
141 /// timely::execute(timely::Config::thread(), |worker| {
142 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
143 /// let (in1_handle, in1) = scope.new_input();
144 /// let (in2_handle, in2) = scope.new_input();
145 /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
146 /// let mut notificator = FrontierNotificator::default();
147 /// let mut stash = HashMap::new();
148 /// move |input1, input2, output| {
149 /// while let Some((time, data)) = input1.next() {
150 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
151 /// notificator.notify_at(time.retain());
152 /// }
153 /// while let Some((time, data)) = input2.next() {
154 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
155 /// notificator.notify_at(time.retain());
156 /// }
157 /// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
158 /// if let Some(mut vec) = stash.remove(time.time()) {
159 /// output.session(&time).give_iterator(vec.drain(..));
160 /// }
161 /// });
162 /// }
163 /// })
164 /// .container::<Vec<_>>()
165 /// .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
166 ///
167 /// (in1_handle, in2_handle)
168 /// });
169 ///
170 /// for i in 1..10 {
171 /// in1.send(i - 1);
172 /// in1.advance_to(i);
173 /// in2.send(i - 1);
174 /// in2.advance_to(i);
175 /// }
176 /// }).unwrap();
177 /// ```
178 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
179 where
180 C2: Container + Data,
181 CB: ContainerBuilder,
182 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
183 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
184 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
185 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
186 P1: ParallelizationContract<G::Timestamp, C1>,
187 P2: ParallelizationContract<G::Timestamp, C2>;
188
189 /// Creates a new dataflow operator that partitions its input streams by a parallelization
190 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
191 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
192 ///
193 /// # Examples
194 /// ```
195 /// use std::collections::HashMap;
196 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
197 /// use timely::dataflow::operators::generic::operator::Operator;
198 /// use timely::dataflow::channels::pact::Pipeline;
199 ///
200 /// timely::execute(timely::Config::thread(), |worker| {
201 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
202 /// let (in1_handle, in1) = scope.new_input();
203 /// let (in2_handle, in2) = scope.new_input();
204 ///
205 /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
206 /// input1.for_each(|time, data| {
207 /// output.session(&time).give_container(data);
208 /// notificator.notify_at(time.retain());
209 /// });
210 /// input2.for_each(|time, data| {
211 /// output.session(&time).give_container(data);
212 /// notificator.notify_at(time.retain());
213 /// });
214 /// notificator.for_each(|time, _cnt, _not| {
215 /// println!("notified at {:?}", time);
216 /// });
217 /// });
218 ///
219 /// (in1_handle, in2_handle)
220 /// });
221 ///
222 /// for i in 1..10 {
223 /// in1.send(i - 1);
224 /// in1.advance_to(i);
225 /// in2.send(i - 1);
226 /// in2.advance_to(i);
227 /// }
228 /// }).unwrap();
229 /// ```
230 fn binary_notify<C2: Container + Data,
231 CB: ContainerBuilder,
232 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
233 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
234 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
235 &mut Notificator<G::Timestamp>)+'static,
236 P1: ParallelizationContract<G::Timestamp, C1>,
237 P2: ParallelizationContract<G::Timestamp, C2>>
238 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
239
240 /// Creates a new dataflow operator that partitions its input streams by a parallelization
241 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
242 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
243 ///
244 /// # Examples
245 /// ```
246 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
247 /// use timely::dataflow::operators::generic::operator::Operator;
248 /// use timely::dataflow::channels::pact::Pipeline;
249 /// use timely::dataflow::Scope;
250 ///
251 /// timely::example(|scope| {
252 /// let stream2 = (0u64..10).to_stream(scope);
253 /// (0u64..10).to_stream(scope)
254 /// .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
255 /// let mut cap = Some(default_cap.delayed(&12));
256 /// move |input1, input2, output| {
257 /// if let Some(ref c) = cap.take() {
258 /// output.session(&c).give(100);
259 /// }
260 /// while let Some((time, data)) = input1.next() {
261 /// output.session(&time).give_container(data);
262 /// }
263 /// while let Some((time, data)) = input2.next() {
264 /// output.session(&time).give_container(data);
265 /// }
266 /// }
267 /// }).inspect(|x| println!("{:?}", x));
268 /// });
269 /// ```
270 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
271 where
272 C2: Container + Data,
273 CB: ContainerBuilder,
274 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
275 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
276 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
277 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
278 P1: ParallelizationContract<G::Timestamp, C1>,
279 P2: ParallelizationContract<G::Timestamp, C2>;
280
281 /// Creates a new dataflow operator that partitions its input stream by a parallelization
282 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
283 /// and inspect the frontier at the input.
284 ///
285 /// # Examples
286 /// ```
287 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
288 /// use timely::dataflow::operators::generic::operator::Operator;
289 /// use timely::dataflow::channels::pact::Pipeline;
290 /// use timely::dataflow::Scope;
291 ///
292 /// timely::example(|scope| {
293 /// (0u64..10)
294 /// .to_stream(scope)
295 /// .sink(Pipeline, "example", |input| {
296 /// while let Some((time, data)) = input.next() {
297 /// for datum in data.iter() {
298 /// println!("{:?}:\t{:?}", time, datum);
299 /// }
300 /// }
301 /// });
302 /// });
303 /// ```
304 fn sink<L, P>(&self, pact: P, name: &str, logic: L)
305 where
306 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
307 P: ParallelizationContract<G::Timestamp, C1>;
308}
309
310impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1> {
311
312 fn unary_frontier<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
313 where
314 CB: ContainerBuilder,
315 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
316 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>,
317 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
318 P: ParallelizationContract<G::Timestamp, C1> {
319
320 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
321 let operator_info = builder.operator_info();
322
323 let mut input = builder.new_input(self, pact);
324 let (mut output, stream) = builder.new_output();
325
326 builder.build(move |mut capabilities| {
327 // `capabilities` should be a single-element vector.
328 let capability = capabilities.pop().unwrap();
329 let mut logic = constructor(capability, operator_info);
330 move |frontiers| {
331 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
332 let mut output_handle = output.activate();
333 logic(&mut input_handle, &mut output_handle);
334 }
335 });
336
337 stream
338 }
339
340 fn unary_notify<CB: ContainerBuilder,
341 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
342 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
343 &mut Notificator<G::Timestamp>)+'static,
344 P: ParallelizationContract<G::Timestamp, C1>>
345 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
346
347 self.unary_frontier(pact, name, move |capability, _info| {
348 let mut notificator = FrontierNotificator::default();
349 for time in init {
350 notificator.notify_at(capability.delayed(&time));
351 }
352
353 move |input, output| {
354 let frontier = &[input.frontier()];
355 let notificator = &mut Notificator::new(frontier, &mut notificator);
356 logic(input.handle, output, notificator);
357 }
358 })
359 }
360
361 fn unary<CB, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
362 where
363 CB: ContainerBuilder,
364 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
365 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>,
366 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
367 P: ParallelizationContract<G::Timestamp, C1> {
368
369 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
370 let operator_info = builder.operator_info();
371
372 let mut input = builder.new_input(self, pact);
373 let (mut output, stream) = builder.new_output();
374 builder.set_notify(false);
375
376 builder.build(move |mut capabilities| {
377 // `capabilities` should be a single-element vector.
378 let capability = capabilities.pop().unwrap();
379 let mut logic = constructor(capability, operator_info);
380 move |_frontiers| {
381 let mut output_handle = output.activate();
382 logic(&mut input, &mut output_handle);
383 }
384 });
385
386 stream
387 }
388
389 fn binary_frontier<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
390 where
391 C2: Container + Data,
392 CB: ContainerBuilder,
393 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
394 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P1::Puller>,
395 &mut FrontieredInputHandleCore<G::Timestamp, C2, P2::Puller>,
396 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
397 P1: ParallelizationContract<G::Timestamp, C1>,
398 P2: ParallelizationContract<G::Timestamp, C2> {
399
400 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
401 let operator_info = builder.operator_info();
402
403 let mut input1 = builder.new_input(self, pact1);
404 let mut input2 = builder.new_input(other, pact2);
405 let (mut output, stream) = builder.new_output();
406
407 builder.build(move |mut capabilities| {
408 // `capabilities` should be a single-element vector.
409 let capability = capabilities.pop().unwrap();
410 let mut logic = constructor(capability, operator_info);
411 move |frontiers| {
412 let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
413 let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
414 let mut output_handle = output.activate();
415 logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
416 }
417 });
418
419 stream
420 }
421
422 fn binary_notify<C2: Container + Data,
423 CB: ContainerBuilder,
424 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
425 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
426 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
427 &mut Notificator<G::Timestamp>)+'static,
428 P1: ParallelizationContract<G::Timestamp, C1>,
429 P2: ParallelizationContract<G::Timestamp, C2>>
430 (&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, CB::Container> {
431
432 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
433 let mut notificator = FrontierNotificator::default();
434 for time in init {
435 notificator.notify_at(capability.delayed(&time));
436 }
437
438 move |input1, input2, output| {
439 let frontiers = &[input1.frontier(), input2.frontier()];
440 let notificator = &mut Notificator::new(frontiers, &mut notificator);
441 logic(input1.handle, input2.handle, output, notificator);
442 }
443 })
444
445 }
446
447
448 fn binary<C2, CB, B, L, P1, P2>(&self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
449 where
450 C2: Container + Data,
451 CB: ContainerBuilder,
452 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
453 L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>,
454 &mut InputHandleCore<G::Timestamp, C2, P2::Puller>,
455 &mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static,
456 P1: ParallelizationContract<G::Timestamp, C1>,
457 P2: ParallelizationContract<G::Timestamp, C2> {
458
459 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
460 let operator_info = builder.operator_info();
461
462 let mut input1 = builder.new_input(self, pact1);
463 let mut input2 = builder.new_input(other, pact2);
464 let (mut output, stream) = builder.new_output();
465 builder.set_notify(false);
466
467 builder.build(move |mut capabilities| {
468 // `capabilities` should be a single-element vector.
469 let capability = capabilities.pop().unwrap();
470 let mut logic = constructor(capability, operator_info);
471 move |_frontiers| {
472 let mut output_handle = output.activate();
473 logic(&mut input1, &mut input2, &mut output_handle);
474 }
475 });
476
477 stream
478 }
479
480 fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
481 where
482 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, C1, P::Puller>)+'static,
483 P: ParallelizationContract<G::Timestamp, C1> {
484
485 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
486 let mut input = builder.new_input(self, pact);
487
488 builder.build(|_capabilities| {
489 move |frontiers| {
490 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
491 logic(&mut input_handle);
492 }
493 });
494 }
495}
496
497/// Creates a new data stream source for a scope.
498///
499/// The source is defined by a name, and a constructor which takes a default capability to
500/// a method that can be repeatedly called on a output handle. The method is then repeatedly
501/// invoked, and is expected to eventually send data and downgrade and release capabilities.
502///
503/// # Examples
504/// ```
505/// use timely::scheduling::Scheduler;
506/// use timely::dataflow::operators::Inspect;
507/// use timely::dataflow::operators::generic::operator::source;
508/// use timely::dataflow::Scope;
509///
510/// timely::example(|scope| {
511///
512/// source(scope, "Source", |capability, info| {
513///
514/// let activator = scope.activator_for(info.address);
515///
516/// let mut cap = Some(capability);
517/// move |output| {
518///
519/// let mut done = false;
520/// if let Some(cap) = cap.as_mut() {
521/// // get some data and send it.
522/// let time = cap.time().clone();
523/// output.session(&cap)
524/// .give(*cap.time());
525///
526/// // downgrade capability.
527/// cap.downgrade(&(time + 1));
528/// done = time > 20;
529/// }
530///
531/// if done { cap = None; }
532/// else { activator.activate(); }
533/// }
534/// })
535/// .container::<Vec<_>>()
536/// .inspect(|x| println!("number: {:?}", x));
537/// });
538/// ```
539pub fn source<G: Scope, CB, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, CB::Container>
540where
541 CB: ContainerBuilder,
542 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
543 L: FnMut(&mut OutputHandleCore<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>)+'static {
544
545 let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
546 let operator_info = builder.operator_info();
547
548 let (mut output, stream) = builder.new_output();
549 builder.set_notify(false);
550
551 builder.build(move |mut capabilities| {
552 // `capabilities` should be a single-element vector.
553 let capability = capabilities.pop().unwrap();
554 let mut logic = constructor(capability, operator_info);
555 move |_frontier| {
556 logic(&mut output.activate());
557 }
558 });
559
560 stream
561}
562
563/// Constructs an empty stream.
564///
565/// This method is useful in patterns where an input is required, but there is no
566/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
567/// which are just silly.
568///
569/// # Examples
570/// ```
571/// use timely::dataflow::operators::Inspect;
572/// use timely::dataflow::operators::generic::operator::empty;
573/// use timely::dataflow::Scope;
574///
575/// timely::example(|scope| {
576///
577///
578/// empty::<_, Vec<_>>(scope) // type required in this example
579/// .inspect(|()| panic!("never called"));
580///
581/// });
582/// ```
583pub fn empty<G: Scope, C: Container + Data>(scope: &G) -> StreamCore<G, C> {
584 source::<_, CapacityContainerBuilder<C>, _, _>(scope, "Empty", |_capability, _info| |_output| {
585 // drop capability, do nothing
586 })
587}