Struct differential_dataflow::operators::arrange::arrangement::Arranged
pub struct Arranged<G: Scope, Tr> {
    pub stream: Stream<G, Tr::Batch>,
    pub trace: Tr,
}
An arranged collection of (K,V) values.
An Arranged allows multiple differential operators to share the resources (communication, computation, memory) required to produce and maintain an indexed representation of a collection.
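The sharing described above can be pictured with a std-only model. This is a minimal sketch and not how the crate implements it: the `Index` alias, `arrange`, `lookup`, `count_per_key`, and `demo` are hypothetical names, and a reference-counted `BTreeMap` stands in for the shared trace.

```rust
use std::collections::BTreeMap;
use std::rc::Rc;

/// Hypothetical stand-in for a trace: values grouped and ordered by key.
type Index = BTreeMap<u32, Vec<u32>>;

/// Builds the index once. This is the work that sharing avoids repeating.
fn arrange(pairs: &[(u32, u32)]) -> Rc<Index> {
    let mut index = Index::new();
    for &(key, val) in pairs {
        index.entry(key).or_default().push(val);
    }
    Rc::new(index)
}

/// First consumer: emits (key, val) for every probe key present in the index.
fn lookup(index: &Index, probes: &[u32]) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for key in probes {
        if let Some(vals) = index.get(key) {
            for val in vals {
                out.push((*key, *val));
            }
        }
    }
    out
}

/// Second consumer: counts the values stored under each key.
fn count_per_key(index: &Index) -> Vec<(u32, usize)> {
    index.iter().map(|(key, vals)| (*key, vals.len())).collect()
}

/// Both consumers hold handles to the same index; nothing is re-indexed.
fn demo() -> (Vec<(u32, u32)>, Vec<(u32, usize)>, usize) {
    let shared = arrange(&[(1, 10), (2, 20), (1, 11)]);
    let for_lookup = Rc::clone(&shared);
    let for_count = Rc::clone(&shared);
    let handles = Rc::strong_count(&shared);
    (lookup(&for_lookup, &[1, 3]), count_per_key(&for_count), handles)
}
```

In this model the cost of building the index is paid once in `arrange`, and each additional consumer only pays for a handle.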
Fields
stream: Stream<G, Tr::Batch>
A stream containing arranged updates.
This stream contains the same batches of updates the trace itself accepts, so there should be no additional overhead to receiving these records. The batches can be navigated just as the batches in the trace, by key and by value.
trace: Tr
A shared trace, updated by the Arrange operator and readable by others.
Implementations
impl<G, Tr> Arranged<G, Tr>
pub fn enter<'a, TInner>(
    &self,
    child: &Child<'a, G, TInner>,
) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
Brings an arranged collection into a nested scope.
This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.
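The timestamp extension can be illustrated with a small std-only model. This is a sketch under assumptions, not the crate's code: the `Update` alias and the free function `enter` are hypothetical, and a plain tuple stands in for the nested timestamp type.

```rust
/// Hypothetical update triple: (data, time, diff).
type Update<T> = (&'static str, T, isize);

/// Extends each outer timestamp `t` to `(t, TInner::default())`.
/// Data and diffs are untouched, so the result does not vary
/// with the new coordinate.
fn enter<TInner: Default>(updates: &[Update<u64>]) -> Vec<Update<(u64, TInner)>> {
    updates
        .iter()
        .map(|&(data, time, diff)| (data, (time, TInner::default()), diff))
        .collect()
}
```

For example, with `u32` as the inner coordinate, an update at outer time `3` appears at `(3, 0)` inside the nested scope.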
pub fn enter_region<'a>(
    &self,
    child: &Child<'a, G, G::Timestamp>,
) -> Arranged<Child<'a, G, G::Timestamp>, Tr>
Brings an arranged collection into a nested region.
This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.
pub fn enter_at<'a, TInner, F, P>(
    &self,
    child: &Child<'a, G, TInner>,
    logic: F,
    prior: P,
) -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
Brings an arranged collection into a nested scope.
This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.
pub fn filter<F>(&self, logic: F) -> Arranged<G, TraceFilter<Tr, F>>
Filters an arranged collection.
This method produces a new arrangement backed by the same shared arrangement as self, paired with user-specified logic that can filter by key and value. The resulting collection is restricted to the keys and values that return true under the user predicate.
Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;

::timely::example(|scope| {
    // Arrange the pairs (x, x+1) by key.
    let arranged =
        scope.new_collection_from(0 .. 10).1
             .map(|x| (x, x+1))
             .arrange_by_key();

    // No pair has its key equal to its value, so the filtered result is empty.
    arranged
        .filter(|k,v| k == v)
        .as_collection(|k,v| (*k,*v))
        .assert_empty();
});
pub fn as_collection<D: Data, L>(&self, logic: L) -> Collection<G, D, Tr::Diff>
Flattens the stream into a Collection.
The underlying Stream<G, Tr::Batch> (the stream field) is a much more efficient way to access the data, and this method should only be used when the data need to be transformed or exchanged, rather than supplied as arguments to an operator using the same key-value structure.
pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
Extracts elements from an arrangement as a collection.
The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.
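The way one closure covers both filtering and flat mapping can be shown with a std-only model. This is a sketch under assumptions: the free function below only borrows the method's name, takes a plain `BTreeMap` in place of an arrangement, and ignores times and diffs.

```rust
use std::collections::BTreeMap;

/// Hypothetical model: visits every (key, val) pair by reference and
/// appends whatever the logic yields for it. Returning `None` or an
/// empty `Vec` drops the pair; returning several items expands it.
fn flat_map_ref<I, L>(index: &BTreeMap<u32, Vec<u32>>, mut logic: L) -> Vec<I::Item>
where
    I: IntoIterator,
    L: FnMut(&u32, &u32) -> I,
{
    let mut out = Vec::new();
    for (key, vals) in index {
        for val in vals {
            out.extend(logic(key, val));
        }
    }
    out
}
```

In this model, a closure returning an `Option` acts as a filter, for instance `|k, v| if v % 2 == 0 { Some((*k, *v)) } else { None }`, while a closure returning a `Vec`, such as `|k, v| vec![*k, *v]`, emits several outputs per pair.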
pub fn flat_map_batches<I, L>(
    stream: &Stream<G, Tr::Batch>,
    logic: L,
) -> Collection<G, I::Item, Tr::Diff>
Extracts elements from a stream of batches as a collection.
The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.
This method exists for streams of batches without the corresponding arrangement.
If you have the arrangement, its flat_map_ref method is equivalent to this.
impl<G, T1> Arranged<G, T1>
pub fn join_core<T2, I, L>(
    &self,
    other: &Arranged<G, T2>,
    result: L,
) -> Collection<G, I::Item, <T1::Diff as Multiply<T2::Diff>>::Output>
A direct implementation of the JoinCore::join_core method.
pub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
    &self,
    other: &Arranged<G, T2>,
    result: L,
) -> Collection<G, D, ROut>
A direct implementation of the JoinCore::join_core_internal_unsafe method.
impl<G, T1> Arranged<G, T1>
pub fn reduce_abelian<L, K, V, Bu, T2>(
    &self,
    name: &str,
    logic: L,
) -> Arranged<G, TraceAgent<T2>>
where
    for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
    T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = T1::Time> + 'static,
    K: Ord + 'static,
    V: Data,
    for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
    T2::Diff: Abelian,
    T2::Batch: Batch,
    Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
    Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
A direct implementation of ReduceCore::reduce_abelian.
pub fn reduce_core<L, K, V, Bu, T2>(
    &self,
    name: &str,
    logic: L,
) -> Arranged<G, TraceAgent<T2>>
where
    for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
    T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = T1::Time> + 'static,
    K: Ord + 'static,
    V: Data,
    for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
    T2::Batch: Batch,
    Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
    Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>) + 'static,
A direct implementation of ReduceCore::reduce_core.
impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
pub fn leave_region(&self) -> Arranged<G, Tr>
Brings an arranged collection out of a nested region.
This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.
Trait Implementations
impl<G, K, T1> CountTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: for<'a> TraceReader<Val<'a> = &'a ()> + Clone + 'static,
    for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
    for<'a> T1::Diff: Semigroup<T1::DiffGat<'a>> + ExchangeData,
    K: ExchangeData,
    T1::Time: TotalOrder,
fn count_total_core<R2: Semigroup + From<i8> + 'static>(
    &self,
) -> Collection<G, (K, T1::Diff), R2>
fn count_total(&self) -> Collection<G, (K, R), isize>
impl<G, K, V, Tr> Join<G, K, V, <Tr as TraceReader>::Diff> for Arranged<G, Tr>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static,
    K: ExchangeData + Hashable,
    V: Data + 'static,
fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
    &self,
    other: &Collection<G, (K, V2), R2>,
    logic: L,
) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
fn semijoin<R2: ExchangeData + Semigroup>(
    &self,
    other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
Matches pairs (key, val) and key based on key, producing the former with frequencies multiplied.
fn antijoin<R2: ExchangeData + Semigroup>(
    &self,
    other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), Tr::Diff>
fn join<V2, R2>(
    &self,
    other: &Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
where
    K: ExchangeData,
    V2: ExchangeData,
    R2: ExchangeData + Semigroup,
    R: Multiply<R2>,
    <R as Multiply<R2>>::Output: Semigroup + 'static,
Matches pairs (key,val1) and (key,val2) based on key and yields pairs (key, (val1, val2)).
impl<G, K: Data, V: Data, T1, R: Ord + Semigroup + 'static> Reduce<G, K, V, R> for Arranged<G, T1>
fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
    &self,
    name: &str,
    logic: L,
) -> Collection<G, (K, V2), R2>
As reduce, with the ability to name the operator.
impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
    &self,
    name: &str,
    thresh: F,
) -> Collection<G, K, R2>
As threshold, with the ability to name the operator.
fn threshold<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
    &self,
    thresh: F,
) -> Collection<G, K, R2>
fn distinct(&self) -> Collection<G, K, isize>
fn distinct_core<R2: Ord + Abelian + 'static + From<i8>>(
    &self,
) -> Collection<G, K, R2>
impl<G, K, T1> ThresholdTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where
    G: Scope<Timestamp = T1::Time>,
    T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a ()> + Clone + 'static,
    for<'a> T1::Diff: Semigroup<T1::DiffGat<'a>> + ExchangeData,
    K: ExchangeData,
    T1::Time: TotalOrder,
fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
fn threshold_total<R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static>(
    &self,
    thresh: F,
) -> Collection<G, K, R2>
fn distinct_total(&self) -> Collection<G, K, isize>
fn distinct_total_core<R2: Abelian + From<i8> + 'static>(
    &self,
) -> Collection<G, K, R2>
Auto Trait Implementations
impl<G, Tr> Freeze for Arranged<G, Tr>
impl<G, Tr> !RefUnwindSafe for Arranged<G, Tr>
impl<G, Tr> !Send for Arranged<G, Tr>
impl<G, Tr> !Sync for Arranged<G, Tr>
impl<G, Tr> Unpin for Arranged<G, Tr>
impl<G, Tr> !UnwindSafe for Arranged<G, Tr>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
default unsafe fn clone_to_uninit(&self, dst: *mut T)
This is a nightly-only experimental API (clone_to_uninit).
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.