//! Support for forming collections from streams of upserts.
//!
//! Upserts are sequences of keyed optional values, and they define a collection of
//! pairs of keys and each key's most recent value, if one is present. Each element in
//! the sequence effectively overwrites the previous value at its key, if present, and
//! if the value is not present it uninstalls the key.
//!
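//! For example, this sequence of upserts (a sketch, with illustrative timestamps)
//! describes a collection that holds one value, then another, then no value at all:
//!
//! ```ignore
//! ("frank", Some("mcsherry"), 1)   // collection: { ("frank", "mcsherry") }
//! ("frank", Some("zappa"),    2)   // collection: { ("frank", "zappa") }
//! ("frank", None,             3)   // collection: { }
//! ```
//!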
//! Upserts are non-trivial because they do not themselves describe the deletions that
//! the `Collection` update stream must present. However, if one creates an `Arrangement`
//! then this state provides sufficient information. The arrangement will continue to
//! exist even if dropped until the input or dataflow shuts down, as the upsert operator
//! itself needs access to its accumulated state.
//!
//! # Notes
//!
//! Upserts currently only work with totally ordered timestamps.
//!
//! In the case of ties in timestamps (concurrent updates to the same key), the operator
//! chooses the *greatest* value according to `Option<Val>` ordering, which prefers any
//! present value to `None` and otherwise takes the greatest value (informally, as if the
//! updates were applied in order of value).
//!
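//! For reference, `Option` ordering prefers any `Some` value to `None`, and compares
//! `Some` values by their contents:
//!
//! ```rust
//! assert!(Some("zappa") > Some("mcsherry"));
//! assert!(Some("mcsherry") > None::<&str>);
//! ```
//!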
//! If the same value is repeated, no change occurs in the output. This can make the
//! operator effective at computing the difference between collections of keyed values,
//! but note that it will not notice the absence of keys in a collection.
//!
//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
//! the input stream, mapping values that fail the predicate to `None` values, like so:
//!
//! ```ignore
//! // Dropped values should be retained as "uninstall" upserts.
//! upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
//! ```
//!
//! # Example
//!
//! ```rust
//! // define a new timely dataflow computation.
//! timely::execute_from_args(std::env::args().skip(1), move |worker| {
//!
//!     type Key = String;
//!     type Val = String;
//!
//!     let mut input = timely::dataflow::InputHandle::new();
//!     let mut probe = timely::dataflow::ProbeHandle::new();
//!
//!     // Create a dataflow demonstrating upserts.
//!     //
//!     // Upserts are a sequence of records (key, option<val>) where the intended
//!     // value associated with a key is the most recent value, and if that is a
//!     // `none` then the key is removed (until a new value shows up).
//!     //
//!     // The challenge with upserts is that the value to *retract* isn't supplied
//!     // as part of the input stream. We have to determine what it should be!
//!
//!     worker.dataflow(|scope| {
//!
//!         use timely::dataflow::operators::Input;
//!         use differential_dataflow::trace::implementations::ValSpine;
//!         use differential_dataflow::operators::arrange::upsert;
//!
//!         let stream = scope.input_from(&mut input);
//!         let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//!         arranged
//!             .as_collection(|k,v| (k.clone(), v.clone()))
//!             .inspect(|x| println!("Observed: {:?}", x))
//!             .probe_with(&mut probe);
//!     });
//!
//!     // Introduce the key, with a specific value.
//!     input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
//!     input.advance_to(4);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Change the value to a different value.
//!     input.send(("frank".to_string(), Some("zappa".to_string()), 4));
//!     input.advance_to(5);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Remove the key and its value.
//!     input.send(("frank".to_string(), None, 5));
//!     input.advance_to(9);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Introduce a new totally different value
//!     input.send(("frank".to_string(), Some("oz".to_string()), 9));
//!     input.advance_to(10);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Repeat the value, which should produce no output.
//!     input.send(("frank".to_string(), Some("oz".to_string()), 11));
//!     input.advance_to(12);
//!     while probe.less_than(input.time()) { worker.step(); }
//!
//!     // Remove the key and value.
//!     input.send(("frank".to_string(), None, 15));
//!     input.close();
//!
//! }).unwrap();
//! ```
use std::collections::{BinaryHeap, HashMap};

use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use crate::operators::arrange::arrangement::Arranged;

use crate::trace::Builder;
use crate::trace::{self, Trace, TraceReader, Batch, Cursor};
use crate::trace::cursor::IntoOwned;

use crate::{ExchangeData, Hashable};

use super::TraceAgent;

/// Arrange data from a stream of keyed upserts.
///
/// The input should be a stream of timestamped pairs of `Key` and `Option<Val>`.
/// The contents of the collection are defined key-by-key, where each optional
/// value in sequence either replaces or removes the existing value, should it
/// exist.
///
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
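///
/// A minimal sketch of use (see the module-level example for a complete, runnable
/// program):
///
/// ```ignore
/// let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine<Key, Val, _, _>>(&stream, &"test");
/// ```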
pub fn arrange_from_upsert<G, K, V, Tr>(
    stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: Trace+TraceReader<Diff=isize>+'static,
    for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>,
    K: ExchangeData+Hashable+std::hash::Hash,
    V: ExchangeData,
    for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
    Tr::Time: TotalOrder+ExchangeData,
    Tr::Batch: Batch,
    Tr::Builder: Builder<Input = Vec<((K, V), Tr::Time, Tr::Diff)>>,
{
    let mut reader: Option<TraceAgent<Tr>> = None;

    // Fabricate a data-parallel operator using the `unary_frontier` pattern.
    let stream = {

        let reader = &mut reader;
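
        // Route each upsert by the hash of its key, so that all updates to a given
        // key are handled by the same worker.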
        let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());

        stream.unary_frontier(exchange, name, move |_capability, info| {

            // Acquire a logger for arrange events.
            let logger = {
                let scope = stream.scope();
                let register = scope.log_register();
                register.get::<crate::logging::DifferentialEvent>("differential/arrange")
            };

            // Tracks the lower envelope of times in `priority_queue`.
            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

            // Form the trace we will both use internally and publish.
            let activator = Some(stream.scope().activator_for(info.address.clone()));
            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
            if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
                empty_trace.set_exert_logic(exert_logic);
            }
            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

            // Capture the reader outside the builder scope.
            *reader = Some(reader_local.clone());

            // Tracks the input frontier, used to populate the lower bound of new batches.
            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
            let mut updates = Vec::new();

            move |input, output| {

                // Stash capabilities and associated data (ordered by time).
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain());
                    for (key, val, time) in data.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });

                // Assert that the frontier never regresses.
                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

                // Test to see if strict progress has occurred, which happens whenever the new
                // frontier isn't equal to the previous. It is only in this case that we have any
                // data processing to do.
                if prev_frontier.borrow() != input.frontier().frontier() {

                    // If there is at least one capability not in advance of the input frontier ...
                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                        let mut upper = Antichain::new(); // re-used allocation for sealing batches.

                        // For each capability not in advance of the input frontier ...
                        for (index, capability) in capabilities.elements().iter().enumerate() {

                            if !input.frontier().less_equal(capability.time()) {

                                // Assemble the upper bound on times we can commit with this capability.
                                // We must respect the input frontier, and *subsequent* capabilities, as
                                // we are pretending to retire the capability changes one by one.
                                upper.clear();
                                for time in input.frontier().frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                    upper.insert(other_capability.time().clone());
                                }

                                // Extract upserts available to process as of this `upper`.
                                let mut to_process = HashMap::new();
                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                }

                                // Reduce the allocation behind the priority queue if it is presently excessive.
                                // A factor of four is used to avoid repeated doubling and shrinking.
                                // TODO: if the queue were a sequence of geometrically sized allocations, we could
                                // shed the additional capacity without copying any data.
                                if priority_queue.capacity() > 4 * priority_queue.len() {
                                    priority_queue.shrink_to_fit();
                                }

                                // Put (key, list) into key order, to match cursor enumeration.
                                let mut to_process = to_process.into_iter().collect::<Vec<_>>();
                                to_process.sort();

                                // Prepare a cursor to the existing arrangement, and a batch builder for
                                // new stuff that we add.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = Tr::Builder::new();
                                for (key, mut list) in to_process.drain(..) {

                                    // The prior value associated with the key.
                                    let mut prev_value: Option<V> = None;

                                    // Attempt to find the key in the trace.
                                    trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key));
                                    if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) {
                                        // Determine the prior value associated with the key.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(val.into_owned());
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }

                                    // Sort the list of upserts to `key` by time; on ties in time the
                                    // `Reverse`-wrapped values sort greatest-first, and `dedup_by` keeps
                                    // the first entry per time, so ties resolve to the greatest value.
                                    list.sort();
                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
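                                    // Apply the upserts in time order: whenever the new value differs
                                    // from the prior value, retract the prior value (if any) and insert
                                    // the new value (if any).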
                                    for (time, std::cmp::Reverse(next)) in list {
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                updates.push(((key.clone(), prev), time.clone(), -1));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                updates.push(((key.clone(), next.clone()), time.clone(), 1));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                    // Must insert updates in (key, val, time) order.
                                    updates.sort();
                                    builder.push(&mut updates);
                                }
                                let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                prev_frontier.clone_from(&upper);

                                // Communicate `batch` to the arrangement and the stream.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }

                        // Having extracted and sent batches between each capability and the input frontier,
                        // we should downgrade all capabilities to match the batcher's lower update frontier.
                        // This may involve discarding capabilities, which is fine as any new updates arrive
                        // in messages with new capabilities.
                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.insert(capability.delayed(time));
                            }
                            else {
                                panic!("failed to find capability");
                            }
                        }

                        capabilities = new_capabilities;
                    }
                    else {
                        // Announce progress updates, even without data.
                        writer.seal(input.frontier().frontier().to_owned());
                    }

                    // Update our view of the input frontier.
                    prev_frontier.clear();
                    prev_frontier.extend(input.frontier().frontier().iter().cloned());

                    // Downgrade capabilities for `reader_local`.
                    reader_local.set_logical_compaction(prev_frontier.borrow());
                    reader_local.set_physical_compaction(prev_frontier.borrow());
                }

                writer.exert();
            }
        })
    };

    Arranged { stream, trace: reader.unwrap() }
}