timely/dataflow/operators/unordered_input.rs
//! Create new `Stream`s connected to external inputs.
2
3use crate::Data;
4
5use crate::container::CapacityContainerBuilder;
6use crate::dataflow::operators::{ActivateCapability};
7use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
8use crate::dataflow::{Stream, Scope};
9
10/// Create a new `Stream` and `Handle` through which to supply input.
11pub trait UnorderedInput<G: Scope> {
12 /// Create a new capability-based `Stream` and `Handle` through which to supply input. This
13 /// input supports multiple open epochs (timestamps) at the same time.
14 ///
15 /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `Stream` can be used
16 /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
17 /// data into the timely dataflow computation.
18 ///
19 /// The `Capability` returned is for the default value of the timestamp type in use. The
20 /// capability can be dropped to inform the system that the input has advanced beyond the
21 /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
22 /// should be obtained first, via the `delayed` function for `Capability`.
23 ///
24 /// To communicate the end-of-input drop all available capabilities.
25 ///
26 /// # Examples
27 ///
28 /// ```
29 /// use std::sync::{Arc, Mutex};
30 ///
31 /// use timely::*;
32 /// use timely::dataflow::operators::*;
33 /// use timely::dataflow::operators::capture::Extract;
34 /// use timely::dataflow::Stream;
35 ///
36 /// // get send and recv endpoints, wrap send to share
37 /// let (send, recv) = ::std::sync::mpsc::channel();
38 /// let send = Arc::new(Mutex::new(send));
39 ///
40 /// timely::execute(Config::thread(), move |worker| {
41 ///
42 /// // this is only to validate the output.
43 /// let send = send.lock().unwrap().clone();
44 ///
45 /// // create and capture the unordered input.
46 /// let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
47 /// let (input, stream) = scope.new_unordered_input();
48 /// stream.capture_into(send);
49 /// input
50 /// });
51 ///
52 /// // feed values 0..10 at times 0..10.
53 /// for round in 0..10 {
54 /// input.session(cap.clone()).give(round);
55 /// cap = cap.delayed(&(round + 1));
56 /// worker.step();
57 /// }
58 /// }).unwrap();
59 ///
60 /// let extract = recv.extract();
61 /// for i in 0..10 {
62 /// assert_eq!(extract[i], (i, vec![i]));
63 /// }
64 /// ```
65 fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
66}
67
68
69impl<G: Scope> UnorderedInput<G> for G {
70 fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>) {
71 UnorderedInputCore::new_unordered_input(self)
72 }
73}
74
75/// An unordered handle specialized to vectors.
76pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, CapacityContainerBuilder<Vec<D>>>;