differential_dogs3/operators/half_join.rs
1//! Dataflow operator for delta joins over partially ordered timestamps.
2//!
3//! Given multiple streams of updates `(data, time, diff)` that are each
4//! defined over the same partially ordered `time`, we want to form the
5//! full cross-join of all relations (we will *later* apply some filters
6//! and instead equijoin on keys).
7//!
8//! The "correct" output is the outer join of these triples, where
9//! 1. The `data` entries are just tuple'd up together,
10//! 2. The `time` entries are subjected to the lattice `join` operator,
11//! 3. The `diff` entries are multiplied.
12//!
13//! One way to produce the correct output is to form independent dataflow
14//! fragments for each input stream, such that each intended output is then
15//! produced by exactly one of these input streams.
16//!
17//! There are several incorrect ways one might do this, but here is one way
18//! that I hope is not incorrect:
19//!
//! Each input stream of updates is joined with each other input collection,
//! where each input update is matched against each other input update that
//! has a `time` that is less-than the input update's `time`, *UNDER A TOTAL
//! ORDER ON `time`*. The outputs are the `(data, time, diff)` entries that
//! follow the rules above, except that we additionally preserve the input's
//! initial `time` as well, for use in subsequent joins with the other input
//! collections.
27//!
28//! There are some caveats about ties, and we should treat each `time` for
29//! each input as occurring at distinct times, one after the other (so that
30//! ties are resolved by the index of the input). There is also the matter
31//! of logical compaction, which should not be done in a way that prevents
32//! the correct determination of the total order comparison.
33
34use std::collections::HashMap;
35use std::ops::Mul;
36use std::time::Instant;
37
38use timely::ContainerBuilder;
39use timely::container::CapacityContainerBuilder;
40use timely::dataflow::{Scope, ScopeParent, StreamCore};
41use timely::dataflow::channels::pact::{Pipeline, Exchange};
42use timely::dataflow::operators::{Capability, Operator, generic::Session};
43use timely::progress::Antichain;
44use timely::progress::frontier::AntichainRef;
45
46use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
47use differential_dataflow::difference::{Monoid, Semigroup};
48use differential_dataflow::lattice::Lattice;
49use differential_dataflow::operators::arrange::Arranged;
50use differential_dataflow::trace::{Cursor, TraceReader};
51use differential_dataflow::consolidation::{consolidate, consolidate_updates};
52use differential_dataflow::trace::implementations::BatchContainer;
53
54/// A binary equijoin that responds to updates on only its first input.
55///
56/// This operator responds to inputs of the form
57///
58/// ```ignore
59/// ((key, val1, time1), initial_time, diff1)
60/// ```
61///
62/// where `initial_time` is less or equal to `time1`, and produces as output
63///
64/// ```ignore
65/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
66/// ```
67///
68/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
69/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
70/// This last constraint is important to ensure that we correctly produce
71/// all pairs of output updates across multiple `half_join` operators.
72///
73/// Notice that the time is hoisted up into data. The expectation is that
74/// once out of the "delta flow region", the updates will be `delay`d to the
75/// times specified in the payloads.
76pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
77 stream: &VecCollection<G, (K, V, G::Timestamp), R>,
78 arrangement: Arranged<G, Tr>,
79 frontier_func: FF,
80 comparison: CF,
81 mut output_func: S,
82) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
83where
84 G: Scope<Timestamp = Tr::Time>,
85 K: Hashable + ExchangeData,
86 V: ExchangeData,
87 R: ExchangeData + Monoid,
88 Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
89 R: Mul<Tr::Diff, Output: Semigroup>,
90 FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
91 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
92 DOut: Clone+'static,
93 S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
94{
95 let output_func = move |session: &mut SessionFor<G, _>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| {
96 for (time, diff2) in output.drain(..) {
97 let diff = diff1.clone() * diff2.clone();
98 let dout = (output_func(k, v1, v2), time.clone());
99 session.give((dout, initial.clone(), diff));
100 }
101 };
102 half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
103 .as_collection()
104}
105
/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
///
/// This is a shorthand that exists primarily for readability: it fixes the
/// session's timestamp and capability types to those of the scope `G`.
type SessionFor<'a, 'b, G, CB> =
    Session<'a, 'b,
        <G as ScopeParent>::Timestamp,
        CB,
        Capability<<G as ScopeParent>::Timestamp>,
    >;
115
/// An unsafe variant of `half_join` where the `output_func` closure
/// additionally receives a vector of `(time, diff)` pairs as input and
/// writes its outputs into a session backed by a container builder. The
/// container builder can, but isn't required to, accept `(data, time, diff)`
/// triplets. This allows for more flexibility, but is more error-prone.
///
/// This operator responds to inputs of the form
///
/// ```ignore
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2])
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
///
/// The `yield_function` allows the caller to indicate when the operator should
/// yield control, as a function of the elapsed time and the number of matched
/// records. Note this is not the number of *output* records, owing mainly to
/// the number of matched records being easiest to record with low overhead.
///
/// The `frontier_func` is called with each time in the first input's frontier
/// and each capability held in the internal stash, together with an antichain
/// to populate; the resulting antichain is installed as the logical compaction
/// frontier of the arrangement's trace.
///
/// The `comparison` function is used both to decide whether a proposal is still
/// blocked by the arrangement's input frontier and to select which arrangement
/// times match a proposal's `initial_time`.
pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
    stream: &VecCollection<G, (K, V, G::Timestamp), R>,
    mut arrangement: Arranged<G, Tr>,
    frontier_func: FF,
    comparison: CF,
    yield_function: Y,
    mut output_func: S,
) -> StreamCore<G, CB::Container>
where
    G: Scope<Timestamp = Tr::Time>,
    K: Hashable + ExchangeData,
    V: ExchangeData,
    R: ExchangeData + Monoid,
    Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
    Y: Fn(std::time::Instant, usize) -> bool + 'static,
    S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
    CB: ContainerBuilder,
{
    // No need to block physical merging for this operator.
    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
    // Held in an `Option` so the trace can be dropped once no more work can arrive.
    let mut arrangement_trace = Some(arrangement.trace);
    let arrangement_stream = arrangement.stream;

    // Pending proposals, keyed by the capability retained for them.
    let mut stash = HashMap::new();

    // Route each proposal by the hash of its key.
    let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());

    // Stash for (time, diff) accumulation.
    let mut output_buffer = Vec::new();

    stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        let activator = stream.scope().activator_for(info.address);

        move |(input1, frontier1), (input2, frontier2), output| {

            // drain the first input, stashing requests.
            input1.for_each(|capability, data| {
                stash.entry(capability.retain())
                    .or_insert(Vec::new())
                    .append(data)
            });

            // Drain input batches; although we do not observe them, we want access to the input
            // to observe the frontier and to drive scheduling.
            input2.for_each(|_, _| { });

            // Local variables to track if and when we should exit early.
            // The rough logic is that we fully process inputs and set their differences to zero,
            // stopping at any point. We clean up all of the zeros in buffers that did any work,
            // and reactivate at the end if the yield function still says so.
            let mut yielded = false;
            let timer = std::time::Instant::now();
            let mut work = 0;

            // New entries to introduce to the stash after processing.
            // They are collected separately because `stash` is borrowed mutably
            // while it is being iterated below.
            let mut stash_additions = HashMap::new();

            if let Some(ref mut trace) = arrangement_trace {

                for (capability, proposals) in stash.iter_mut() {

                    // Avoid computation if we should already yield.
                    // TODO: Verify this is correct for TOTAL ORDER.
                    yielded = yielded || yield_function(timer, work);
                    // Only process proposals whose capability time is no longer
                    // reachable by the arrangement input's frontier.
                    if !yielded && !frontier2.less_equal(capability.time()) {

                        let frontier = frontier2.frontier();

                        // Update yielded: We can only go from false to {false, true} as
                        // we're checking that `!yielded` holds before entering this block.
                        yielded = process_proposals::<G, _, _, _, _, _, _, _, _>(
                            &comparison,
                            &yield_function,
                            &mut output_func,
                            &mut output_buffer,
                            timer,
                            &mut work,
                            trace,
                            proposals,
                            output.session_with_builder(capability),
                            frontier
                        );

                        // Processed proposals were left with a zero diff; discard them.
                        proposals.retain(|ptd| !ptd.2.is_zero());

                        // Determine the lower bound of remaining update times.
                        let mut antichain = Antichain::new();
                        for (_, initial, _) in proposals.iter() {
                            antichain.insert(initial.clone());
                        }
                        // Fast path: there is only one element in the antichain.
                        // All times in `proposals` must be greater or equal to it.
                        // If that element is not less-or-equal to the capability's time,
                        // move every proposal to a capability delayed to it; otherwise
                        // the proposals stay under the current capability.
                        if antichain.len() == 1 && !antichain.less_equal(capability.time()) {
                            stash_additions
                                .entry(capability.delayed(&antichain[0]))
                                .or_insert(Vec::new())
                                .append(proposals);
                        }
                        else if antichain.len() > 1 {
                            // Any remaining times should peel off elements from `proposals`.
                            // Each proposal goes to the first antichain element that is
                            // less-or-equal to its initial time; one must exist because
                            // every initial time was inserted into the antichain above.
                            let mut additions = vec![Vec::new(); antichain.len()];
                            for (data, initial, diff) in proposals.drain(..) {
                                use timely::PartialOrder;
                                let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap();
                                additions[position].push((data, initial, diff));
                            }
                            for (time, addition) in antichain.into_iter().zip(additions) {
                                stash_additions
                                    .entry(capability.delayed(&time))
                                    .or_insert(Vec::new())
                                    .extend(addition);
                            }
                        }
                    }
                }
            }

            // If we yielded, re-activate the operator.
            if yielded {
                activator.activate();
            }

            // drop fully processed capabilities.
            stash.retain(|_,proposals| !proposals.is_empty());

            // Merge the re-keyed proposals back into the stash.
            for (capability, proposals) in stash_additions.into_iter() {
                stash.entry(capability).or_insert(Vec::new()).extend(proposals);
            }

            // The logical merging frontier depends on both input1 and stash.
            let mut frontier = timely::progress::frontier::Antichain::new();
            for time in frontier1.frontier().iter() {
                frontier_func(time, &mut frontier);
            }
            for time in stash.keys() {
                frontier_func(time, &mut frontier);
            }
            arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

            // Once the first input is complete and nothing is stashed, no further
            // lookups can occur, so release the trace handle.
            if frontier1.is_empty() && stash.is_empty() {
                arrangement_trace = None;
            }
        }
    })
}
290
291/// Outlined inner loop for `half_join_internal_unsafe` for reasons of performance.
292///
293/// Gives Rust/LLVM the opportunity to inline the loop body instead of inlining the loop and
294/// leaving all calls in the loop body outlined.
295///
296/// Consumes proposals until the yield function returns `true` or all proposals are processed.
297/// Leaves a zero diff in place for all proposals that were processed.
298///
299/// Returns `true` if the operator should yield.
300fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
301 comparison: &CF,
302 yield_function: &Y,
303 output_func: &mut S,
304 mut output_buffer: &mut Vec<(Tr::Time, Tr::Diff)>,
305 timer: Instant,
306 work: &mut usize,
307 trace: &mut Tr,
308 proposals: &mut Vec<((K, V, Tr::Time), Tr::Time, R)>,
309 mut session: SessionFor<G, CB>,
310 frontier: AntichainRef<Tr::Time>
311) -> bool
312where
313 G: Scope<Timestamp = Tr::Time>,
314 Tr: for<'a> TraceReader<KeyOwn = K>,
315 CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
316 Y: Fn(Instant, usize) -> bool + 'static,
317 S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
318 CB: ContainerBuilder,
319 K: Ord,
320 V: Ord,
321 R: Monoid,
322{
323 // Sort requests by key for in-order cursor traversal.
324 consolidate_updates(proposals);
325
326 let (mut cursor, storage) = trace.cursor();
327 let mut yielded = false;
328
329 let mut key_con = Tr::KeyContainer::with_capacity(1);
330 let mut time_con = Tr::TimeContainer::with_capacity(1);
331 for time in frontier.iter() {
332 time_con.push_own(time);
333 }
334
335 // Process proposals one at a time, stopping if we should yield.
336 for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
337 // Use TOTAL ORDER to allow the release of `time`.
338 yielded = yielded || yield_function(timer, *work);
339
340 if !yielded && !(0 .. time_con.len()).any(|i| comparison(time_con.index(i), initial)) {
341 key_con.clear(); key_con.push_own(&key);
342 cursor.seek_key(&storage, key_con.index(0));
343 if cursor.get_key(&storage) == key_con.get(0) {
344 while let Some(val2) = cursor.get_val(&storage) {
345 cursor.map_times(&storage, |t, d| {
346 if comparison(t, initial) {
347 let mut t = Tr::owned_time(t);
348 t.join_assign(time);
349 output_buffer.push((t, Tr::owned_diff(d)))
350 }
351 });
352 consolidate(&mut output_buffer);
353 *work += output_buffer.len();
354 output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
355 // Defensive clear; we'd expect `output_func` to clear the buffer.
356 // TODO: Should we assert it is empty?
357 output_buffer.clear();
358 cursor.step_val(&storage);
359 }
360 cursor.rewind_vals(&storage);
361 }
362 *diff1 = R::zero();
363 }
364 }
365
366 yielded
367}