// differential_dataflow/operators/join.rs
//! Match pairs of records based on a key.
//!
//! The various `join` implementations require that the units of each collection can be multiplied, and that
//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
//! + (b * c), and if the two results are not equal, little is known about the actual output.
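//!
//! For integer weights this is just the distributive law; a minimal sketch in plain Rust
//! (an illustration, not specific to this crate):
//!
//! ```rust
//! // Distributivity of multiplication over addition, as required of weights.
//! let (a, b, c): (isize, isize, isize) = (2, 3, 4);
//! assert_eq!((a + b) * c, (a * c) + (b * c));
//! ```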
use std::cmp::Ordering;
use timely::Container;

use timely::container::{ContainerBuilder, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::generic::{Operator, OutputHandleCore};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::Counter;
use timely::dataflow::operators::Capability;
use timely::dataflow::channels::pushers::tee::Tee;

use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor};
use crate::operators::ValueHistory;

use crate::trace::TraceReader;

/// Join implementations for `(key,val)` data.
pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
    ///
    /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
    ///
    ///     x.join(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join<V2, R2>(&self, other: &Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
    where
        K: ExchangeData,
        V2: ExchangeData,
        R2: ExchangeData+Semigroup,
        R: Multiply<R2, Output: Semigroup+'static>,
    {
        self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
    }

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_map(&y, |_key, &a, &b| (a,b))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_map<V2, R2, D, L>(&self, other: &Collection<G, (K,V2), R2>, logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
    where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output: Semigroup+'static>, D: Data, L: FnMut(&K, &V, &V2)->D+'static;

    /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
    ///
    /// When the second collection contains frequencies that are either zero or one this is the more traditional
    /// relational semijoin. When the second collection contains other multiplicities, this operation may scale up
    /// the counts of the records in the first input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
    ///
    ///     x.semijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn semijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output: Semigroup+'static>;

    /// Subtracts the semijoin with `other` from `self`.
    ///
    /// In the case that `other` has multiplicities zero or one this results
    /// in a relational antijoin, in which we discard input records whose key
    /// is present in `other`. If the multiplicities could be other than zero
    /// or one, the semantic interpretation of this operator is less clear.
    ///
    /// In almost all cases, you should ensure that `other` has multiplicities
    /// that are zero or one, perhaps by using the `distinct` operator, as
    /// sketched below.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
    ///
    ///     x.antijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
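    ///
    /// A sketch of guarding against larger multiplicities (assuming the `distinct` operator
    /// provided by this crate's `Threshold` trait; not compiled here):
    ///
    /// ```ignore
    /// // Hypothetical guard: reduce `y` to multiplicities in {0, 1} before antijoining.
    /// x.antijoin(&y.distinct())
    /// ```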
    fn antijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output = R>, R: Abelian+'static;
}

impl<G, K, V, R> Join<G, K, V, R> for Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
    where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_key();
        arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
    where R: Multiply<R2, Output: Semigroup+'static> {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_self();
        arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
    where R: Multiply<R2, Output=R>, R: Abelian+'static {
        self.concat(&self.semijoin(other).negate())
    }
}

impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>+Clone+'static,
    K: ExchangeData+Hashable,
    V: Data + 'static,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
    where
        Tr::Diff: Multiply<R2, Output: Semigroup+'static>,
        L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
    {
        let arranged2 = other.arrange_by_key();
        self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
    where Tr::Diff: Multiply<R2, Output: Semigroup+'static> {
        let arranged2 = other.arrange_by_self();
        self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), Tr::Diff>
    where Tr::Diff: Multiply<R2, Output=Tr::Diff>, Tr::Diff: Abelian+'static {
        self.as_collection(|k,v| (k.clone(), v.clone()))
            .concat(&self.semijoin(other).negate())
    }
}

/// Matches the elements of two arranged traces.
///
/// These methods are used by the various `join` implementations, but they can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
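///
/// For example, one arrangement can feed several joins; a sketch in the style of the
/// doctests below (not compiled here; `collection`, `other1`, and `other2` stand in
/// for caller data):
///
/// ```ignore
/// let arranged = collection.arrange_by_key();
/// let out1 = arranged.join_core(&other1, |key, val1, val2| Some((key.clone(), (val1.clone(), val2.clone()))));
/// let out2 = arranged.join_core(&other2, |key, val1, val2| Some((key.clone(), (val1.clone(), val2.clone()))));
/// ```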
pub trait JoinCore<G: Scope<Timestamp: Lattice+Ord>, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> {

    /// Joins two arranged collections with the same key type.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
    /// every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_core(&y, |_key, &a, &b| Some((a, b)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
        ;

    /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
    /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
    /// flexibility, but is more error-prone.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry
    /// for every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
    ///
    ///     // Returned values have weight `a`, the value from the first input.
    ///     x.join_core_internal_unsafe(&y, |_key, &a, &b, t, _r1, _r2| Some(((a, b), t.clone(), a)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        D: Data,
        ROut: Semigroup+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
        ;
}


impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
{
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
    {
        self.arrange_by_key()
            .join_core(stream2, result)
    }

    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
        D: Data,
        ROut: Semigroup+'static,
    {
        self.arrange_by_key().join_core_internal_unsafe(stream2, result)
    }
}

/// The session passed to join closures.
pub type JoinSession<'a, T, CB, C> = Session<'a, T, EffortBuilder<CB>, Counter<T, C, Tee<T, C>>>;

/// A container builder that tracks the length of outputs to estimate the effort of join closures.
#[derive(Default, Debug)]
pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);

impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
    type Container = CB::Container;

    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let extracted = self.1.extract();
        self.0.replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.len()));
        extracted
    }

    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        let finished = self.1.finish();
        self.0.replace(self.0.take() + finished.as_ref().map_or(0, |e| e.len()));
        finished
    }
}

impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
    #[inline]
    fn push_into(&mut self, item: D) {
        self.1.push_into(item);
    }
}
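
// A minimal, test-only sketch of the effort accounting above, assuming timely's
// `CapacityContainerBuilder` as the wrapped builder: records that are pushed and then
// flushed are counted in the `Cell`, which `Deferred::work` drains via `take` to charge
// against its fuel.
#[cfg(test)]
mod effort_builder_sketch {
    use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
    use super::EffortBuilder;

    #[test]
    fn counts_flushed_records() {
        let mut builder: EffortBuilder<CapacityContainerBuilder<Vec<u32>>> = Default::default();
        for i in 0..5 { builder.push_into(i); }
        // Flushing the wrapped builder adds the container length to the counter.
        let flushed = builder.finish().map_or(0, |c| c.len());
        assert_eq!(builder.0.take(), flushed);
    }
}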

/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys, and
/// values that should be presented. The two traces here can have arbitrary key and value types, which can be
/// unsized and even potentially unrelated to the input collection data. Importantly, the key and value types
/// could be generic associated types (GATs) of the traces, and we would seemingly struggle to frame these
/// types as trait arguments.
///
/// The implementation produces a caller-specified container. Callers can use [`AsCollection`] to wrap the
/// output stream in a collection, as sketched below.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
///
/// [`AsCollection`]: crate::collection::AsCollection
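///
/// A sketch of direct use (not compiled here; `arranged1`, `arranged2`, and the choice of
/// a vector container builder are assumptions standing in for caller code):
///
/// ```ignore
/// use differential_dataflow::AsCollection;
/// use timely::container::CapacityContainerBuilder;
///
/// let joined = join_traces::<_, _, _, _, CapacityContainerBuilder<Vec<_>>>(
///     &arranged1,
///     &arranged2,
///     |key, val1, val2, time, diff1, diff2, session| {
///         // One output update per matched pair, with weights combined via `Multiply`.
///         let diff = diff1.clone().multiply(diff2);
///         session.give(((key.clone(), (val1.clone(), val2.clone())), time.clone(), diff));
///     },
/// ).as_collection();
/// ```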
pub fn join_traces<G, T1, T2, L, CB>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> StreamCore<G, CB::Container>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader+Clone+'static,
    T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
    L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession<T1::Time, CB, CB::Container>)+'static,
    CB: ContainerBuilder + 'static,
{
    // Rename traces for symmetry from here on out.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        use timely::scheduling::Activator;
        let activations = arranged1.stream.scope().activations().clone();
        let activator = Activator::new(info.address, activations);

        // Our initial invariants are that for each trace, physical compaction is less than or equal to the trace's
        // upper bound. These invariants ensure that we can reference observed batch frontiers from `_start_upper`
        // onward, as long as we maintain our physical compaction capabilities appropriately. These assertions are
        // tested as we load up the initial work for the two traces, and before the operator is constructed.

        // Acknowledged frontier for each input.
        // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
        // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
        // the physical compaction frontier of their corresponding trace.
        // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
        use timely::progress::frontier::Antichain;
        let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
        let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

        // deferred work of batches from each input.
        let mut todo1 = std::collections::VecDeque::new();
        let mut todo2 = std::collections::VecDeque::new();

        // We'll unload the initial batches here, to put ourselves in a more deterministic state to start.
        trace1.map_batches(|batch1| {
            acknowledged1.clone_from(batch1.upper());
            // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
            // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
            // Once we start streaming batches in, we will need to respond to new batches from
            // `input1` with logic that would have otherwise been here. Check out the next loop
            // for the structure.
        });
        // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

        // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
        // on both traces at the same time, as they could be the same trace and this would panic.
        let mut batch2_cursors = Vec::new();
        trace2.map_batches(|batch2| {
            acknowledged2.clone_from(batch2.upper());
            batch2_cursors.push((batch2.cursor(), batch2.clone()));
        });
        // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

        // Load up deferred work using trace2 cursors and batches captured just above.
        for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
            // It is safe to ask for `ack1` because we have confirmed it to be in advance of the trace's physical compaction frontier.
            let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
            // We could downgrade the capability here, but doing so is a bit complicated mathematically.
            // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
            // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
            // that property.
            todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
        }

        // Droppable handles to shared trace data structures.
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);

        move |input1, input2, output| {

            // 1. Consuming input.
            //
            // The join computation repeatedly accepts batches of updates from each of its inputs.
            //
            // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
            // updates from its other input. It is important to track which updates have been accepted, because
            // we use a shared trace and there may be updates present that are in advance of this accepted bound.
            //
            // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
            // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
            // This last case is a consequence of our inability to transmit empty batches, as they may be formed
            // in the absence of timely dataflow capabilities.

            // Drain input 1, prepare work.
            input1.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches arrive in order, and we can
                            // simply adopt the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());
                        }
                    }
                }
                else { panic!("`trace2_option` dropped before `input1` emptied!"); }
            });

            // Drain input 2, prepare work.
            input2.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches arrive in order, and we can
                            // simply adopt the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());
                        }
                    }
                }
                else { panic!("`trace1_option` dropped before `input2` emptied!"); }
            });

            // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
            if let Some(trace1) = trace1_option.as_mut() {
                trace1.advance_upper(&mut acknowledged1);
            }
            if let Some(trace2) = trace2_option.as_mut() {
                trace2.advance_upper(&mut acknowledged2);
            }

            // 2. Join computation.
            //
            // For each of the inputs, we do some amount of work (measured in terms of number
            // of output records produced). This is meant to yield control to allow downstream
            // operators to consume and reduce the output, but it is also meant to provide some
            // degree of responsiveness. There is a potential risk here that if we fall behind
            // then the increasing queues hold back physical compaction of the underlying traces,
            // which results in unintentionally quadratic processing time (each batch of either
            // input must scan all batches from the other input).

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo1.is_empty() && fuel > 0 {
                todo1.front_mut().unwrap().work(
                    output,
                    |k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
            }

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo2.is_empty() && fuel > 0 {
                todo2.front_mut().unwrap().work(
                    output,
                    |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
            }

            // Re-activate operator if work remains.
            if !todo1.is_empty() || !todo2.is_empty() {
                activator.activate();
            }

            // 3. Trace maintenance.
            //
            // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
            // the progress of an input, because should we ever drop one of the traces we will
            // lose the ability to extract information from anything other than the input.
            // For example, if we dropped `trace2` we would not be able to use `advance_upper`
            // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
            // compaction of `trace1`.

            // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
            if let Some(trace1) = trace1_option.as_mut() {
                if input2.frontier().is_empty() { trace1_option = None; }
                else {
                    // Allow `trace1` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input2`). All `input2` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace1.set_logical_compaction(input2.frontier().frontier());
                    // Allow `trace1` to compact physically up to the upper bound of batches we
                    // have received in its input (`input1`). We will not require a cursor that
                    // is not beyond this bound.
                    trace1.set_physical_compaction(acknowledged1.borrow());
                }
            }

            // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
            if let Some(trace2) = trace2_option.as_mut() {
                if input1.frontier().is_empty() { trace2_option = None; }
                else {
                    // Allow `trace2` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input1`). All `input1` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace2.set_logical_compaction(input1.frontier().frontier());
                    // Allow `trace2` to compact physically up to the upper bound of batches we
                    // have received in its input (`input2`). We will not require a cursor that
                    // is not beyond this bound.
                    trace2.set_physical_compaction(acknowledged2.borrow());
                }
            }
        }
    })
}


/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data all at once, which would
/// deny the timely dataflow system the chance to run operators that can consume and aggregate the data.
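///
/// A sketch of the driving pattern (mirroring the fuel loops in `join_traces` above):
///
/// ```ignore
/// let mut fuel = 1_000_000;
/// while !todo.is_empty() && fuel > 0 {
///     // Each call stops once it has produced roughly `fuel` output tuples.
///     todo.front_mut().unwrap().work(output, |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c), &mut fuel);
///     if !todo.front().unwrap().work_remains() { todo.pop_front(); }
/// }
/// ```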
struct Deferred<T, C1, C2>
where
    T: Timestamp+Lattice+Ord,
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
{
    trace: C1,
    trace_storage: C1::Storage,
    batch: C2,
    batch_storage: C2::Storage,
    capability: Capability<T>,
    done: bool,
}

impl<T, C1, C2> Deferred<T, C1, C2>
where
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    T: Timestamp+Lattice+Ord,
{
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until at least `fuel` output tuples have been produced, or the work is exhausted.
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputHandleCore<T, EffortBuilder<CB>, Tee<T, CB::Container>>, mut logic: L, fuel: &mut usize)
    where
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, CB::Container>),
    {

        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut thinker = JoinThinker::new();

        while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) {

            match trace_key.cmp(&batch_key) {
                Ordering::Less => trace.seek_key(trace_storage, batch_key),
                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
                Ordering::Equal => {

                    thinker.history1.edits.load(trace, trace_storage, |time| {
                        let mut time = C1::owned_time(time);
                        time.join_assign(meet);
                        time
                    });
                    thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));

                    // Populate the session with the results in the best way we know how.
                    thinker.think(|v1,v2,t,r1,r2| {
                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // TODO: Effort isn't perfectly tracked, as we might still have some data in the
                    // session at the moment it's dropped.
                    effort += session.builder().0.take();
                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        if effort > *fuel { *fuel = 0; }
        else              { *fuel -= effort; }
    }
}

struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    pub history1: ValueHistory<'a, C1>,
    pub history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

        // For reasonably sized edits, do the dead-simple thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        }
        else {

            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            //       in here. If a time is ever repeated, for example, the call will be identical
            //       and accomplish nothing. If only a single record has been added, it may not
            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
            //       is sufficient.

            while !replay1.is_done() && !replay2.is_done() {

                if replay1.time().unwrap().cmp(replay2.time().unwrap()) == Ordering::Less {
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                }
                else {
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}
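
// A minimal sketch of the time arithmetic used by `think` above: each matched pair of
// updates is stamped with the lattice join of its two update times. For totally ordered
// timestamps (as in `timely::example`) the join is simply the maximum; illustrated here
// with `u64` times as an assumption.
#[cfg(test)]
mod think_time_sketch {
    use crate::lattice::Lattice;

    #[test]
    fn join_of_total_order_is_max() {
        // An update at time 3 matched with an update at time 5 takes effect at time 5.
        assert_eq!(3u64.join(&5u64), 5);
        // The meet (used above to advance times toward the capability) is the minimum.
        assert_eq!(3u64.meet(&5u64), 3);
    }
}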