timely/dataflow/operators/vec/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use crate::container::CapacityContainerBuilder;
4use crate::dataflow::{StreamVec, ScopeParent, Scope};
5use crate::dataflow::operators::core::{Input as InputCore};
6
7// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
8// TODO : more like a harness, with direct access to its inputs.
9
10// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
11// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
12// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
13
14/// Create a new `StreamVec` and `Handle` through which to supply input.
15pub trait Input : Scope {
16 /// Create a new `StreamVec` and `Handle` through which to supply input.
17 ///
18 /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used
19 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
20 /// data into the timely dataflow computation.
21 ///
22 /// The `Handle` also provides a means to indicate
23 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
24 /// to issue progress notifications.
25 ///
26 /// # Examples
27 /// ```
28 /// use timely::*;
29 /// use timely::dataflow::operators::{Input, Inspect};
30 ///
31 /// // construct and execute a timely dataflow
32 /// timely::execute(Config::thread(), |worker| {
33 ///
34 /// // add an input and base computation off of it
35 /// let mut input = worker.dataflow(|scope| {
36 /// let (input, stream) = scope.new_input::<Vec<_>>();
37 /// stream.inspect(|x| println!("hello {:?}", x));
38 /// input
39 /// });
40 ///
41 /// // introduce input, advance computation
42 /// for round in 0..10 {
43 /// input.send(round);
44 /// input.advance_to(round + 1);
45 /// worker.step();
46 /// }
47 /// });
48 /// ```
49 fn new_input<D: Clone+'static>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, StreamVec<Self, D>);
50
51 /// Create a new stream from a supplied interactive handle.
52 ///
53 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
54 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
55 /// if it as attached to more than one stream.
56 ///
57 /// # Examples
58 /// ```
59 /// use timely::*;
60 /// use timely::dataflow::operators::{Input, Inspect};
61 /// use timely::dataflow::InputHandle;
62 ///
63 /// // construct and execute a timely dataflow
64 /// timely::execute(Config::thread(), |worker| {
65 ///
66 /// // add an input and base computation off of it
67 /// let mut input = InputHandle::new();
68 /// worker.dataflow(|scope| {
69 /// scope.input_from(&mut input)
70 /// .container::<Vec<_>>()
71 /// .inspect(|x| println!("hello {:?}", x));
72 /// });
73 ///
74 /// // introduce input, advance computation
75 /// for round in 0..10 {
76 /// input.send(round);
77 /// input.advance_to(round + 1);
78 /// worker.step();
79 /// }
80 /// });
81 /// ```
82 fn input_from<D: Clone+'static>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> StreamVec<Self, D>;
83}
84
85use crate::order::TotalOrder;
86impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
87 fn new_input<D: Clone+'static>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, StreamVec<G, D>) {
88 InputCore::new_input(self)
89 }
90
91 fn input_from<D: Clone+'static>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> StreamVec<G, D> {
92 InputCore::input_from(self, handle)
93 }
94}
95
96/// A handle to an input `StreamVec`, used to introduce data to a timely dataflow computation.
97pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;