timely/dataflow/operators/vec/unordered_input.rs
//! Create new `StreamVec`s connected to external inputs.
2
3use crate::container::CapacityContainerBuilder;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::{ActivateCapability};
6use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
7use crate::dataflow::{StreamVec, Scope};
8
9/// Create a new `StreamVec` and `Handle` through which to supply input.
10pub trait UnorderedInput<'scope, T: Timestamp> {
11 /// Create a new capability-based `StreamVec` and `Handle` through which to supply input. This
12 /// input supports multiple open epochs (timestamps) at the same time.
13 ///
14 /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `StreamVec` can be used
15 /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
16 /// data into the timely dataflow computation.
17 ///
18 /// The `Capability` returned is for the default value of the timestamp type in use. The
19 /// capability can be dropped to inform the system that the input has advanced beyond the
20 /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
21 /// should be obtained first, via the `delayed` function for `Capability`.
22 ///
23 /// To communicate the end-of-input drop all available capabilities.
24 ///
25 /// # Examples
26 ///
27 /// ```
28 /// use std::sync::{Arc, Mutex};
29 ///
30 /// use timely::*;
31 /// use timely::dataflow::operators::*;
32 /// use timely::dataflow::operators::vec::UnorderedInput;
33 /// use timely::dataflow::operators::capture::Extract;
34 ///
35 /// // get send and recv endpoints, wrap send to share
36 /// let (send, recv) = ::std::sync::mpsc::channel();
37 /// let send = Arc::new(Mutex::new(send));
38 ///
39 /// timely::execute(Config::thread(), move |worker| {
40 ///
41 /// // this is only to validate the output.
42 /// let send = send.lock().unwrap().clone();
43 ///
44 /// // create and capture the unordered input.
45 /// let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
46 /// let (input, stream) = scope.new_unordered_input();
47 /// stream.capture_into(send);
48 /// input
49 /// });
50 ///
51 /// // feed values 0..10 at times 0..10.
52 /// for round in 0..10 {
53 /// input.activate().session(&cap).give(round);
54 /// cap = cap.delayed(&(round + 1));
55 /// worker.step();
56 /// }
57 /// }).unwrap();
58 ///
59 /// let extract = recv.extract();
60 /// for i in 0..10 {
61 /// assert_eq!(extract[i], (i, vec![i]));
62 /// }
63 /// ```
64 fn new_unordered_input<D: 'static>(&self) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>);
65}
66
67
68impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
69 fn new_unordered_input<D: 'static>(&self) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>) {
70 UnorderedInputCore::new_unordered_input(self)
71 }
72}
73
74/// An unordered handle specialized to vectors.
75pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, CapacityContainerBuilder<Vec<D>>>;