timely/dataflow/operators/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use crate::Data;
4use crate::container::CapacityContainerBuilder;
5use crate::dataflow::{Stream, ScopeParent, Scope};
6use crate::dataflow::operators::core::{Input as InputCore};
7
8// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
9// TODO : more like a harness, with direct access to its inputs.
10
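// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
// NOTE(review) : The `Input` methods below are declared with `&mut self`, so the notes above
// NOTE(review) : appear to be stale; confirm and update or remove them.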
14
15/// Create a new `Stream` and `Handle` through which to supply input.
16pub trait Input : Scope {
17 /// Create a new `Stream` and `Handle` through which to supply input.
18 ///
19 /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used
20 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
21 /// data into the timely dataflow computation.
22 ///
23 /// The `Handle` also provides a means to indicate
24 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
25 /// to issue progress notifications.
26 ///
27 /// # Examples
28 /// ```
29 /// use timely::*;
30 /// use timely::dataflow::operators::{Input, Inspect};
31 ///
32 /// // construct and execute a timely dataflow
33 /// timely::execute(Config::thread(), |worker| {
34 ///
35 /// // add an input and base computation off of it
36 /// let mut input = worker.dataflow(|scope| {
37 /// let (input, stream) = scope.new_input();
38 /// stream.inspect(|x| println!("hello {:?}", x));
39 /// input
40 /// });
41 ///
42 /// // introduce input, advance computation
43 /// for round in 0..10 {
44 /// input.send(round);
45 /// input.advance_to(round + 1);
46 /// worker.step();
47 /// }
48 /// });
49 /// ```
50 fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>);
51
52 /// Create a new stream from a supplied interactive handle.
53 ///
54 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
55 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
56 /// if it as attached to more than one stream.
57 ///
58 /// # Examples
59 /// ```
60 /// use timely::*;
61 /// use timely::dataflow::operators::{Input, Inspect};
62 /// use timely::dataflow::operators::input::Handle;
63 ///
64 /// // construct and execute a timely dataflow
65 /// timely::execute(Config::thread(), |worker| {
66 ///
67 /// // add an input and base computation off of it
68 /// let mut input = Handle::new();
69 /// worker.dataflow(|scope| {
70 /// scope.input_from(&mut input)
71 /// .inspect(|x| println!("hello {:?}", x));
72 /// });
73 ///
74 /// // introduce input, advance computation
75 /// for round in 0..10 {
76 /// input.send(round);
77 /// input.advance_to(round + 1);
78 /// worker.step();
79 /// }
80 /// });
81 /// ```
82 fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> Stream<Self, D>;
83}
84
85use crate::order::TotalOrder;
86impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
87 fn new_input<D: Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, Stream<G, D>) {
88 InputCore::new_input(self)
89 }
90
91 fn input_from<D: Data>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> Stream<G, D> {
92 InputCore::input_from(self, handle)
93 }
94}
95
96/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
97pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;