differential_dataflow/operators/join.rs
//! Match pairs of records based on a key.
//!
//! The various `join` implementations require that the units of each collection can be multiplied, and that
//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
//! + (b * c), and if these two expressions are not equal, little is known about the actual output.
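//!
//! For example, with the default `isize` weights the weight of each output record is the
//! product of the weights of its matched inputs. A minimal sketch of this behavior:
//!
//! ```
//! use differential_dataflow::input::Input;
//! use differential_dataflow::operators::Join;
//!
//! ::timely::example(|scope| {
//!     // `x` contains (0, 1) with weight 2; `y` contains (0, 'a') with weight 3.
//!     let x = scope.new_collection_from(vec![(0, 1), (0, 1)]).1;
//!     let y = scope.new_collection_from(vec![(0, 'a'), (0, 'a'), (0, 'a')]).1;
//!     // The join yields (0, (1, 'a')) with weight 2 * 3 = 6.
//!     let z = scope.new_collection_from(vec![(0, (1, 'a')); 6]).1;
//!     x.join(&y).assert_eq(&z);
//! });
//! ```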
use std::cmp::Ordering;
use timely::Container;

use timely::container::{ContainerBuilder, PushInto};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::generic::{Operator, OutputHandleCore};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::Counter;
use timely::dataflow::operators::Capability;
use timely::dataflow::channels::pushers::tee::Tee;

use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};
use crate::trace::{BatchReader, Cursor};
use crate::operators::ValueHistory;

use crate::trace::TraceReader;
/// Join implementations for `(key,val)` data.
pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
    ///
    /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
    ///
    ///     x.join(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join<V2, R2>(&self, other: &Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
    where
        K: ExchangeData,
        V2: ExchangeData,
        R2: ExchangeData+Semigroup,
        R: Multiply<R2>,
        <R as Multiply<R2>>::Output: Semigroup+'static
    {
        self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
    }

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_map(&y, |_key, &a, &b| (a,b))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_map<V2, R2, D, L>(&self, other: &Collection<G, (K,V2), R2>, logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
    where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup+'static, D: Data, L: FnMut(&K, &V, &V2)->D+'static;

    /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
    ///
    /// When the second collection contains frequencies that are either zero or one, this is the more traditional
    /// relational semijoin. When the second collection may contain multiplicities other than zero or one, this
    /// operation may scale up the counts of the records in the first input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
    ///
    ///     x.semijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn semijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup+'static;

    /// Subtracts the semijoin with `other` from `self`.
    ///
    /// In the case that `other` has multiplicities zero or one, this results
    /// in a relational antijoin, in which we discard input records whose key
    /// is present in `other`. If the multiplicities could be other than zero
    /// or one, the semantic interpretation of this operator is less clear.
    ///
    /// In almost all cases, you should ensure that `other` has multiplicities
    /// that are zero or one, perhaps by using the `distinct` operator.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
    ///
    ///     x.antijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
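    ///
    /// A minimal sketch (not from the original documentation) of the recommendation above:
    /// `y` contains duplicates, which are removed with `distinct` (provided by the
    /// `Threshold` trait) before the antijoin.
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::{Join, Threshold};
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
    ///
    ///     x.antijoin(&y.distinct())
    ///      .assert_eq(&z);
    /// });
    /// ```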
    fn antijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output = R>, R: Abelian+'static;
}

impl<G, K, V, R> Join<G, K, V, R> for Collection<G, (K, V), R>
where
    G: Scope,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
    G::Timestamp: Lattice+Ord,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
    where R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup+'static, L: FnMut(&K, &V, &V2)->D+'static {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_key();
        arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
    where R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup+'static {
        let arranged1 = self.arrange_by_key();
        let arranged2 = other.arrange_by_self();
        arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
    where R: Multiply<R2, Output=R>, R: Abelian+'static {
        self.concat(&self.semijoin(other).negate())
    }
}

impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V>+Clone+'static,
    K: ExchangeData+Hashable,
    V: Data + 'static,
{
    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
    where
        Tr::Diff: Multiply<R2>,
        <Tr::Diff as Multiply<R2>>::Output: Semigroup+'static,
        L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
    {
        let arranged2 = other.arrange_by_key();
        self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
    }

    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
    where Tr::Diff: Multiply<R2>, <Tr::Diff as Multiply<R2>>::Output: Semigroup+'static {
        let arranged2 = other.arrange_by_self();
        self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
    }

    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), Tr::Diff>
    where Tr::Diff: Multiply<R2, Output=Tr::Diff>, Tr::Diff: Abelian+'static {
        self.as_collection(|k,v| (k.clone(), v.clone()))
            .concat(&self.semijoin(other).negate())
    }
}

/// Matches the elements of two arranged traces.
///
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> where G::Timestamp: Lattice+Ord {

    /// Joins two arranged collections with the same key type.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
    /// every value returned by the iterator.
    ///
    /// This trait is implemented directly for arrangements (`Arranged<G, T>`), and for collections by first
    /// arranging them by key. The `Join` trait contains the more conventional join methods for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_core(&y, |_key, &a, &b| Some((a, b)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff>,
        <R as Multiply<Tr2::Diff>>::Output: Semigroup+'static,
        I: IntoIterator,
        I::Item: Data,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
    ;

    /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
    /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
    /// flexibility, but is more error-prone.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry
    /// for every value returned by the iterator.
    ///
    /// This trait is implemented directly for arrangements (`Arranged<G, T>`), and for collections by first
    /// arranging them by key. The `Join` trait contains the more conventional join methods for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
    ///
    ///     // Returned values have weight `a`.
    ///     x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        D: Data,
        ROut: Semigroup+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: for<'a> FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
    ;
}


impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>
where
    G: Scope,
    K: ExchangeData+Hashable,
    V: ExchangeData,
    R: ExchangeData+Semigroup,
    G::Timestamp: Lattice+Ord,
{
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        R: Multiply<Tr2::Diff>,
        <R as Multiply<Tr2::Diff>>::Output: Semigroup+'static,
        I: IntoIterator,
        I::Item: Data,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
    {
        self.arrange_by_key()
            .join_core(stream2, result)
    }

    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
        D: Data,
        ROut: Semigroup+'static,
    {
        self.arrange_by_key().join_core_internal_unsafe(stream2, result)
    }
}

/// The session passed to join closures.
pub type JoinSession<'a, T, CB, C> = Session<'a, T, EffortBuilder<CB>, Counter<T, C, Tee<T, C>>>;

/// A container builder that tracks the length of outputs to estimate the effort of join closures.
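///
/// A minimal sketch of the accounting (this example is illustrative, not from the original
/// documentation): items pushed into the wrapped builder are counted in the cell as their
/// containers are extracted or finished.
///
/// ```
/// use timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
/// use differential_dataflow::operators::join::EffortBuilder;
///
/// let mut builder = EffortBuilder::<CapacityContainerBuilder<Vec<u32>>>::default();
/// builder.push_into(7);
/// builder.push_into(11);
/// // Drain the builder; the cell accumulates the lengths of the finished containers.
/// while builder.finish().is_some() { }
/// assert_eq!(builder.0.take(), 2);
/// ```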
#[derive(Default, Debug)]
pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);

impl<CB: ContainerBuilder> ContainerBuilder for EffortBuilder<CB> {
    type Container = CB::Container;

    #[inline]
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let extracted = self.1.extract();
        self.0.replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.len()));
        extracted
    }

    #[inline]
    fn finish(&mut self) -> Option<&mut Self::Container> {
        let finished = self.1.finish();
        self.0.replace(self.0.take() + finished.as_ref().map_or(0, |e| e.len()));
        finished
    }
}

impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
    #[inline]
    fn push_into(&mut self, item: D) {
        self.1.push_into(item);
    }
}

/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions about the specific key and value
/// types presented. The two traces here can have arbitrary key and value types, which may be unsized and
/// even potentially unrelated to the input collection data. Importantly, the key and value types could be
/// generic associated types (GATs) of the traces, which we would struggle to frame as trait arguments.
///
/// The implementation produces a caller-specified container. Callers can use [`AsCollection`] to wrap the
/// output stream in a collection.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
///
/// [`AsCollection`]: crate::collection::AsCollection
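///
/// # Examples
///
/// A minimal sketch (not from the original documentation), assuming a `CapacityContainerBuilder`
/// over `Vec` suffices as the output container builder: the closure multiplies the input
/// differences, pushes `(data, time, diff)` triples at the session, and `AsCollection`
/// reinterprets the output stream as a collection.
///
/// ```
/// use timely::container::CapacityContainerBuilder;
/// use differential_dataflow::AsCollection;
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::arrange::ArrangeByKey;
/// use differential_dataflow::operators::join::join_traces;
///
/// ::timely::example(|scope| {
///
///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1.arrange_by_key();
///     let y = scope.new_collection_from(vec![(0u32, 'a'), (1, 'b')]).1.arrange_by_key();
///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
///
///     join_traces::<_, _, _, _, CapacityContainerBuilder<Vec<_>>>(&x, &y, |_key, &v1, &v2, time, diff1, diff2, session| {
///         session.give(((v1, v2), time.clone(), diff1 * diff2));
///     })
///     .as_collection()
///     .assert_eq(&z);
/// });
/// ```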
pub fn join_traces<G, T1, T2, L, CB>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> StreamCore<G, CB::Container>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader+Clone+'static,
    T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
    L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff,&mut JoinSession<T1::Time, CB, CB::Container>)+'static,
    CB: ContainerBuilder + 'static,
{
    // Rename traces for symmetry from here on out.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        use timely::scheduling::Activator;
        let activations = arranged1.stream.scope().activations().clone();
        let activator = Activator::new(info.address, activations);

        // Our initial invariants are that for each trace, physical compaction is less than or equal to the trace's
        // upper bound. These invariants ensure that we can reference observed batch frontiers from the initial upper
        // bounds onward, as long as we maintain our physical compaction capabilities appropriately. These assertions
        // are tested as we load up the initial work for the two traces, and before the operator is constructed.

        // Acknowledged frontier for each input.
        // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
        // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
        // the physical compaction frontier of their corresponding trace.
        // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
        use timely::progress::frontier::Antichain;
        let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
        let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

        // Deferred work of batches from each input.
        let mut todo1 = std::collections::VecDeque::new();
        let mut todo2 = std::collections::VecDeque::new();

        // We'll unload the initial batches here, to start ourselves from a more deterministic state.
        trace1.map_batches(|batch1| {
            acknowledged1.clone_from(batch1.upper());
            // No `todo1` work here, because we haven't accepted anything from `trace2` yet.
            // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
            // Once we start streaming batches in, we will need to respond to new batches from
            // `input1` with logic that would have otherwise been here. Check out the next loop
            // for the structure.
        });
        // At this point, `acknowledged1` should exactly equal `trace1.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace1`'s physical compaction frontier is not beyond the frontier of completed times in `trace1`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

        // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
        // on both traces at the same time, as they could be the same trace and this would panic.
        let mut batch2_cursors = Vec::new();
        trace2.map_batches(|batch2| {
            acknowledged2.clone_from(batch2.upper());
            batch2_cursors.push((batch2.cursor(), batch2.clone()));
        });
        // At this point, `acknowledged2` should exactly equal `trace2.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace2`'s physical compaction frontier is not beyond the frontier of completed times in `trace2`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

        // Load up deferred work using trace2 cursors and batches captured just above.
        for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
            // It is safe to ask for `acknowledged1` because we have confirmed it to be in advance of the
            // physical compaction frontier.
            let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
            // We could downgrade the capability here, but doing so is a bit complicated mathematically.
            // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
            // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
            // that property.
            todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
        }

        // Droppable handles to shared trace data structures.
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);

        move |input1, input2, output| {

            // 1. Consuming input.
            //
            // The join computation repeatedly accepts batches of updates from each of its inputs.
            //
            // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
            // updates from its other input. It is important to track which updates have been accepted, because
            // we use a shared trace and there may be updates present that are in advance of this accepted bound.
            //
            // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
            // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
            // This last case is a consequence of our inability to transmit empty batches, as they may be formed
            // in the absence of timely dataflow capabilities.

            // Drain input 1, prepare work.
            input1.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain();
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
                                // It is safe to ask for `acknowledged2` as we validated that it was at least
                                // `get_physical_compaction()` at start-up, and have held back physical
                                // compaction ever since.
                                let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());
                        }
                    }
                }
                else { panic!("`trace2_option` dropped before `input1` emptied!"); }
            });

            // Drain input 2, prepare work.
            input2.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain();
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
                                // It is safe to ask for `acknowledged1` as we validated that it was at least
                                // `get_physical_compaction()` at start-up, and have held back physical
                                // compaction ever since.
                                let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`.
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());
                        }
                    }
                }
                else { panic!("`trace1_option` dropped before `input2` emptied!"); }
            });

            // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
            if let Some(trace1) = trace1_option.as_mut() {
                trace1.advance_upper(&mut acknowledged1);
            }
            if let Some(trace2) = trace2_option.as_mut() {
                trace2.advance_upper(&mut acknowledged2);
            }

            // 2. Join computation.
            //
            // For each of the inputs, we do some amount of work (measured in terms of number
            // of output records produced). This is meant to yield control to allow downstream
            // operators to consume and reduce the output, but it is also meant to provide some
            // degree of responsiveness. There is a potential risk here that if we fall behind
            // then the increasing queues hold back physical compaction of the underlying traces
            // which results in unintentionally quadratic processing time (each batch of either
            // input must scan all batches from the other input).

            // Perform some amount of outstanding work.
            // Note: a `Deferred` in `todo1` pairs a `trace2` cursor with a `batch1` cursor, so the
            // closure receives the trace value and difference first and must swap the arguments
            // back into the order that `result` expects.
            let mut fuel = 1_000_000;
            while !todo1.is_empty() && fuel > 0 {
                todo1.front_mut().unwrap().work(
                    output,
                    |k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
            }

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo2.is_empty() && fuel > 0 {
                todo2.front_mut().unwrap().work(
                    output,
                    |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
            }

            // Re-activate operator if work remains.
            if !todo1.is_empty() || !todo2.is_empty() {
                activator.activate();
            }

            // 3. Trace maintenance.
            //
            // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
            // the progress of an input, because should we ever drop one of the traces we will
            // lose the ability to extract information from anything other than the input.
            // For example, if we dropped `trace2` we would not be able to use `advance_upper`
            // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
            // compaction of `trace1`.

            // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
            if let Some(trace1) = trace1_option.as_mut() {
                if input2.frontier().is_empty() { trace1_option = None; }
                else {
                    // Allow `trace1` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input2`). All `input2` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace1.set_logical_compaction(input2.frontier().frontier());
                    // Allow `trace1` to compact physically up to the upper bound of batches we
                    // have received in its input (`input1`). We will never require a cursor
                    // through any frontier earlier than this bound.
                    trace1.set_physical_compaction(acknowledged1.borrow());
                }
            }

            // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
            if let Some(trace2) = trace2_option.as_mut() {
                if input1.frontier().is_empty() { trace2_option = None; }
                else {
                    // Allow `trace2` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input1`). All `input1` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace2.set_logical_compaction(input1.frontier().frontier());
                    // Allow `trace2` to compact physically up to the upper bound of batches we
                    // have received in its input (`input2`). We will never require a cursor
                    // through any frontier earlier than this bound.
                    trace2.set_physical_compaction(acknowledged2.borrow());
                }
            }
        }
    })
}


/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<T, C1, C2>
where
    T: Timestamp+Lattice+Ord,
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
{
    trace: C1,
    trace_storage: C1::Storage,
    batch: C2,
    batch_storage: C2::Storage,
    capability: Capability<T>,
    done: bool,
}

impl<T, C1, C2> Deferred<T, C1, C2>
where
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    T: Timestamp+Lattice+Ord,
{
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
        }
    }

    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Processes keys until at least `fuel` output tuples have been produced, or the work is exhausted.
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputHandleCore<T, EffortBuilder<CB>, Tee<T, CB::Container>>, mut logic: L, fuel: &mut usize)
    where
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, CB::Container>),
    {

        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut thinker = JoinThinker::new();

        while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {

            match trace.key(trace_storage).cmp(&batch.key(batch_storage)) {
                Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
                Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
                Ordering::Equal => {

                    use crate::IntoOwned;

                    // Load the trace's history for this key, advancing each time by the capability's
                    // time (`meet`), as the output will not be sent at any earlier time.
                    thinker.history1.edits.load(trace, trace_storage, |time| {
                        let mut time = time.into_owned();
                        time.join_assign(meet);
                        time
                    });
                    thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned());

                    // Produce the results for this key in the best way we know how.
                    thinker.think(|v1,v2,t,r1,r2| {
                        let key = batch.key(batch_storage);
                        logic(key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // TODO: Effort isn't perfectly tracked, as we might still have some data in the
                    // session at the moment it's dropped.
                    effort += session.builder().0.take();
                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        if effort > *fuel { *fuel = 0; }
        else { *fuel -= effort; }
    }
}

struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    pub history1: ValueHistory<'a, C1>,
    pub history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

        // For reasonably sized edits, do the dead-simple thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        }
        else {

            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            //       in here. If a time is ever repeated, for example, the call will be identical
            //       and accomplish nothing. If only a single record has been added, it may not
            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
            //       is sufficient.

            while !replay1.is_done() && !replay2.is_done() {

                if replay1.time().unwrap().cmp(replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                }
                else {
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}