Skip to main content

timely/dataflow/operators/vec/
input.rs

1//! Create new `Streams` connected to external inputs.
2
3use crate::container::CapacityContainerBuilder;
4use crate::progress::Timestamp;
5use crate::dataflow::{StreamVec, Scope};
6use crate::dataflow::operators::core::{Input as InputCore};
7
8// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
9// TODO : more like a harness, with direct access to its inputs.
10
11// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
12// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
13// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
14
15/// Create a new `StreamVec` and `Handle` through which to supply input.
16pub trait Input<'scope> {
17    /// The timestamp at which this input scope operates.
18    type Timestamp: Timestamp;
19
20    /// Create a new `StreamVec` and `Handle` through which to supply input.
21    ///
22    /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used
23    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
24    /// data into the timely dataflow computation.
25    ///
26    /// The `Handle` also provides a means to indicate
27    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
28    /// to issue progress notifications.
29    ///
30    /// # Examples
31    /// ```
32    /// use timely::*;
33    /// use timely::dataflow::operators::{Input, Inspect};
34    ///
35    /// // construct and execute a timely dataflow
36    /// timely::execute(Config::thread(), |worker| {
37    ///
38    ///     // add an input and base computation off of it
39    ///     let mut input = worker.dataflow(|scope| {
40    ///         let (input, stream) = scope.new_input::<Vec<_>>();
41    ///         stream.inspect(|x| println!("hello {:?}", x));
42    ///         input
43    ///     });
44    ///
45    ///     // introduce input, advance computation
46    ///     for round in 0..10 {
47    ///         input.send(round);
48    ///         input.advance_to(round + 1);
49    ///         worker.step();
50    ///     }
51    /// });
52    /// ```
53    fn new_input<D: Clone+'static>(&self) -> (Handle<Self::Timestamp, D>, StreamVec<'scope, Self::Timestamp, D>);
54
55    /// Create a new stream from a supplied interactive handle.
56    ///
57    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
58    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
59    /// if it as attached to more than one stream.
60    ///
61    /// # Examples
62    /// ```
63    /// use timely::*;
64    /// use timely::dataflow::operators::{Input, Inspect};
65    /// use timely::dataflow::InputHandle;
66    ///
67    /// // construct and execute a timely dataflow
68    /// timely::execute(Config::thread(), |worker| {
69    ///
70    ///     // add an input and base computation off of it
71    ///     let mut input = InputHandle::new();
72    ///     worker.dataflow(|scope| {
73    ///         scope.input_from(&mut input)
74    ///              .container::<Vec<_>>()
75    ///              .inspect(|x| println!("hello {:?}", x));
76    ///     });
77    ///
78    ///     // introduce input, advance computation
79    ///     for round in 0..10 {
80    ///         input.send(round);
81    ///         input.advance_to(round + 1);
82    ///         worker.step();
83    ///     }
84    /// });
85    /// ```
86    fn input_from<D: Clone+'static>(&self, handle: &mut Handle<Self::Timestamp, D>) -> StreamVec<'scope, Self::Timestamp, D>;
87}
88
89use crate::order::TotalOrder;
90impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
91    type Timestamp = T;
92    fn new_input<D: Clone+'static>(&self) -> (Handle<T, D>, StreamVec<'scope, T, D>) {
93        InputCore::new_input(self)
94    }
95
96    fn input_from<D: Clone+'static>(&self, handle: &mut Handle<T, D>) -> StreamVec<'scope, T, D> {
97        InputCore::input_from(self, handle)
98    }
99}
100
/// A handle to an input `StreamVec`, used to introduce data to a timely dataflow computation.
///
/// This is an alias for the core input `Handle` with its container builder fixed to
/// `CapacityContainerBuilder<Vec<D>>`. `T` is the timestamp type of the input and `D` is the
/// element type held in the `Vec` containers.
pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;