Trait differential_dataflow::operators::join::JoinCore
source · pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup>{
// Required methods
fn join_core<Tr2, I, L>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
where Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
R: Multiply<Tr2::Diff>,
<R as Multiply<Tr2::Diff>>::Output: Semigroup + 'static,
I: IntoIterator,
I::Item: Data,
L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static;
fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, D, ROut>
where Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
D: Data,
ROut: Semigroup + 'static,
I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
L: for<'a> FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static;
}
Expand description
Matches the elements of two arranged traces.
This method is used by the various join
implementations, but it can also be used
directly in the event that one has a handle to an Arranged<G,T>
, perhaps because
the arrangement is available for re-use, or from the output of a reduce
operator.
Required Methods§
sourcefn join_core<Tr2, I, L>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
fn join_core<Tr2, I, L>( &self, stream2: &Arranged<G, Tr2>, result: L, ) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
Joins two arranged collections with the same key type.
Each matching pair of records (key, val1)
and (key, val2)
are subjected to the result
function,
which produces something implementing IntoIterator
, where the output collection will have an entry for
every value returned by the iterator.
This trait is implemented for arrangements (Arranged<G, T>
) rather than collections. The Join
trait
contains the implementations for collections.
§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::Trace;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
.arrange_by_key();
let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
.arrange_by_key();
let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
x.join_core(&y, |_key, &a, &b| Some((a, b)))
.assert_eq(&z);
});
sourcefn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, D, ROut>
fn join_core_internal_unsafe<Tr2, I, L, D, ROut>( &self, stream2: &Arranged<G, Tr2>, result: L, ) -> Collection<G, D, ROut>
An unsafe variant of join_core
where the result
closure takes additional arguments for time
and
diff
as input and returns an iterator over (data, time, diff)
triplets. This allows for more
flexibility, but is more error-prone.
Each matching pair of records (key, val1)
and (key, val2)
are subjected to the result
function,
which produces something implementing IntoIterator
, where the output collection will have an entry
for every value returned by the iterator.
This trait is implemented for arrangements (Arranged<G, T>
) rather than collections. The Join
trait
contains the implementations for collections.
§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::Trace;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
.arrange_by_key();
let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
.arrange_by_key();
let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
// Returned values have weight `a`
x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
.assert_eq(&z);
});