timely/dataflow/operators/
input.rs

//! Create new `Stream`s connected to external inputs.
2
3use crate::Data;
4use crate::container::CapacityContainerBuilder;
5use crate::dataflow::{Stream, ScopeParent, Scope};
6use crate::dataflow::operators::core::{Input as InputCore};
7
// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
// TODO : more like a harness, with direct access to its inputs.
10
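// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
// NOTE(review) : The trait methods below are declared with `&mut self`, so the notes
// NOTE(review) : above look out of date; confirm before relying on them.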
14
15/// Create a new `Stream` and `Handle` through which to supply input.
16pub trait Input : Scope {
17    /// Create a new `Stream` and `Handle` through which to supply input.
18    ///
19    /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used
20    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
21    /// data into the timely dataflow computation.
22    ///
23    /// The `Handle` also provides a means to indicate
24    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
25    /// to issue progress notifications.
26    ///
27    /// # Examples
28    /// ```
29    /// use timely::*;
30    /// use timely::dataflow::operators::{Input, Inspect};
31    ///
32    /// // construct and execute a timely dataflow
33    /// timely::execute(Config::thread(), |worker| {
34    ///
35    ///     // add an input and base computation off of it
36    ///     let mut input = worker.dataflow(|scope| {
37    ///         let (input, stream) = scope.new_input();
38    ///         stream.inspect(|x| println!("hello {:?}", x));
39    ///         input
40    ///     });
41    ///
42    ///     // introduce input, advance computation
43    ///     for round in 0..10 {
44    ///         input.send(round);
45    ///         input.advance_to(round + 1);
46    ///         worker.step();
47    ///     }
48    /// });
49    /// ```
50    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>);
51
52    /// Create a new stream from a supplied interactive handle.
53    ///
54    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
55    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
56    /// if it as attached to more than one stream.
57    ///
58    /// # Examples
59    /// ```
60    /// use timely::*;
61    /// use timely::dataflow::operators::{Input, Inspect};
62    /// use timely::dataflow::operators::input::Handle;
63    ///
64    /// // construct and execute a timely dataflow
65    /// timely::execute(Config::thread(), |worker| {
66    ///
67    ///     // add an input and base computation off of it
68    ///     let mut input = Handle::new();
69    ///     worker.dataflow(|scope| {
70    ///         scope.input_from(&mut input)
71    ///              .inspect(|x| println!("hello {:?}", x));
72    ///     });
73    ///
74    ///     // introduce input, advance computation
75    ///     for round in 0..10 {
76    ///         input.send(round);
77    ///         input.advance_to(round + 1);
78    ///         worker.step();
79    ///     }
80    /// });
81    /// ```
82    fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> Stream<Self, D>;
83}
84
85use crate::order::TotalOrder;
86impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
87    fn new_input<D: Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, Stream<G, D>) {
88        InputCore::new_input(self)
89    }
90
91    fn input_from<D: Data>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> Stream<G, D> {
92        InputCore::input_from(self, handle)
93    }
94}
95
96/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
97pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;