differential_dataflow/operators/join.rs
//! Match pairs of records based on a key.
//!
//! The various `join` implementations require that the units of each collection can be multiplied, and that
//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
//! + (b * c), and if these two are not equal, little can be said about the actual output.
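//!
//! For example, with the default `isize` multiplicities, a record appearing twice in one input
//! and three times in the other matches six times. A small sketch of this requirement at work:
//!
//! ```
//! use differential_dataflow::input::Input;
//! use differential_dataflow::operators::Join;
//!
//! ::timely::example(|scope| {
//!
//!     let x = scope.new_collection_from(vec![(0, 'a'), (0, 'a')]).1;
//!     let y = scope.new_collection_from(vec![(0, 'b'), (0, 'b'), (0, 'b')]).1;
//!     let z = scope.new_collection_from(vec![(0, ('a', 'b')); 6]).1;
//!
//!     x.join(&y)
//!      .assert_eq(&z);
//! });
//! ```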
use std::cmp::Ordering;

use timely::{Accountable, ContainerBuilder};
use timely::container::PushInto;
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Capability;

use crate::hashable::Hashable;
use crate::{Data, ExchangeData, VecCollection};
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor};
use crate::operators::ValueHistory;

use crate::trace::TraceReader;

/// Join implementations for `(key,val)` data.
pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
    ///
    /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
    ///
    ///     x.join(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join<V2, R2>(&self, other: &VecCollection<G, (K,V2), R2>) -> VecCollection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
    where
        K: ExchangeData,
        V2: ExchangeData,
        R2: ExchangeData+Semigroup,
        R: Multiply<R2, Output: Semigroup+'static>,
    {
        self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
    }

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_map(&y, |_key, &a, &b| (a,b))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_map<V2, R2, D, L>(&self, other: &VecCollection<G, (K,V2), R2>, logic: L) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
    where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output: Semigroup+'static>, D: Data, L: FnMut(&K, &V, &V2)->D+'static;

    /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
    ///
    /// When the second collection contains frequencies that are either zero or one this is the more traditional
    /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
    /// the counts of the records in the first input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
    ///
    ///     x.semijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn semijoin<R2>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output: Semigroup+'static>;

    /// Subtracts the semijoin with `other` from `self`.
    ///
    /// In the case that `other` has multiplicities zero or one this results
    /// in a relational antijoin, in which we discard input records whose key
    /// is present in `other`. If the multiplicities could be other than zero
    /// or one, the semantic interpretation of this operator is less clear.
    ///
    /// In almost all cases, you should ensure that `other` has multiplicities
    /// that are zero or one, perhaps by using the `distinct` operator.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
    ///
    ///     x.antijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn antijoin<R2>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), R>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output = R>, R: Abelian+'static;
}

impl<G, K, V, R> Join<G, K, V, R> for VecCollection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &VecCollection<G, (K, V2), R2>, mut logic: L) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
    where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_key();
        arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
    where R: Multiply<R2, Output: Semigroup+'static> {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_self();
        arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), R>
    where R: Multiply<R2, Output=R>, R: Abelian+'static {
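        // Everything in `self`, minus the matched records: `negate` flips the
        // multiplicities of the semijoin so that the concatenation subtracts them.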
        self.concat(&self.semijoin(other).negate())
    }
}

impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>+Clone+'static,
    K: ExchangeData+Hashable,
    V: Data + 'static,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &VecCollection<G, (K, V2), R2>, mut logic: L) -> VecCollection<G, D, <Tr::Diff as Multiply<R2>>::Output>
    where
        Tr::Diff: Multiply<R2, Output: Semigroup+'static>,
        L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
    {
        let arranged2 = other.arrange_by_key();
        self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
    where Tr::Diff: Multiply<R2, Output: Semigroup+'static> {
        let arranged2 = other.arrange_by_self();
        self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), Tr::Diff>
    where Tr::Diff: Multiply<R2, Output=Tr::Diff>, Tr::Diff: Abelian+'static {
        self.as_collection(|k,v| (k.clone(), v.clone()))
            .concat(&self.semijoin(other).negate())
    }
}

/// Matches the elements of two arranged traces.
///
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
pub trait JoinCore<G: Scope<Timestamp: Lattice+Ord>, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> {

    /// Joins two arranged collections with the same key type.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
    /// every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_core(&y, |_key, &a, &b| Some((a, b)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> VecCollection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
    ;

    /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
    /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
    /// flexibility, but is more error-prone.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry
    /// for every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
    ///
    ///     // Returned values have weight `a`.
    ///     x.join_core_internal_unsafe(&y, |_key, &a, &b, t, _r1, _r2| Some(((a, b), t.clone(), a)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> VecCollection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        D: Data,
        ROut: Semigroup+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: for<'a> FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
    ;
}


impl<G, K, V, R> JoinCore<G, K, V, R> for VecCollection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice+Ord>,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
{
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> VecCollection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
        I: IntoIterator<Item: Data>,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
    {
        self.arrange_by_key()
            .join_core(stream2, result)
    }

    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> VecCollection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
        D: Data,
        ROut: Semigroup+'static,
    {
        self.arrange_by_key().join_core_internal_unsafe(stream2, result)
    }
}

/// The session passed to join closures.
pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder<CB>, CT>;

/// A container builder that tracks the length of outputs to estimate the effort of join closures.
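///
/// The cell accumulates the record counts of extracted and finished containers;
/// `Deferred::work` drains it with `take` to meter the fuel spent on each key.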
#[derive(Default, Debug)]
pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);

impl<CB: ContainerBuilder> timely::container::ContainerBuilder for EffortBuilder<CB> {
    type Container = CB::Container;

    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let extracted = self.1.extract();
        self.0.replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.record_count() as usize));
        extracted
    }

    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        let finished = self.1.finish();
        self.0.replace(self.0.take() + finished.as_ref().map_or(0, |e| e.record_count() as usize));
        finished
    }
}

impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
    #[inline]
    fn push_into(&mut self, item: D) {
        self.1.push_into(item);
    }
}

/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys,
/// and values that should be presented. The two traces here can have arbitrary key and value types, which
/// can be unsized and even potentially unrelated to the input collection data. Importantly, the key and
/// value types could be generic associated types (GATs) of the traces, and we would seemingly struggle to
/// frame these types as trait arguments.
///
/// The implementation produces a caller-specified container. Implementations can use [`AsCollection`] to wrap the
/// output stream in a collection.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
///
355/// [`AsCollection`]: crate::collection::AsCollection
356pub fn join_traces<G, T1, T2, L, CB>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> StreamCore<G, CB::Container>
357where
358 G: Scope<Timestamp=T1::Time>,
359 T1: TraceReader+Clone+'static,
360 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
361 L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession<T1::Time, CB, Capability<T1::Time>>)+'static,
362 CB: ContainerBuilder,
363{
    // Rename traces for symmetry from here on out.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

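    // Both inputs use the `Pipeline` pact: the arrangements have already partitioned
    // updates by key hash, so the join only needs to observe locally produced batches.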
    arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        use timely::scheduling::Activator;
        let activations = arranged1.stream.scope().activations().clone();
        let activator = Activator::new(info.address, activations);

        // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
        // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
        // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
        // initial work for the two traces, and before the operator is constructed.

        // Acknowledged frontier for each input.
        // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
        // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
        // the physical compaction frontier of their corresponding trace.
        // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
        use timely::progress::frontier::Antichain;
        let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
        let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

        // Deferred work of batches from each input.
        let mut todo1 = std::collections::VecDeque::new();
        let mut todo2 = std::collections::VecDeque::new();

        // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
        trace1.map_batches(|batch1| {
            acknowledged1.clone_from(batch1.upper());
            // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
            // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
            // Once we start streaming batches in, we will need to respond to new batches from
            // `input1` with logic that would have otherwise been here. Check out the next loop
            // for the structure.
        });
        // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace1`'s physical compaction frontier is not beyond the frontier of completed times in `trace1`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

        // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
        // on both traces at the same time, as they could be the same trace and this would panic.
        let mut batch2_cursors = Vec::new();
        trace2.map_batches(|batch2| {
            acknowledged2.clone_from(batch2.upper());
            batch2_cursors.push((batch2.cursor(), batch2.clone()));
        });
        // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace2`'s physical compaction frontier is not beyond the frontier of completed times in `trace2`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

        // Load up deferred work using trace2 cursors and batches captured just above.
        for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
            // It is safe to ask for `ack1` because we have confirmed it to be in advance of the physical compaction frontier.
            let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
            // We could downgrade the capability here, but doing so is a bit complicated mathematically.
            // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
            // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
            // that property.
            todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
        }

        // Droppable handles to shared trace data structures.
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);

        move |(input1, frontier1), (input2, frontier2), output| {

            // 1. Consuming input.
            //
            // The join computation repeatedly accepts batches of updates from each of its inputs.
            //
            // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
            // updates from its other input. It is important to track which updates have been accepted, because
            // we use a shared trace and there may be updates present that are in advance of this accepted bound.
            //
            // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
            // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
            // This last case is a consequence of our inability to transmit empty batches, as they may be formed
            // in the absence of timely dataflow capabilities.

            // Drain input 1, prepare work.
            input1.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we can
                            // safely adopt the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());
                        }
                    }
                }
                else { panic!("`trace2_option` dropped before `input1` emptied!"); }
            });

            // Drain input 2, prepare work.
            input2.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we can
                            // safely adopt the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());
                        }
                    }
                }
                else { panic!("`trace1_option` dropped before `input2` emptied!"); }
            });

            // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
            if let Some(trace1) = trace1_option.as_mut() {
                trace1.advance_upper(&mut acknowledged1);
            }
            if let Some(trace2) = trace2_option.as_mut() {
                trace2.advance_upper(&mut acknowledged2);
            }

            // 2. Join computation.
            //
            // For each of the inputs, we do some amount of work (measured in terms of number
            // of output records produced). This is meant to yield control to allow downstream
            // operators to consume and reduce the output, but it is also meant to provide some
            // degree of responsiveness. There is a potential risk here that if we fall behind
            // then the increasing queues hold back physical compaction of the underlying traces
            // which results in unintentionally quadratic processing time (each batch of either
            // input must scan all batches from the other input).

            // Perform some amount of outstanding work on `todo1`.
            let mut fuel = 1_000_000;
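            // Batches in `todo1` join `input1` against `trace2`, so the deferred work yields
            // (key, trace2 value, batch1 value, ...); the closure swaps the value and diff
            // arguments back into (input1, input2) order before calling `result`.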
            while !todo1.is_empty() && fuel > 0 {
                todo1.front_mut().unwrap().work(
                    output,
                    |k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
            }

            // Perform some amount of outstanding work on `todo2`; here the arguments already
            // arrive in (input1, input2) order.
            let mut fuel = 1_000_000;
            while !todo2.is_empty() && fuel > 0 {
                todo2.front_mut().unwrap().work(
                    output,
                    |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
            }

            // Re-activate operator if work remains.
            if !todo1.is_empty() || !todo2.is_empty() {
                activator.activate();
            }

            // 3. Trace maintenance.
            //
            // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
            // the progress of an input, because should we ever drop one of the traces we will
            // lose the ability to extract information from anything other than the input.
            // For example, if we dropped `trace2` we would not be able to use `advance_upper`
            // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
            // compaction of `trace1`.

            // Maintain `trace1`. Drop it if `input2` is empty; otherwise compact based on future needs.
            if let Some(trace1) = trace1_option.as_mut() {
                if frontier2.is_empty() { trace1_option = None; }
                else {
                    // Allow `trace1` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input2`). All `input2` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace1.set_logical_compaction(frontier2.frontier());
                    // Allow `trace1` to compact physically up to the upper bound of batches we
                    // have received in its input (`input1`). We will not require a cursor that
                    // is not beyond this bound.
                    trace1.set_physical_compaction(acknowledged1.borrow());
                }
            }

            // Maintain `trace2`. Drop it if `input1` is empty; otherwise compact based on future needs.
            if let Some(trace2) = trace2_option.as_mut() {
                if frontier1.is_empty() { trace2_option = None; }
                else {
                    // Allow `trace2` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input1`). All `input1` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace2.set_logical_compaction(frontier1.frontier());
                    // Allow `trace2` to compact physically up to the upper bound of batches we
                    // have received in its input (`input2`). We will not require a cursor that
                    // is not beyond this bound.
                    trace2.set_physical_compaction(acknowledged2.borrow());
                }
            }
        }
    })
}


/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data all at once, which would
/// deny the timely dataflow system the chance to run operators that consume and aggregate the data.
struct Deferred<T, C1, C2>
where
    T: Timestamp+Lattice+Ord,
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
{
    trace: C1,
    trace_storage: C1::Storage,
    batch: C2,
    batch_storage: C2::Storage,
    capability: Capability<T>,
    done: bool,
}

impl<T, C1, C2> Deferred<T, C1, C2>
where
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    T: Timestamp+Lattice+Ord,
{
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until at least `fuel` output tuples have been produced, or the work is exhausted.
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputBuilderSession<T, EffortBuilder<CB>>, mut logic: L, fuel: &mut usize)
    where
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, Capability<T>>),
    {

        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut thinker = JoinThinker::new();

        while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) {

            match trace_key.cmp(&batch_key) {
                Ordering::Less => trace.seek_key(trace_storage, batch_key),
                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
                Ordering::Equal => {

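                    // Advance times in the trace by the capability's time (a lower bound on any
                    // time we can still emit) so that compacted trace history joins with the
                    // batch at correctly advanced times.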
                    thinker.history1.edits.load(trace, trace_storage, |time| {
                        let mut time = C1::owned_time(time);
                        time.join_assign(meet);
                        time
                    });
                    thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));

                    // Populate `temp` with the results in the best way we know how.
                    thinker.think(|v1,v2,t,r1,r2| {
                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // TODO: Effort isn't perfectly tracked as we might still have some data in the
                    //       session at the moment it's dropped.
                    effort += session.builder().0.take();
                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        if effort > *fuel { *fuel = 0; }
        else { *fuel -= effort; }
    }
}

struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    pub history1: ValueHistory<'a, C1>,
    pub history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

        // For reasonably small edit histories, do the dead-simple quadratic thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        }
        else {

            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            //       in here. If a time is ever repeated, for example, the call will be identical
            //       and accomplish nothing. If only a single record has been added, it may not
            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
            //       is sufficient.

            while !replay1.is_done() && !replay2.is_done() {

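                // Play the two histories forward in time order: the replay with the earlier
                // next time contributes its current edit, matched against every edit the other
                // replay has already buffered (consolidated up to the stepping replay's meet).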
                if replay1.time().unwrap().cmp(replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                }
                else {
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

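            // One replay may finish before the other; drain whichever remains, pairing each
            // of its remaining edits against the finished side's buffered history.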
            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}