differential_dogs3/operators/
half_join.rs

//! Dataflow operator for delta joins over partially ordered timestamps.
//!
//! Given multiple streams of updates `(data, time, diff)` that are each
//! defined over the same partially ordered `time`, we want to form the
//! full cross-join of all relations (we will *later* apply some filters
//! and instead equijoin on keys).
//!
//! The "correct" output is the cross join of these triples, where
//!   1. The `data` entries are just tuple'd up together,
//!   2. The `time` entries are subjected to the lattice `join` operator,
//!   3. The `diff` entries are multiplied.
//!
//! One way to produce the correct output is to form independent dataflow
//! fragments for each input stream, such that each intended output is then
//! produced by exactly one of these input streams.
//!
//! There are several incorrect ways one might do this, but here is one way
//! that I hope is not incorrect:
//!
//! Each input stream of updates is joined with each other input collection,
//! where each input update is matched against each other input update that
//! has a `time` that is less-than the input update's `time`, *UNDER A TOTAL
//! ORDER ON `time`*. The outputs are the `(data, time, diff)` entries that
//! follow the rules above, except that we additionally preserve the input's
//! initial `time` as well, for use in subsequent joins with the other input
//! collections.
//!
//! There are some caveats about ties, and we should treat each `time` for
//! each input as occurring at distinct times, one after the other (so that
//! ties are resolved by the index of the input). There is also the matter
//! of logical compaction, which should not be done in a way that prevents
//! the correct determination of the total order comparison.
33
34use std::collections::HashMap;
35use std::ops::Mul;
36use std::time::Instant;
37
38use timely::ContainerBuilder;
39use timely::container::CapacityContainerBuilder;
40use timely::dataflow::{Scope, ScopeParent, StreamCore};
41use timely::dataflow::channels::pact::{Pipeline, Exchange};
42use timely::dataflow::operators::{Capability, Operator, generic::Session};
43use timely::progress::Antichain;
44use timely::progress::frontier::AntichainRef;
45
46use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
47use differential_dataflow::difference::{Monoid, Semigroup};
48use differential_dataflow::lattice::Lattice;
49use differential_dataflow::operators::arrange::Arranged;
50use differential_dataflow::trace::{Cursor, TraceReader};
51use differential_dataflow::consolidation::{consolidate, consolidate_updates};
52use differential_dataflow::trace::implementations::BatchContainer;
53
54/// A binary equijoin that responds to updates on only its first input.
55///
56/// This operator responds to inputs of the form
57///
58/// ```ignore
59/// ((key, val1, time1), initial_time, diff1)
60/// ```
61///
62/// where `initial_time` is less or equal to `time1`, and produces as output
63///
64/// ```ignore
65/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
66/// ```
67///
68/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
69/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
70/// This last constraint is important to ensure that we correctly produce
71/// all pairs of output updates across multiple `half_join` operators.
72///
73/// Notice that the time is hoisted up into data. The expectation is that
74/// once out of the "delta flow region", the updates will be `delay`d to the
75/// times specified in the payloads.
76pub fn half_join<G, K, V, R, Tr, FF, CF, DOut, S>(
77    stream: &VecCollection<G, (K, V, G::Timestamp), R>,
78    arrangement: Arranged<G, Tr>,
79    frontier_func: FF,
80    comparison: CF,
81    mut output_func: S,
82) -> VecCollection<G, (DOut, G::Timestamp), <R as Mul<Tr::Diff>>::Output>
83where
84    G: Scope<Timestamp = Tr::Time>,
85    K: Hashable + ExchangeData,
86    V: ExchangeData,
87    R: ExchangeData + Monoid,
88    Tr: TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
89    R: Mul<Tr::Diff, Output: Semigroup>,
90    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
91    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
92    DOut: Clone+'static,
93    S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static,
94{
95    let output_func = move |session: &mut SessionFor<G, _>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| {
96        for (time, diff2) in output.drain(..) {
97            let diff = diff1.clone() * diff2.clone();
98            let dout = (output_func(k, v1, v2), time.clone());
99            session.give((dout, initial.clone(), diff));
100        }
101    };
102    half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder<Vec<_>>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func)
103        .as_collection()
104}
105
106/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
107///
108/// This is a shorthand primarily for the reson of readability.
109type SessionFor<'a, 'b, G, CB> =
110    Session<'a, 'b,
111        <G as ScopeParent>::Timestamp,
112        CB,
113        Capability<<G as ScopeParent>::Timestamp>,
114    >;
115
116/// An unsafe variant of `half_join` where the `output_func` closure takes
117/// additional arguments a vector of `time` and `diff` tuples as input and
118/// writes its outputs at a container builder. The container builder
119/// can, but isn't required to, accept `(data, time, diff)` triplets.
120/// This allows for more flexibility, but is more error-prone.
121///
122/// This operator responds to inputs of the form
123///
124/// ```ignore
125/// ((key, val1, time1), initial_time, diff1)
126/// ```
127///
128/// where `initial_time` is less or equal to `time1`, and produces as output
129///
130/// ```ignore
131/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2])
132/// ```
133///
134/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
135/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
136///
137/// The `yield_function` allows the caller to indicate when the operator should
138/// yield control, as a function of the elapsed time and the number of matched
139/// records. Note this is not the number of *output* records, owing mainly to
140/// the number of matched records being easiest to record with low overhead.
141pub fn half_join_internal_unsafe<G, K, V, R, Tr, FF, CF, Y, S, CB>(
142    stream: &VecCollection<G, (K, V, G::Timestamp), R>,
143    mut arrangement: Arranged<G, Tr>,
144    frontier_func: FF,
145    comparison: CF,
146    yield_function: Y,
147    mut output_func: S,
148) -> StreamCore<G, CB::Container>
149where
150    G: Scope<Timestamp = Tr::Time>,
151    K: Hashable + ExchangeData,
152    V: ExchangeData,
153    R: ExchangeData + Monoid,
154    Tr: for<'a> TraceReader<KeyOwn = K, Time: std::hash::Hash>+Clone+'static,
155    FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
156    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
157    Y: Fn(std::time::Instant, usize) -> bool + 'static,
158    S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
159    CB: ContainerBuilder,
160{
161    // No need to block physical merging for this operator.
162    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
163    let mut arrangement_trace = Some(arrangement.trace);
164    let arrangement_stream = arrangement.stream;
165
166    let mut stash = HashMap::new();
167
168    let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
169
170    // Stash for (time, diff) accumulation.
171    let mut output_buffer = Vec::new();
172
173    stream.inner.binary_frontier(&arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| {
174
175        // Acquire an activator to reschedule the operator when it has unfinished work.
176        let activator = stream.scope().activator_for(info.address);
177
178        move |(input1, frontier1), (input2, frontier2), output| {
179
180            // drain the first input, stashing requests.
181            input1.for_each(|capability, data| {
182                stash.entry(capability.retain())
183                    .or_insert(Vec::new())
184                    .append(data)
185            });
186
187            // Drain input batches; although we do not observe them, we want access to the input
188            // to observe the frontier and to drive scheduling.
189            input2.for_each(|_, _| { });
190
191            // Local variables to track if and when we should exit early.
192            // The rough logic is that we fully process inputs and set their differences to zero,
193            // stopping at any point. We clean up all of the zeros in buffers that did any work,
194            // and reactivate at the end if the yield function still says so.
195            let mut yielded = false;
196            let timer = std::time::Instant::now();
197            let mut work = 0;
198
199            // New entries to introduce to the stash after processing.
200            let mut stash_additions = HashMap::new();
201
202            if let Some(ref mut trace) = arrangement_trace {
203
204                for (capability, proposals) in stash.iter_mut() {
205
206                    // Avoid computation if we should already yield.
207                    // TODO: Verify this is correct for TOTAL ORDER.
208                    yielded = yielded || yield_function(timer, work);
209                    if !yielded && !frontier2.less_equal(capability.time()) {
210
211                        let frontier = frontier2.frontier();
212
213                        // Update yielded: We can only go from false to {false, true} as
214                        // we're checking that `!yielded` holds before entering this block.
215                        yielded = process_proposals::<G, _, _, _, _, _, _, _, _>(
216                            &comparison,
217                            &yield_function,
218                            &mut output_func,
219                            &mut output_buffer,
220                            timer,
221                            &mut work,
222                            trace,
223                            proposals,
224                            output.session_with_builder(capability),
225                            frontier
226                        );
227
228                        proposals.retain(|ptd| !ptd.2.is_zero());
229
230                        // Determine the lower bound of remaining update times.
231                        let mut antichain = Antichain::new();
232                        for (_, initial, _) in proposals.iter() {
233                            antichain.insert(initial.clone());
234                        }
235                        // Fast path: there is only one element in the antichain.
236                        // All times in `proposals` must be greater or equal to it.
237                        if antichain.len() == 1 && !antichain.less_equal(capability.time()) {
238                            stash_additions
239                                .entry(capability.delayed(&antichain[0]))
240                                .or_insert(Vec::new())
241                                .append(proposals);
242                        }
243                        else if antichain.len() > 1 {
244                            // Any remaining times should peel off elements from `proposals`.
245                            let mut additions = vec![Vec::new(); antichain.len()];
246                            for (data, initial, diff) in proposals.drain(..) {
247                                use timely::PartialOrder;
248                                let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap();
249                                additions[position].push((data, initial, diff));
250                            }
251                            for (time, addition) in antichain.into_iter().zip(additions) {
252                                stash_additions
253                                    .entry(capability.delayed(&time))
254                                    .or_insert(Vec::new())
255                                    .extend(addition);
256                            }
257                        }
258                    }
259                }
260            }
261
262            // If we yielded, re-activate the operator.
263            if yielded {
264                activator.activate();
265            }
266
267            // drop fully processed capabilities.
268            stash.retain(|_,proposals| !proposals.is_empty());
269
270            for (capability, proposals) in stash_additions.into_iter() {
271                stash.entry(capability).or_insert(Vec::new()).extend(proposals);
272            }
273
274            // The logical merging frontier depends on both input1 and stash.
275            let mut frontier = timely::progress::frontier::Antichain::new();
276            for time in frontier1.frontier().iter() {
277                frontier_func(time, &mut frontier);
278            }
279            for time in stash.keys() {
280                frontier_func(time, &mut frontier);
281            }
282            arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
283
284            if frontier1.is_empty() && stash.is_empty() {
285                arrangement_trace = None;
286            }
287        }
288    })
289}
290
291/// Outlined inner loop for `half_join_internal_unsafe` for reasons of performance.
292///
293/// Gives Rust/LLVM the opportunity to inline the loop body instead of inlining the loop and
294/// leaving all calls in the loop body outlined.
295///
296/// Consumes proposals until the yield function returns `true` or all proposals are processed.
297/// Leaves a zero diff in place for all proposals that were processed.
298///
299/// Returns `true` if the operator should yield.
300fn process_proposals<G, Tr, CF, Y, S, CB, K, V, R>(
301    comparison: &CF,
302    yield_function: &Y,
303    output_func: &mut S,
304    mut output_buffer: &mut Vec<(Tr::Time, Tr::Diff)>,
305    timer: Instant,
306    work: &mut usize,
307    trace: &mut Tr,
308    proposals: &mut Vec<((K, V, Tr::Time), Tr::Time, R)>,
309    mut session: SessionFor<G, CB>,
310    frontier: AntichainRef<Tr::Time>
311) -> bool
312where
313    G: Scope<Timestamp = Tr::Time>,
314    Tr: for<'a> TraceReader<KeyOwn = K>,
315    CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static,
316    Y: Fn(Instant, usize) -> bool + 'static,
317    S: FnMut(&mut SessionFor<G, CB>, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static,
318    CB: ContainerBuilder,
319    K: Ord,
320    V: Ord,
321    R: Monoid,
322{
323    // Sort requests by key for in-order cursor traversal.
324    consolidate_updates(proposals);
325
326    let (mut cursor, storage) = trace.cursor();
327    let mut yielded = false;
328
329    let mut key_con = Tr::KeyContainer::with_capacity(1);
330    let mut time_con = Tr::TimeContainer::with_capacity(1);
331    for time in frontier.iter() {
332        time_con.push_own(time);
333    }
334
335    // Process proposals one at a time, stopping if we should yield.
336    for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
337        // Use TOTAL ORDER to allow the release of `time`.
338        yielded = yielded || yield_function(timer, *work);
339
340        if !yielded && !(0 .. time_con.len()).any(|i| comparison(time_con.index(i), initial)) {
341            key_con.clear(); key_con.push_own(&key);
342            cursor.seek_key(&storage, key_con.index(0));
343            if cursor.get_key(&storage) == key_con.get(0) {
344                while let Some(val2) = cursor.get_val(&storage) {
345                    cursor.map_times(&storage, |t, d| {
346                        if comparison(t, initial) {
347                            let mut t = Tr::owned_time(t);
348                            t.join_assign(time);
349                            output_buffer.push((t, Tr::owned_diff(d)))
350                        }
351                    });
352                    consolidate(&mut output_buffer);
353                    *work += output_buffer.len();
354                    output_func(&mut session, key, val1, val2, initial, diff1, &mut output_buffer);
355                    // Defensive clear; we'd expect `output_func` to clear the buffer.
356                    // TODO: Should we assert it is empty?
357                    output_buffer.clear();
358                    cursor.step_val(&storage);
359                }
360                cursor.rewind_vals(&storage);
361            }
362            *diff1 = R::zero();
363        }
364    }
365
366    yielded
367}