1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
//! Support for forming collections from streams of upsert.
//!
//! Upserts are sequences of keyed optional values, and they define a collection of
//! the pairs of keys and each's most recent value, if it is present. Element in the
//! sequence effectively overwrites the previous value at the key, if present, and if
//! the value is not present it uninstalls the key.
//!
//! Upserts are non-trivial because they do not themselves describe the deletions that
//! the `Collection` update stream must present. However, if one creates an `Arrangement`
//! then this state provides sufficient information. The arrangement will continue to
//! exist even if dropped until the input or dataflow shuts down, as the upsert operator
//! itself needs access to its accumulated state.
//!
//! # Notes
//!
//! Upserts currently only work with totally ordered timestamps.
//!
//! In the case of ties in timestamps (concurrent updates to the same key) they choose
//! the *greatest* value according to `Option<Val>` ordering, which will prefer a value
//! to `None` and choose the greatest value (informally, as if applied in order of value).
//!
//! If the same value is repeated, no change will occur in the output. That may make this
//! operator effective at determining the difference between collections of keyed values,
//! but note that it will not notice the absence of keys in a collection.
//!
//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
//! the input stream, mapping values that fail the predicate to `None` values, like so:
//!
//! ```ignore
//! // Dropped values should be retained as "uninstall" upserts.
//! upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
//! ```
//!
//! # Example
//!
//! ```rust
//! // define a new timely dataflow computation.
//! timely::execute_from_args(std::env::args().skip(1), move |worker| {
//!
//!     type Key = String;
//!     type Val = String;
//!
//!     let mut input = timely::dataflow::InputHandle::new();
//!     let mut probe = timely::dataflow::ProbeHandle::new();
//!
//!     // Create a dataflow demonstrating upserts.
//!     //
//!     // Upserts are a sequence of records (key, option<val>) where the intended
//!     // value associated with a key is the most recent value, and if that is a
//!     // `none` then the key is removed (until a new value shows up).
//!     //
//!     // The challenge with upserts is that the value to *retract* isn't supplied
//!     // as part of the input stream. We have to determine what it should be!
//!
//!     worker.dataflow(|scope| {
//!
//!         use timely::dataflow::operators::Input;
//!         use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
//!         use differential_dataflow::operators::arrange::upsert;
//!
//!         let stream = scope.input_from(&mut input);
//!         let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//!         arranged
//!             .as_collection(|k,v| (k.clone(), v.clone()))
//!             .inspect(|x| println!("Observed: {:?}", x))
//!             .probe_with(&mut probe);
//!     });
//!
//!     // Introduce the key, with a specific value.
//!     input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
//!     input.advance_to(4);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Change the value to a different value.
//!     input.send(("frank".to_string(), Some("zappa".to_string()), 4));
//!     input.advance_to(5);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Remove the key and its value.
//!     input.send(("frank".to_string(), None, 5));
//!     input.advance_to(9);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Introduce a new totally different value
//!     input.send(("frank".to_string(), Some("oz".to_string()), 9));
//!     input.advance_to(10);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Repeat the value, which should produce no output.
//!     input.send(("frank".to_string(), Some("oz".to_string()), 11));
//!     input.advance_to(12);
//!     while probe.less_than(input.time()) { worker.step(); }
//!     // Remove the key and value.
//!     input.send(("frank".to_string(), None, 15));
//!     input.close();
//!
//! }).unwrap();
//! ```

use std::collections::{BinaryHeap, HashMap};

use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use crate::operators::arrange::arrangement::Arranged;
use crate::trace::{Builder, Description};
use crate::trace::{self, Trace, TraceReader, Batch, Cursor};
use crate::trace::cursor::IntoOwned;
use crate::{ExchangeData, Hashable};

use super::TraceAgent;

/// Arrange data from a stream of keyed upserts.
///
/// The input should be a stream of timestamped pairs of Key and Option<Val>.
/// The contents of the collection are defined key-by-key, where each optional
/// value in sequence either replaces or removes the existing value, should it
/// exist.
///
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, K, V, Bu, Tr>(
    stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: Trace+TraceReader<Diff=isize>+'static,
    for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
    K: ExchangeData+Hashable+std::hash::Hash,
    V: ExchangeData,
    for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
    Tr::Time: TotalOrder+ExchangeData,
    Tr::Batch: Batch,
    Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let reader = &mut reader;

        let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());

        stream.unary_frontier(exchange, name, move |_capability, info| {

            // Acquire a logger for arrange events.
            let logger = {
                let scope = stream.scope();
                let register = scope.log_register();
                register.get::<crate::logging::DifferentialEvent>("differential/arrange")
            };

            // Tracks the lower envelope of times in `priority_queue`.
            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
            // Form the trace we will both use internally and publish.
            let activator = Some(stream.scope().activator_for(info.address.clone()));
            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

            if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
                empty_trace.set_exert_logic(exert_logic);
            }

            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
            // Capture the reader outside the builder scope.
            *reader = Some(reader_local.clone());

            // Tracks the input frontier, used to populate the lower bound of new batches.
            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
            let mut updates = Vec::new();

            move |input, output| {

                // Stash capabilities and associated data (ordered by time).
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain());
                    for (key, val, time) in data.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });

                // Assert that the frontier never regresses.
                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

                // Test to see if strict progress has occurred, which happens whenever the new
                // frontier isn't equal to the previous. It is only in this case that we have any
                // data processing to do.
                if prev_frontier.borrow() != input.frontier().frontier() {

                    // If there is at least one capability not in advance of the input frontier ...
                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                        let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                        // For each capability not in advance of the input frontier ...
                        for (index, capability) in capabilities.elements().iter().enumerate() {

                            if !input.frontier().less_equal(capability.time()) {

                                // Assemble the upper bound on times we can commit with this capabilities.
                                // We must respect the input frontier, and *subsequent* capabilities, as
                                // we are pretending to retire the capability changes one by one.
                                upper.clear();
                                for time in input.frontier().frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                    upper.insert(other_capability.time().clone());
                                }

                                // Extract upserts available to process as of this `upper`.
                                let mut to_process = HashMap::new();
                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                }
                                // Reduce the allocation behind the priority queue if it is presently excessive.
                                // A factor of four is used to avoid repeated doubling and shrinking.
                                // TODO: if the queue were a sequence of geometrically sized allocations, we could
                                // shed the additional capacity without copying any data.
                                if priority_queue.capacity() > 4 * priority_queue.len() {
                                    priority_queue.shrink_to_fit();
                                }

                                // Put (key, list) into key order, to match cursor enumeration.
                                let mut to_process = to_process.into_iter().collect::<Vec<_>>();
                                to_process.sort();

                                // Prepare a cursor to the existing arrangement, and a batch builder for
                                // new stuff that we add.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = Bu::new();
                                for (key, mut list) in to_process.drain(..) {

                                    // The prior value associated with the key.
                                    let mut prev_value: Option<V> = None;

                                    // Attempt to find the key in the trace.
                                    trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
                                    if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
                                        // Determine the prior value associated with the key.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(val.into_owned());
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }

                                    // Sort the list of upserts to `key` by their time, suppress multiple updates.
                                    list.sort();
                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
                                    for (time, std::cmp::Reverse(next)) in list {
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                updates.push(((key.clone(), prev), time.clone(), -1));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                updates.push(((key.clone(), next.clone()), time.clone(), 1));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                    // Must insert updates in (key, val, time) order.
                                    updates.sort();
                                    builder.push(&mut updates);
                                }
                                let description = Description::new(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                let batch = builder.done(description);
                                prev_frontier.clone_from(&upper);

                                // Communicate `batch` to the arrangement and the stream.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }

                        // Having extracted and sent batches between each capability and the input frontier,
                        // we should downgrade all capabilities to match the batcher's lower update frontier.
                        // This may involve discarding capabilities, which is fine as any new updates arrive
                        // in messages with new capabilities.

                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.insert(capability.delayed(time));
                            }
                            else {
                                panic!("failed to find capability");
                            }
                        }

                        capabilities = new_capabilities;
                    }
                    else {
                        // Announce progress updates, even without data.
                        writer.seal(input.frontier().frontier().to_owned());
                    }

                    // Update our view of the input frontier.
                    prev_frontier.clear();
                    prev_frontier.extend(input.frontier().frontier().iter().cloned());

                    // Downgrade capabilities for `reader_local`.
                    reader_local.set_logical_compaction(prev_frontier.borrow());
                    reader_local.set_physical_compaction(prev_frontier.borrow());
                }

                writer.exert();
            }
        })
    };

    Arranged { stream, trace: reader.unwrap() }

}