Skip to main content

timely/dataflow/operators/vec/
input.rs

1//! Create new `Streams` connected to external inputs.
2
3use crate::container::CapacityContainerBuilder;
4use crate::dataflow::{StreamVec, ScopeParent, Scope};
5use crate::dataflow::operators::core::{Input as InputCore};
6
7// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
8// TODO : more like a harness, with direct access to its inputs.
9
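// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
// NOTE(review): the `Input` methods in this file take `&mut self`, so the notes above
// NOTE(review): appear to predate the current signatures -- confirm and update or remove.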
13
14/// Create a new `StreamVec` and `Handle` through which to supply input.
15pub trait Input : Scope {
16    /// Create a new `StreamVec` and `Handle` through which to supply input.
17    ///
18    /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used
19    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
20    /// data into the timely dataflow computation.
21    ///
22    /// The `Handle` also provides a means to indicate
23    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
24    /// to issue progress notifications.
25    ///
26    /// # Examples
27    /// ```
28    /// use timely::*;
29    /// use timely::dataflow::operators::{Input, Inspect};
30    ///
31    /// // construct and execute a timely dataflow
32    /// timely::execute(Config::thread(), |worker| {
33    ///
34    ///     // add an input and base computation off of it
35    ///     let mut input = worker.dataflow(|scope| {
36    ///         let (input, stream) = scope.new_input::<Vec<_>>();
37    ///         stream.inspect(|x| println!("hello {:?}", x));
38    ///         input
39    ///     });
40    ///
41    ///     // introduce input, advance computation
42    ///     for round in 0..10 {
43    ///         input.send(round);
44    ///         input.advance_to(round + 1);
45    ///         worker.step();
46    ///     }
47    /// });
48    /// ```
49    fn new_input<D: Clone+'static>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, StreamVec<Self, D>);
50
51    /// Create a new stream from a supplied interactive handle.
52    ///
53    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
54    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
55    /// if it as attached to more than one stream.
56    ///
57    /// # Examples
58    /// ```
59    /// use timely::*;
60    /// use timely::dataflow::operators::{Input, Inspect};
61    /// use timely::dataflow::InputHandle;
62    ///
63    /// // construct and execute a timely dataflow
64    /// timely::execute(Config::thread(), |worker| {
65    ///
66    ///     // add an input and base computation off of it
67    ///     let mut input = InputHandle::new();
68    ///     worker.dataflow(|scope| {
69    ///         scope.input_from(&mut input)
70    ///              .container::<Vec<_>>()
71    ///              .inspect(|x| println!("hello {:?}", x));
72    ///     });
73    ///
74    ///     // introduce input, advance computation
75    ///     for round in 0..10 {
76    ///         input.send(round);
77    ///         input.advance_to(round + 1);
78    ///         worker.step();
79    ///     }
80    /// });
81    /// ```
82    fn input_from<D: Clone+'static>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> StreamVec<Self, D>;
83}
84
85use crate::order::TotalOrder;
86impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
87    fn new_input<D: Clone+'static>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, StreamVec<G, D>) {
88        InputCore::new_input(self)
89    }
90
91    fn input_from<D: Clone+'static>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> StreamVec<G, D> {
92        InputCore::input_from(self, handle)
93    }
94}
95
96/// A handle to an input `StreamVec`, used to introduce data to a timely dataflow computation.
97pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;