timely/dataflow/operators/vec/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use crate::container::CapacityContainerBuilder;
4use crate::progress::Timestamp;
5use crate::dataflow::{StreamVec, Scope};
6use crate::dataflow::operators::core::{Input as InputCore};
7
8// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
9// TODO : more like a harness, with direct access to its inputs.
10
11// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
12// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
13// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
14
15/// Create a new `StreamVec` and `Handle` through which to supply input.
16pub trait Input<'scope> {
17 /// The timestamp at which this input scope operates.
18 type Timestamp: Timestamp;
19
20 /// Create a new `StreamVec` and `Handle` through which to supply input.
21 ///
22 /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used
23 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
24 /// data into the timely dataflow computation.
25 ///
26 /// The `Handle` also provides a means to indicate
27 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
28 /// to issue progress notifications.
29 ///
30 /// # Examples
31 /// ```
32 /// use timely::*;
33 /// use timely::dataflow::operators::{Input, Inspect};
34 ///
35 /// // construct and execute a timely dataflow
36 /// timely::execute(Config::thread(), |worker| {
37 ///
38 /// // add an input and base computation off of it
39 /// let mut input = worker.dataflow(|scope| {
40 /// let (input, stream) = scope.new_input::<Vec<_>>();
41 /// stream.inspect(|x| println!("hello {:?}", x));
42 /// input
43 /// });
44 ///
45 /// // introduce input, advance computation
46 /// for round in 0..10 {
47 /// input.send(round);
48 /// input.advance_to(round + 1);
49 /// worker.step();
50 /// }
51 /// });
52 /// ```
53 fn new_input<D: Clone+'static>(&self) -> (Handle<Self::Timestamp, D>, StreamVec<'scope, Self::Timestamp, D>);
54
55 /// Create a new stream from a supplied interactive handle.
56 ///
57 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
58 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
59 /// if it as attached to more than one stream.
60 ///
61 /// # Examples
62 /// ```
63 /// use timely::*;
64 /// use timely::dataflow::operators::{Input, Inspect};
65 /// use timely::dataflow::InputHandle;
66 ///
67 /// // construct and execute a timely dataflow
68 /// timely::execute(Config::thread(), |worker| {
69 ///
70 /// // add an input and base computation off of it
71 /// let mut input = InputHandle::new();
72 /// worker.dataflow(|scope| {
73 /// scope.input_from(&mut input)
74 /// .container::<Vec<_>>()
75 /// .inspect(|x| println!("hello {:?}", x));
76 /// });
77 ///
78 /// // introduce input, advance computation
79 /// for round in 0..10 {
80 /// input.send(round);
81 /// input.advance_to(round + 1);
82 /// worker.step();
83 /// }
84 /// });
85 /// ```
86 fn input_from<D: Clone+'static>(&self, handle: &mut Handle<Self::Timestamp, D>) -> StreamVec<'scope, Self::Timestamp, D>;
87}
88
89use crate::order::TotalOrder;
90impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
91 type Timestamp = T;
92 fn new_input<D: Clone+'static>(&self) -> (Handle<T, D>, StreamVec<'scope, T, D>) {
93 InputCore::new_input(self)
94 }
95
96 fn input_from<D: Clone+'static>(&self, handle: &mut Handle<T, D>) -> StreamVec<'scope, T, D> {
97 InputCore::input_from(self, handle)
98 }
99}
100
101/// A handle to an input `StreamVec`, used to introduce data to a timely dataflow computation.
102pub type Handle<T, D> = crate::dataflow::operators::core::input::Handle<T, CapacityContainerBuilder<Vec<D>>>;