1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
//! Input sessions for simplified collection updates.
//!
//! Although users can directly manipulate timely dataflow streams as collection inputs,
//! the `InputSession` type can make this more efficient and less error-prone. Specifically,
//! the type batches up updates with their logical times and ships them with coarsened
//! timely dataflow capabilities, exposing more concurrency to the operator implementations
//! than is evident from the logical times, which appear to execute in sequence.

use timely::progress::Timestamp;
use timely::dataflow::operators::Input as TimelyInput;
use timely::dataflow::operators::input::Handle;
use timely::dataflow::scopes::ScopeParent;

use crate::Data;
use crate::difference::Semigroup;
use crate::collection::{Collection, AsCollection};

/// Extension of timely's `Input` that creates differential dataflow collections together
/// with the `InputSession` handles used to feed them.
pub trait Input : TimelyInput {
    /// Create a new collection and input handle to subsequently control the collection.
    ///
    /// The returned `InputSession` is the only source of updates for the returned collection.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, data) = scope.new_collection();
    ///         let probe = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection<D, R>(&mut self) -> (InputSession<<Self as ScopeParent>::Timestamp, D, R>, Collection<Self, D, R>)
    where D: Data, R: Semigroup+'static;
    /// Create a new collection and input handle from initial data.
    ///
    /// The implementation provided in this module introduces each item of `data` with a
    /// difference of `1` at the minimum timestamp; further updates can be made through the
    /// returned `InputSession`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, data) = scope.new_collection_from(0 .. 10);
    ///         let probe = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection_from<I>(&mut self, data: I) -> (InputSession<<Self as ScopeParent>::Timestamp, I::Item, isize>, Collection<Self, I::Item, isize>)
    where I: IntoIterator+'static, I::Item: Data;
    /// Create a new collection and input handle from initial `(data, time, diff)` triples.
    ///
    /// Unlike `new_collection_from`, the caller supplies the timestamp and the difference of
    /// every initial update explicitly, so the difference type `R` is not restricted to `isize`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection from explicit update triples.
    ///         let initial = (0 .. 10).map(|x| (x, (), 1isize));
    ///         let (handle, data) = scope.new_collection_from_raw(initial);
    ///         let probe = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection_from_raw<D, R, I>(&mut self, data: I) -> (InputSession<<Self as ScopeParent>::Timestamp, D, R>, Collection<Self, D, R>)
    where I: IntoIterator<Item=(D,<Self as ScopeParent>::Timestamp,R)>+'static, D: Data, R: Semigroup+'static;
}

use crate::lattice::Lattice;
impl<G> Input for G
where
    G: TimelyInput,
    <G as ScopeParent>::Timestamp: Lattice,
{
    /// Builds a fresh timely input and presents its stream of updates as a collection.
    fn new_collection<D, R>(
        &mut self,
    ) -> (InputSession<<G as ScopeParent>::Timestamp, D, R>, Collection<G, D, R>)
    where
        D: Data,
        R: Semigroup + 'static,
    {
        let (input, updates) = self.new_input();
        (InputSession::from(input), updates.as_collection())
    }

    /// Pairs every item of `data` with the minimum timestamp and a difference of one, then
    /// defers to `new_collection_from_raw`.
    fn new_collection_from<I>(
        &mut self,
        data: I,
    ) -> (InputSession<<G as ScopeParent>::Timestamp, I::Item, isize>, Collection<G, I::Item, isize>)
    where
        I: IntoIterator + 'static,
        I::Item: Data,
    {
        let initial = data
            .into_iter()
            .map(|item| (item, <G::Timestamp as timely::progress::Timestamp>::minimum(), 1isize));
        self.new_collection_from_raw(initial)
    }

    /// Builds a fresh timely input and concatenates it with a stream of the initial `data`.
    fn new_collection_from_raw<D, R, I>(
        &mut self,
        data: I,
    ) -> (InputSession<<G as ScopeParent>::Timestamp, D, R>, Collection<G, D, R>)
    where
        I: IntoIterator<Item = (D, <Self as ScopeParent>::Timestamp, R)> + 'static,
        D: Data,
        R: Semigroup + 'static,
    {
        use timely::dataflow::operators::ToStream;

        // The interactive input is created before the stream of initial data, as before.
        let (input, updates) = self.new_input();
        let initial = data.to_stream(self).as_collection();

        (InputSession::from(input), updates.as_collection().concat(&initial))
    }
}

/// An input session wrapping a single timely dataflow capability.
///
/// Each timely dataflow message has a corresponding capability, which is a logical time in the
/// timely dataflow system. Differential dataflow updates can happen at a much higher rate than
/// timely dataflow's progress tracking infrastructure supports, because the logical times are
/// promoted to data and updates are batched together. The `InputSession` type does this batching.
///
/// Updates are held in an internal buffer; call `flush` (or drop the session) to hand them to
/// timely dataflow and to advance the wrapped input to the session's time.
///
/// # Examples
///
/// ```
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// ::timely::execute(Config::thread(), |worker| {
///
///     let (mut handle, probe) = worker.dataflow(|scope| {
///         // create input handle and collection.
///         let (handle, data) = scope.new_collection_from(0 .. 10);
///         let probe = data.map(|x| x * 2)
///                         .inspect(|x| println!("{:?}", x))
///                         .probe();
///         (handle, probe)
///     });
///
///     handle.insert(3);
///     handle.advance_to(1);
///     handle.insert(5);
///     handle.advance_to(2);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
///     handle.remove(5);
///     handle.advance_to(3);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
/// }).unwrap();
/// ```
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup+'static> {
    /// Logical time attached to updates made through `update`; moved forward by `advance_to`.
    time: T,
    /// Updates accepted by the session that have not yet been sent to the timely input.
    buffer: Vec<(D, T, R)>,
    /// The wrapped timely dataflow input handle that buffered updates are sent through.
    handle: Handle<T,(D,T,R)>,
}

impl<T, D> InputSession<T, D, isize>
where
    T: Timestamp + Clone,
    D: Data,
{
    /// Adds an element to the collection, as an update of `+1` at the session's current time.
    pub fn insert(&mut self, element: D) {
        self.update(element, 1isize);
    }

    /// Removes an element from the collection, as an update of `-1` at the session's current time.
    pub fn remove(&mut self, element: D) {
        self.update(element, -1isize);
    }
}

// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i64> {
//     /// Adds an element to the collection.
//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
//     /// Removes an element from the collection.
//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
// }

// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i32> {
//     /// Adds an element to the collection.
//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
//     /// Removes an element from the collection.
//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
// }

impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> InputSession<T, D, R> {

    /// Introduces this session's input handle into `scope` and returns it as a collection.
    ///
    /// The stream produced by the wrapped timely handle carries `(data, time, diff)` triples,
    /// which are presented as a differential dataflow `Collection`.
    pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> Collection<G, D, R>
    where
        G: ScopeParent<Timestamp=T>,
    {
        scope
            .input_from(&mut self.handle)
            .as_collection()
    }

    /// Allocates a new session around a freshly created input handle.
    ///
    /// Use `to_collection` to introduce the session into a dataflow scope.
    pub fn new() -> Self {
        Self::from(Handle::new())
    }

    /// Creates a new session that takes ownership of an existing input handle.
    ///
    /// The session's time starts out as the handle's current time, and its buffer is empty.
    pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
        InputSession {
            time: handle.time().clone(),
            buffer: Vec::new(),
            handle,
        }
    }

    /// Adds to the weight of an element in the collection, at the session's current time.
    ///
    /// The update is buffered; it is not necessarily visible to timely dataflow until `flush`
    /// is called or the session is dropped.
    pub fn update(&mut self, element: D, change: R) {
        let time = self.time.clone();
        self.buffer_update(element, time, change);
    }

    /// Adds to the weight of an element in the collection at the session's current time or later.
    ///
    /// The update is buffered exactly as with `update`, but carries the supplied `time`
    /// rather than the session's current time.
    ///
    /// # Panics
    ///
    /// Panics if `time` is not greater than or equal to the session's current time.
    pub fn update_at(&mut self, element: D, time: T, change: R) {
        assert!(self.time.less_equal(&time));
        self.buffer_update(element, time, change);
    }

    /// Appends one update to the buffer, first shipping the buffer's contents if it is full.
    fn buffer_update(&mut self, element: D, time: T, change: R) {
        if self.buffer.len() == self.buffer.capacity() {
            // A full, non-empty buffer is handed to timely before more space is requested;
            // an empty buffer with zero capacity only needs the reservation.
            if !self.buffer.is_empty() {
                self.handle.send_batch(&mut self.buffer);
            }
            // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
            self.buffer.reserve(1024);
        }
        self.buffer.push((element, time, change));
    }

    /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session.
    ///
    /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is
    /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is
    /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
    pub fn flush(&mut self) {
        self.handle.send_batch(&mut self.buffer);
        // Only advance the wrapped handle when the session's time is strictly ahead of it.
        if self.handle.epoch().less_than(&self.time) {
            self.handle.advance_to(self.time.clone());
        }
    }

    /// Advances the logical time for future records.
    ///
    /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when
    /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
    /// method unless the session has just been flushed.
    ///
    /// # Panics
    ///
    /// Panics if `time` is not greater than or equal to both the wrapped handle's epoch and the
    /// session's current time.
    pub fn advance_to(&mut self, time: T) {
        assert!(self.handle.epoch().less_equal(&time));
        assert!(self.time.less_equal(&time));
        self.time = time;
    }

    /// Reveals the current time of the session. Returns the same value as `time`.
    pub fn epoch(&self) -> &T { &self.time }
    /// Reveals the current time of the session, i.e. the time attached to updates made with `update`.
    pub fn time(&self) -> &T { &self.time }

    /// Closes the input, flushing and sealing the wrapped timely input.
    ///
    /// The body is intentionally empty: taking `self` by value drops the session, and the
    /// `Drop` implementation flushes buffered updates before the wrapped handle is released.
    pub fn close(self) { }
}

impl<T, D, R> Drop for InputSession<T, D, R>
where
    T: Timestamp + Clone,
    D: Data,
    R: Semigroup + 'static,
{
    /// Flushes the session when it goes out of scope, so that buffered updates and the
    /// session's time are not lost.
    fn drop(&mut self) {
        self.flush();
    }
}