// differential_dataflow/operators/arrange/upsert.rs
//! Support for forming collections from streams of upserts.
//!
//! Upserts are sequences of keyed optional values, and they define a collection of
//! the pairs of keys and each's most recent value, if it is present. Each element in the
//! sequence effectively overwrites the previous value at the key, if present, and if
//! the value is not present it uninstalls the key.
//!
//! Upserts are non-trivial because they do not themselves describe the deletions that
//! the `Collection` update stream must present. However, if one creates an `Arrangement`
//! then this state provides sufficient information. The arrangement will continue to
//! exist even if dropped until the input or dataflow shuts down, as the upsert operator
//! itself needs access to its accumulated state.
//!
//! # Notes
//!
//! Upserts currently only work with totally ordered timestamps.
//!
//! In the case of ties in timestamps (concurrent updates to the same key) they choose
//! the *greatest* value according to `Option<Val>` ordering, which will prefer a value
//! to `None` and choose the greatest value (informally, as if applied in order of value).
//!
//! If the same value is repeated, no change will occur in the output. That may make this
//! operator effective at determining the difference between collections of keyed values,
//! but note that it will not notice the absence of keys in a collection.
//!
//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
//! the input stream, mapping values that fail the predicate to `None` values, like so:
//!
//! ```ignore
//! // Dropped values should be retained as "uninstall" upserts.
//! upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
//! ```
//!
//! # Example
//!
//! ```rust
//! // define a new timely dataflow computation.
//! timely::execute_from_args(std::env::args().skip(1), move |worker| {
//!
//!     type Key = String;
//!     type Val = String;
//!
//!     let mut input = timely::dataflow::InputHandle::new();
//!     let mut probe = timely::dataflow::ProbeHandle::new();
//!
//!     // Create a dataflow demonstrating upserts.
//!     //
//!     // Upserts are a sequence of records (key, option<val>) where the intended
//!     // value associated with a key is the most recent value, and if that is a
//!     // `none` then the key is removed (until a new value shows up).
//!     //
//!     // The challenge with upserts is that the value to *retract* isn't supplied
//!     // as part of the input stream. We have to determine what it should be!
//!
//!     worker.dataflow(|scope| {
//!
//!         use timely::dataflow::operators::Input;
//!         use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
//!         use differential_dataflow::operators::arrange::upsert;
//!
//!         let stream = scope.input_from(&mut input);
//!         let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(stream, &"test");
//!
//!         arranged
//!             .as_collection(|k,v| (k.clone(), v.clone()))
//!             .inspect(|x| println!("Observed: {:?}", x))
//!             .probe_with(&mut probe);
//!     });
//!
//!     // Introduce the key, with a specific value.
//!     input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
//!     input.advance_to(4);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Change the value to a different value.
//!     input.send(("frank".to_string(), Some("zappa".to_string()), 4));
//!     input.advance_to(5);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Remove the key and its value.
//!     input.send(("frank".to_string(), None, 5));
//!     input.advance_to(9);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Introduce a new totally different value
//!     input.send(("frank".to_string(), Some("oz".to_string()), 9));
//!     input.advance_to(10);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Repeat the value, which should produce no output.
//!     input.send(("frank".to_string(), Some("oz".to_string()), 11));
//!     input.advance_to(12);
//!     while probe.less_than(input.time()) { worker.step(); }
//!     // Remove the key and value.
//!     input.send(("frank".to_string(), None, 15));
//!     input.close();
//!
//! }).unwrap();
//! ```

use std::collections::{BinaryHeap, BTreeMap};

use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use crate::operators::arrange::arrangement::Arranged;
use crate::trace::{Builder, Description};
use crate::trace::{self, Trace, TraceReader, Cursor};
use crate::{ExchangeData, Hashable};

use crate::trace::implementations::containers::BatchContainer;

use super::TraceAgent;

/// Arrange data from a stream of keyed upserts.
///
/// The input should be a stream of timestamped pairs of `Key` and `Option<Val>`.
/// The contents of the collection are defined key-by-key, where each optional
/// value in sequence either replaces or removes the existing value, should it
/// exist.
///
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Bu, Tr>(
    stream: Stream<G, Vec<(Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: for<'a> Trace<
        KeyOwn: ExchangeData+Hashable+std::hash::Hash,
        ValOwn: ExchangeData,
        Time: TotalOrder+ExchangeData,
        Diff=isize,
    >+'static,
    Bu: Builder<Time=G::Timestamp, Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
    // Handle to the published trace; populated inside the operator constructor
    // below and extracted at the end of this function.
    let mut reader: Option<TraceAgent<Tr>> = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let reader = &mut reader;

        // Route each upsert by the hash of its key, so that all updates to a
        // given key are handled by the same worker.
        let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option<Tr::ValOwn>,G::Timestamp)| (update.0).hashed().into());

        let scope = stream.scope();
        stream.unary_frontier(exchange, name, move |_capability, info| {

            // Acquire a logger for arrange events.
            let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            // Tracks the lower envelope of times in `priority_queue`.
            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
            // Form the trace we will both use internally and publish.
            let activator = Some(scope.activator_for(info.address.clone()));
            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

            // Install the scope-wide default merge-effort ("exertion") policy, if one is configured.
            if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
                empty_trace.set_exert_logic(exert_logic);
            }

            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
            // Capture the reader outside the builder scope.
            *reader = Some(reader_local.clone());

            // Tracks the input frontier, used to populate the lower bound of new batches.
            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap,
            // so entries are wrapped in `std::cmp::Reverse` to pop the least time first).
            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>>::new();
            // Re-used staging buffer of `((key, val), time, diff)` updates for each batch.
            let mut updates = Vec::new();

            move |(input, frontier), output| {

                // Stash capabilities and associated data (ordered by time).
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain(0));
                    for (key, val, time) in data.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });

                // Assert that the frontier never regresses.
                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

                // Test to see if strict progress has occurred, which happens whenever the new
                // frontier isn't equal to the previous. It is only in this case that we have any
                // data processing to do.
                if prev_frontier.borrow() != frontier.frontier() {

                    // If there is at least one capability not in advance of the input frontier ...
                    if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {

                        let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                        // For each capability not in advance of the input frontier ...
                        for (index, capability) in capabilities.elements().iter().enumerate() {

                            if !frontier.less_equal(capability.time()) {

                                // Assemble the upper bound on times we can commit with this capabilities.
                                // We must respect the input frontier, and *subsequent* capabilities, as
                                // we are pretending to retire the capability changes one by one.
                                upper.clear();
                                for time in frontier.frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                    upper.insert(other_capability.time().clone());
                                }

                                // Extract upserts available to process as of this `upper`,
                                // grouping them by key so each key is resolved once.
                                let mut to_process = BTreeMap::new();
                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                }
                                // Reduce the allocation behind the priority queue if it is presently excessive.
                                // A factor of four is used to avoid repeated doubling and shrinking.
                                // TODO: if the queue were a sequence of geometrically sized allocations, we could
                                // shed the additional capacity without copying any data.
                                if priority_queue.capacity() > 4 * priority_queue.len() {
                                    priority_queue.shrink_to_fit();
                                }

                                // Prepare a cursor to the existing arrangement, and a batch builder for
                                // new stuff that we add.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = Bu::new();
                                // Single-element container used to present the owned key to the cursor.
                                let mut key_con = Tr::KeyContainer::with_capacity(1);
                                for (key, mut list) in to_process {

                                    key_con.clear(); key_con.push_own(&key);

                                    // The prior value associated with the key.
                                    let mut prev_value: Option<Tr::ValOwn> = None;

                                    // Attempt to find the key in the trace.
                                    trace_cursor.seek_key(&trace_storage, key_con.index(0));
                                    if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) {
                                        // Determine the prior value associated with the key.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff));
                                            // An upsert-maintained collection holds at most one value per key,
                                            // each with multiplicity one; anything else is a broken invariant.
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(Tr::owned_val(val));
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }

                                    // Sort the list of upserts to `key` by their time, suppress multiple updates.
                                    // Entries are `(time, Reverse(val))`, so ties in time sort greatest value
                                    // first, and `dedup_by` keeps the first of each run: the greatest value per
                                    // time wins, matching the module-level tie-breaking contract.
                                    list.sort();
                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
                                    for (time, std::cmp::Reverse(next)) in list {
                                        // Emit a retraction/insertion pair only when the value actually changes;
                                        // repeated identical values produce no output.
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                updates.push(((key.clone(), prev), time.clone(), -1));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                updates.push(((key.clone(), next.clone()), time.clone(), 1));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                    // Must insert updates in (key, val, time) order.
                                    updates.sort();
                                    builder.push(&mut updates);
                                }
                                // Seal a batch spanning `[prev_frontier, upper)`, with full history (`since` at minimum).
                                let description = Description::new(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                let batch = builder.done(description);
                                prev_frontier.clone_from(&upper);

                                // Communicate `batch` to the arrangement and the stream.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }

                        // Having extracted and sent batches between each capability and the input frontier,
                        // we should downgrade all capabilities to match the batcher's lower update frontier.
                        // This may involve discarding capabilities, which is fine as any new updates arrive
                        // in messages with new capabilities.

                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            // Retain one capability delayed to the least stashed time; times are totally
                            // ordered, so a single capability covers all remaining queued upserts.
                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.insert(capability.delayed(time));
                            }
                            else {
                                panic!("failed to find capability");
                            }
                        }

                        capabilities = new_capabilities;
                    }
                    else {
                        // Announce progress updates, even without data.
                        writer.seal(frontier.frontier().to_owned());
                    }

                    // Update our view of the input frontier.
                    prev_frontier.clear();
                    prev_frontier.extend(frontier.frontier().iter().cloned());

                    // Downgrade capabilities for `reader_local`.
                    reader_local.set_logical_compaction(prev_frontier.borrow());
                    reader_local.set_physical_compaction(prev_frontier.borrow());
                }

                // Give the trace an opportunity to perform background merge work.
                writer.exert();
            }
        })
    };

    Arranged { stream, trace: reader.unwrap() }

}