Trait differential_dataflow::operators::join::JoinCore

pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup>
where G::Timestamp: Lattice + Ord,
{
    // Required methods
    fn join_core<Tr2, I, L>(
        &self,
        stream2: &Arranged<G, Tr2>,
        result: L
    ) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
       where Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
             R: Multiply<Tr2::Diff>,
             <R as Multiply<Tr2::Diff>>::Output: Semigroup,
             I: IntoIterator,
             I::Item: Data,
             L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static;

    fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
        &self,
        stream2: &Arranged<G, Tr2>,
        result: L
    ) -> Collection<G, D, ROut>
       where Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
             D: Data,
             ROut: Semigroup,
             I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
             L: for<'a> FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static;
}

Matches the elements of two arranged traces.

The join_core method underlies the various join implementations, but it can also be called directly when one already has a handle to an Arranged<G, T>, for example because the arrangement is available for re-use or was produced by a reduce operator.

Required Methods§


fn join_core<Tr2, I, L>(
    &self,
    stream2: &Arranged<G, Tr2>,
    result: L
) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
where
    Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
    R: Multiply<Tr2::Diff>,
    <R as Multiply<Tr2::Diff>>::Output: Semigroup,
    I: IntoIterator,
    I::Item: Data,
    L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static,

Joins two arranged collections with the same key type.

Each matching pair of records (key, val1) and (key, val2) is passed to the result function, which returns something implementing IntoIterator. The output collection contains one entry for every item that iterator yields.

This trait is implemented for arrangements (Arranged<G, T>) rather than collections. The Join trait contains the implementations for collections.
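The matching rule described above can be illustrated with a small model that uses only the standard library. This is a sketch of the semantics, not the library's implementation: `join_model` is a hypothetical helper, inputs are plain slices of `((key, value), diff)` updates with `isize` diffs, and timestamps are ignored. The weight of each output is taken to be the product of the two input diffs, which is what the `Multiply` bound in the signature suggests.

```rust
use std::collections::BTreeMap;

/// Conceptual model of `join_core` over slices of `((key, value), diff)` updates.
/// `join_model` is a hypothetical helper, not part of differential dataflow.
fn join_model<K, V1, V2, I, L>(
    left: &[((K, V1), isize)],
    right: &[((K, V2), isize)],
    mut result: L,
) -> Vec<(I::Item, isize)>
where
    K: Ord,
    I: IntoIterator,
    L: FnMut(&K, &V1, &V2) -> I,
{
    // Index the right input by key; this stands in for the arrangement.
    let mut index: BTreeMap<&K, Vec<(&V2, isize)>> = BTreeMap::new();
    for ((k, v), r) in right {
        index.entry(k).or_default().push((v, *r));
    }

    let mut output = Vec::new();
    for ((k, v1), r1) in left {
        if let Some(matches) = index.get(k) {
            for &(v2, r2) in matches {
                // One output entry per item the closure yields,
                // weighted by the product of the two input diffs.
                for item in result(k, v1, v2) {
                    output.push((item, *r1 * r2));
                }
            }
        }
    }
    output
}

fn main() {
    // Same data as the documented example, with every diff equal to 1.
    let x = vec![((0u32, 1), 1), ((1, 3), 1)];
    let y = vec![((0u32, 'a'), 1), ((1, 'b'), 1)];
    let z = join_model(&x, &y, |_key, &a, &b| Some((a, b)));
    assert_eq!(z, vec![((1, 'a'), 1), ((3, 'b'), 1)]);
}
```

Returning `Some(..)` from the closure emits exactly one record per match, returning `None` filters the match out, and returning a `Vec` emits several records.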

§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::Trace;

::timely::example(|scope| {

    let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
                 .arrange_by_key();
    let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
                 .arrange_by_key();

    let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;

    x.join_core(&y, |_key, &a, &b| Some((a, b)))
     .assert_eq(&z);
});

fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
    &self,
    stream2: &Arranged<G, Tr2>,
    result: L
) -> Collection<G, D, ROut>
where
    Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
    D: Data,
    ROut: Semigroup,
    I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
    L: for<'a> FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static,

An unsafe variant of join_core in which the result closure also receives the time and the two input diffs, and returns an iterator over (data, time, diff) triples. This allows for more flexibility, but is more error-prone.

Each matching pair of records (key, val1) and (key, val2) is passed to the result function together with the time and both diffs. The function returns something implementing IntoIterator, and the output collection receives one update for every (data, time, diff) triple the iterator yields.

This trait is implemented for arrangements (Arranged<G, T>) rather than collections. The Join trait contains the implementations for collections.
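The difference from join_core can be illustrated with a standard-library model in which the closure, rather than the operator, decides the time and weight of every output. This is a sketch under stated assumptions, not the library's implementation: `join_internal_model` and the `Update` alias are hypothetical, timestamps are plain `u64` values, diffs are `isize`, and the time handed to the closure is assumed to be the later of the two input times (the least upper bound for a totally ordered timestamp).

```rust
/// One update in the model: (key, value, time, diff). Hypothetical alias.
type Update<K, V> = (K, V, u64, isize);

/// Conceptual model of `join_core_internal_unsafe`.
/// `join_internal_model` is a hypothetical helper, not a library function.
fn join_internal_model<K, V1, V2, D, I, L>(
    left: &[Update<K, V1>],
    right: &[Update<K, V2>],
    mut result: L,
) -> Vec<(D, u64, isize)>
where
    K: Eq,
    I: IntoIterator<Item = (D, u64, isize)>,
    L: FnMut(&K, &V1, &V2, &u64, &isize, &isize) -> I,
{
    let mut output = Vec::new();
    for (k1, v1, t1, r1) in left {
        for (k2, v2, t2, r2) in right {
            if k1 == k2 {
                // Assumption: the pair exists from the later of the two times.
                let time = (*t1).max(*t2);
                // The closure returns complete (data, time, diff) triples;
                // the model copies them to the output without adjustment.
                output.extend(result(k1, v1, v2, &time, r1, r2));
            }
        }
    }
    output
}

fn main() {
    // Same data as the documented example: all inputs at time 0 with diff 1.
    let x = vec![(0u32, 1isize, 0u64, 1isize), (1, 3, 0, 1)];
    let y = vec![(0u32, 'a', 0u64, 1isize), (1, 'b', 0, 1)];

    // As in the documented example, each record gets weight `a`.
    let z = join_internal_model(&x, &y, |_key, &a, &b, &t, _r1, _r2| Some(((a, b), t, a)));
    assert_eq!(z, vec![((1, 'a'), 0, 1), ((3, 'b'), 0, 3)]);
}
```

A closure that returns `r1 * r2` as the diff and passes the time through unchanged would reproduce the behaviour modelled for join_core. Anything else, such as the weight `a` used here, is the caller's responsibility to get right.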

§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::Trace;

::timely::example(|scope| {

    let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
                 .arrange_by_key();
    let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
                 .arrange_by_key();

    let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;

    // Each output record is emitted with weight `a` (the value from `x`),
    // so (3, 'b') appears three times in `z`.
    x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
     .assert_eq(&z);
});

Object Safety§

This trait is not object safe.

Implementors§


impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>