pub type RowRowArrangement<S> = Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>;
Aliased Type§
struct RowRowArrangement<S> {
pub stream: StreamCore<S, Vec<<TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>> as TraceReader>::Batch>>,
pub trace: TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>>,
}
Fields§
§stream: StreamCore<S, Vec<<TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>> as TraceReader>::Batch>>
A stream containing arranged updates.
This stream contains the same batches of updates the trace itself accepts, so there should be no additional overhead to receiving these records. The batches can be navigated just as the batches in the trace, by key and by value.
trace: TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>>
A shared trace, updated by the `Arrange` operator and readable by others.
Implementations
Source§impl<'a, G, Tr> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>
impl<'a, G, Tr> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>
Sourcepub fn leave_region(&self) -> Arranged<G, Tr>
pub fn leave_region(&self) -> Arranged<G, Tr>
Brings an arranged collection out of a nested region.
This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.
Source§impl<G, T1> Arranged<G, T1>
impl<G, T1> Arranged<G, T1>
Sourcepub fn join_core<T2, I, L>(
&self,
other: &Arranged<G, T2>,
result: L,
) -> Collection<G, <I as IntoIterator>::Item, <<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output>where
T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static,
<T1 as TraceReader>::Diff: Multiply<<T2 as TraceReader>::Diff>,
<<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output: Semigroup + 'static,
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>) -> I + 'static,
pub fn join_core<T2, I, L>(
&self,
other: &Arranged<G, T2>,
result: L,
) -> Collection<G, <I as IntoIterator>::Item, <<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output>where
T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static,
<T1 as TraceReader>::Diff: Multiply<<T2 as TraceReader>::Diff>,
<<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output: Semigroup + 'static,
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>) -> I + 'static,
A direct implementation of the `JoinCore::join_core` method.
Sourcepub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
&self,
other: &Arranged<G, T2>,
result: L,
) -> Collection<G, D, ROut>where
T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static,
D: Data,
ROut: Semigroup + 'static,
I: IntoIterator<Item = (D, <G as ScopeParent>::Timestamp, ROut)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>, &<G as ScopeParent>::Timestamp, &<T1 as TraceReader>::Diff, &<T2 as TraceReader>::Diff) -> I + 'static,
pub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
&self,
other: &Arranged<G, T2>,
result: L,
) -> Collection<G, D, ROut>where
T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static,
D: Data,
ROut: Semigroup + 'static,
I: IntoIterator<Item = (D, <G as ScopeParent>::Timestamp, ROut)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>, &<G as ScopeParent>::Timestamp, &<T1 as TraceReader>::Diff, &<T2 as TraceReader>::Diff) -> I + 'static,
A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
Source§impl<G, T1> Arranged<G, T1>
impl<G, T1> Arranged<G, T1>
Sourcepub fn reduce_abelian<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static,
K: Ord + 'static,
V: Data,
<T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
<T2 as TraceReader>::Diff: Abelian,
<T2 as TraceReader>::Batch: Batch,
Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>,
<Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,
pub fn reduce_abelian<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static,
K: Ord + 'static,
V: Data,
<T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
<T2 as TraceReader>::Diff: Abelian,
<T2 as TraceReader>::Batch: Batch,
Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>,
<Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,
A direct implementation of `ReduceCore::reduce_abelian`.
Sourcepub fn reduce_core<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static,
K: Ord + 'static,
V: Data,
<T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
<T2 as TraceReader>::Batch: Batch,
Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>,
<Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>, &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,
pub fn reduce_core<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static,
K: Ord + 'static,
V: Data,
<T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
<T2 as TraceReader>::Batch: Batch,
Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>,
<Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>,
L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>, &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,
A direct implementation of `ReduceCore::reduce_core`.
Source§impl<G, Tr> Arranged<G, Tr>
impl<G, Tr> Arranged<G, Tr>
Sourcepub fn enter<'a, TInner>(
&self,
child: &Child<'a, G, TInner>,
) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
pub fn enter<'a, TInner>( &self, child: &Child<'a, G, TInner>, ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
Brings an arranged collection into a nested scope.
This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.
Sourcepub fn enter_region<'a>(
&self,
child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
) -> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>
pub fn enter_region<'a>( &self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>, ) -> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>
Brings an arranged collection into a nested region.
This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.
Sourcepub fn enter_at<'a, TInner, F, P>(
&self,
child: &Child<'a, G, TInner>,
logic: F,
prior: P,
) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner, F, P>>where
TInner: Refines<<G as ScopeParent>::Timestamp> + Lattice + Timestamp + Clone + 'static,
F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>, <Tr as TraceReader>::TimeGat<'_>) -> TInner + Clone + 'static,
P: FnMut(&TInner) -> <Tr as TraceReader>::Time + Clone + 'static,
pub fn enter_at<'a, TInner, F, P>(
&self,
child: &Child<'a, G, TInner>,
logic: F,
prior: P,
) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner, F, P>>where
TInner: Refines<<G as ScopeParent>::Timestamp> + Lattice + Timestamp + Clone + 'static,
F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>, <Tr as TraceReader>::TimeGat<'_>) -> TInner + Clone + 'static,
P: FnMut(&TInner) -> <Tr as TraceReader>::Time + Clone + 'static,
Brings an arranged collection into a nested scope.
This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.
Sourcepub fn filter<F>(&self, logic: F) -> Arranged<G, TraceFilter<Tr, F>>where
F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> bool + Clone + 'static,
pub fn filter<F>(&self, logic: F) -> Arranged<G, TraceFilter<Tr, F>>where
F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> bool + Clone + 'static,
Filters an arranged collection.
This method produces a new arrangement backed by the same shared arrangement as `self`, paired with user-specified logic that can filter by key and value. The resulting collection is restricted to the keys and values that return true under the user predicate.
§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
::timely::example(|scope| {
let arranged =
scope.new_collection_from(0 .. 10).1
.map(|x| (x, x+1))
.arrange_by_key();
arranged
.filter(|k,v| k == v)
.as_collection(|k,v| (*k,*v))
.assert_empty();
});
Sourcepub fn as_collection<D, L>(
&self,
logic: L,
) -> Collection<G, D, <Tr as TraceReader>::Diff>
pub fn as_collection<D, L>( &self, logic: L, ) -> Collection<G, D, <Tr as TraceReader>::Diff>
Flattens the stream into a `Collection`.
The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data, and this method should only be used when the data need to be transformed or exchanged, rather than supplied as arguments to an operator using the same key-value structure.
Sourcepub fn flat_map_ref<I, L>(
&self,
logic: L,
) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>where
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,
pub fn flat_map_ref<I, L>(
&self,
logic: L,
) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>where
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,
Extracts elements from an arrangement as a collection.
The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.
Sourcepub fn flat_map_batches<I, L>(
stream: &StreamCore<G, Vec<<Tr as TraceReader>::Batch>>,
logic: L,
) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>where
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,
pub fn flat_map_batches<I, L>(
stream: &StreamCore<G, Vec<<Tr as TraceReader>::Batch>>,
logic: L,
) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>where
I: IntoIterator,
<I as IntoIterator>::Item: Data,
L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,
Extracts elements from a stream of batches as a collection.
The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.
This method exists for streams of batches without the corresponding arrangement.
If you have the arrangement, its `flat_map_ref` method is equivalent to this.
Trait Implementations
Source§impl<G, T, R> ArrangementSize for Arranged<G, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>>>where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord + Columnation,
T: Lattice + Timestamp,
R: Semigroup + Ord + Columnation + 'static,
impl<G, T, R> ArrangementSize for Arranged<G, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>>>where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord + Columnation,
T: Lattice + Timestamp,
R: Semigroup + Ord + Columnation + 'static,
Source§fn log_arrangement_size(self) -> Self
fn log_arrangement_size(self) -> Self
Source§impl<G, K, T1, R> Count<G, K, R> for Arranged<G, T1>where
R: Data + Semigroup,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
K: Data,
impl<G, K, T1, R> Count<G, K, R> for Arranged<G, T1>where
R: Data + Semigroup,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
K: Data,
Source§impl<G, K, T1> CountTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>where
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Val<'a> = &'a ()> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
<T1 as TraceReader>::Diff: for<'a> Semigroup<<T1 as TraceReader>::DiffGat<'a>> + ExchangeData,
K: ExchangeData,
<T1 as TraceReader>::Time: TotalOrder,
impl<G, K, T1> CountTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>where
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Val<'a> = &'a ()> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
<T1 as TraceReader>::Diff: for<'a> Semigroup<<T1 as TraceReader>::DiffGat<'a>> + ExchangeData,
K: ExchangeData,
<T1 as TraceReader>::Time: TotalOrder,
Source§fn count_total_core<R2>(
&self,
) -> Collection<G, (K, <T1 as TraceReader>::Diff), R2>
fn count_total_core<R2>( &self, ) -> Collection<G, (K, <T1 as TraceReader>::Diff), R2>
Source§fn count_total(&self) -> Collection<G, (K, R)>
fn count_total(&self) -> Collection<G, (K, R)>
Source§impl<G, K, V, Tr> Join<G, K, V, <Tr as TraceReader>::Diff> for Arranged<G, Tr>where
G: Scope<Timestamp = <Tr as TraceReader>::Time>,
Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static,
K: ExchangeData + Hashable,
V: Data + 'static,
impl<G, K, V, Tr> Join<G, K, V, <Tr as TraceReader>::Diff> for Arranged<G, Tr>where
G: Scope<Timestamp = <Tr as TraceReader>::Time>,
Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static,
K: ExchangeData + Hashable,
V: Data + 'static,
Source§fn join_map<V2, R2, D, L>(
&self,
other: &Collection<G, (K, V2), R2>,
logic: L,
) -> Collection<G, D, <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>where
V2: ExchangeData,
R2: ExchangeData + Semigroup,
D: Data,
<Tr as TraceReader>::Diff: Multiply<R2>,
<<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static,
L: for<'a> FnMut(<Tr as TraceReader>::Key<'a>, <Tr as TraceReader>::Val<'a>, &V2) -> D + 'static,
fn join_map<V2, R2, D, L>(
&self,
other: &Collection<G, (K, V2), R2>,
logic: L,
) -> Collection<G, D, <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>where
V2: ExchangeData,
R2: ExchangeData + Semigroup,
D: Data,
<Tr as TraceReader>::Diff: Multiply<R2>,
<<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static,
L: for<'a> FnMut(<Tr as TraceReader>::Key<'a>, <Tr as TraceReader>::Val<'a>, &V2) -> D + 'static,
Source§fn semijoin<R2>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>where
R2: ExchangeData + Semigroup,
<Tr as TraceReader>::Diff: Multiply<R2>,
<<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static,
fn semijoin<R2>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>where
R2: ExchangeData + Semigroup,
<Tr as TraceReader>::Diff: Multiply<R2>,
<<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static,
Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. Read more
Source§fn antijoin<R2>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <Tr as TraceReader>::Diff>where
R2: ExchangeData + Semigroup,
<Tr as TraceReader>::Diff: Multiply<R2, Output = <Tr as TraceReader>::Diff> + Abelian + 'static,
fn antijoin<R2>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <Tr as TraceReader>::Diff>where
R2: ExchangeData + Semigroup,
<Tr as TraceReader>::Diff: Multiply<R2, Output = <Tr as TraceReader>::Diff> + Abelian + 'static,
Source§fn join<V2, R2>(
&self,
other: &Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>where
K: ExchangeData,
V2: ExchangeData,
R2: ExchangeData + Semigroup,
R: Multiply<R2>,
<R as Multiply<R2>>::Output: Semigroup + 'static,
fn join<V2, R2>(
&self,
other: &Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>where
K: ExchangeData,
V2: ExchangeData,
R2: ExchangeData + Semigroup,
R: Multiply<R2>,
<R as Multiply<R2>>::Output: Semigroup + 'static,
Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`. Read more
Source§impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
Source§fn mz_reduce_abelian<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
K: Data,
V: Data,
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
fn mz_reduce_abelian<L, K, V, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
K: Data,
V: Data,
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
Applies `reduce` to arranged data, and returns an arrangement of output data.
Source§impl<G, K, V, T1, R> Reduce<G, K, V, R> for Arranged<G, T1>where
V: Data,
R: Ord + Semigroup + 'static,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Diff = R> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
<T1 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
K: Data,
impl<G, K, V, T1, R> Reduce<G, K, V, R> for Arranged<G, T1>where
V: Data,
R: Ord + Semigroup + 'static,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Diff = R> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
<T1 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>,
K: Data,
Source§fn reduce_named<L, V2, R2>(
&self,
name: &str,
logic: L,
) -> Collection<G, (K, V2), R2>
fn reduce_named<L, V2, R2>( &self, name: &str, logic: L, ) -> Collection<G, (K, V2), R2>
As `reduce` with the ability to name the operator.
Source§impl<G: Scope, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
impl<G: Scope, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
Source§fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
&self,
name1: &str,
name2: &str,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)where
K: Data,
T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
T1::Diff: Abelian,
T1::Batch: Batch,
Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
V1: Data,
T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
V2: Data,
Arranged<G, TraceAgent<T1>>: ArrangementSize,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
&self,
name1: &str,
name2: &str,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)where
K: Data,
T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
T1::Diff: Abelian,
T1::Batch: Batch,
Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
V1: Data,
T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
V2: Data,
Arranged<G, TraceAgent<T1>>: ArrangementSize,
Arranged<G, TraceAgent<T2>>: ArrangementSize,
Source§impl<G, K, T1, R1> Threshold<G, K, R1> for Arranged<G, T1>where
R1: Semigroup,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R1> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
K: Data,
impl<G, K, T1, R1> Threshold<G, K, R1> for Arranged<G, T1>where
R1: Semigroup,
G: Scope<Timestamp = <T1 as TraceReader>::Time>,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R1> + Clone + 'static,
<T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>,
K: Data,
Source§fn threshold_named<R2, F>(&self, name: &str, thresh: F) -> Collection<G, K, R2>
fn threshold_named<R2, F>(&self, name: &str, thresh: F) -> Collection<G, K, R2>
A `threshold` with the ability to name the operator.