pub struct Arranged<G: Scope, Tr>
where
    G::Timestamp: Lattice + Ord,
    Tr: TraceReader + Clone,
{
    pub stream: Stream<G, Tr::Batch>,
    pub trace: Tr,
}

An arranged collection of (K,V) values.

An Arranged allows multiple differential operators to share the resources (communication, computation, memory) required to produce and maintain an indexed representation of a collection.
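
To make the sharing concrete, here is a minimal sketch using only the standard library. It is a conceptual model, not differential dataflow's implementation: Index, arrange, count_per_key, and values_for are hypothetical names, and Rc stands in for the shared trace handle. The index is built once and then read by two independent consumers.

```rust
use std::collections::BTreeMap;
use std::rc::Rc;

/// Hypothetical stand-in for an indexed ("arranged") collection: key -> values.
type Index = BTreeMap<u32, Vec<u32>>;

/// Builds the index once; this is the work that sharing avoids repeating.
fn arrange(records: &[(u32, u32)]) -> Rc<Index> {
    let mut index = Index::new();
    for &(key, val) in records {
        index.entry(key).or_default().push(val);
    }
    Rc::new(index)
}

/// First consumer: counts the values stored under each key.
fn count_per_key(index: &Rc<Index>) -> Vec<(u32, usize)> {
    index.iter().map(|(key, vals)| (*key, vals.len())).collect()
}

/// Second consumer: reads one key from the same index.
fn values_for(index: &Rc<Index>, key: u32) -> Vec<u32> {
    index.get(&key).cloned().unwrap_or_default()
}
```

Cloning the Rc hands a second reader the same index without copying it, which is the property the shared trace is meant to provide.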

Fields

stream: Stream<G, Tr::Batch>

A stream containing arranged updates.

This stream contains the same batches of updates the trace itself accepts, so there should be no additional overhead to receiving these records. The batches can be navigated just as the batches in the trace, by key and by value.

trace: Tr

A shared trace, updated by the Arrange operator and readable by others.

Implementations

impl<G, Tr> Arranged<G, Tr>
where G: Scope<Timestamp = Tr::Time>, Tr: TraceReader + Clone,

pub fn enter<'a, TInner>( &self, child: &Child<'a, G, TInner> ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
where TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone,

Brings an arranged collection into a nested scope.

This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.
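
The timestamp extension can be pictured with a small standard-library sketch. It is an assumption-laden model, not the TraceEnter implementation: Update and enter are hypothetical names, the outer timestamp is a u64, and the added coordinate is a u32 fixed at its default value.

```rust
/// A keyed update: ((key, value), time, diff).
type Update<T> = ((u32, u32), T, i64);

/// Presents outer updates as if each time carried an extra coordinate
/// that always holds the default value.
fn enter(updates: &[Update<u64>]) -> Vec<Update<(u64, u32)>> {
    updates
        .iter()
        .map(|&(data, time, diff)| (data, (time, u32::default()), diff))
        .collect()
}
```

Because every update receives the same value in the new coordinate, the data and diffs are unchanged and the result does not vary along that coordinate.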

pub fn enter_region<'a>( &self, child: &Child<'a, G, G::Timestamp> ) -> Arranged<Child<'a, G, G::Timestamp>, Tr>

Brings an arranged collection into a nested region.

This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.

pub fn enter_at<'a, TInner, F, P>( &self, child: &Child<'a, G, TInner>, logic: F, prior: P ) -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
where TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + 'static, F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp) -> TInner + Clone + 'static, P: FnMut(&TInner) -> Tr::Time + Clone + 'static,

Brings an arranged collection into a nested scope, with inner timestamps chosen by user-supplied logic.

This method produces a proxy trace handle that uses the same backing data, but acts as if each update's timestamp has been replaced by the inner timestamp that logic computes from the update's key, value, and outer timestamp. The prior closure maps an inner timestamp back to a timestamp of the underlying trace.
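
Going by the signature, logic receives a key, a value, and an outer timestamp and returns the inner timestamp. The sketch below models only that part with the standard library. Update and this enter_at are hypothetical names, the inner timestamp is modeled as a (u64, u32) pair, and the prior argument is not modeled.

```rust
/// A keyed update: ((key, value), time, diff).
type Update<T> = ((u32, u32), T, i64);

/// Re-times each outer update with the inner timestamp chosen by `logic`.
fn enter_at<F>(updates: &[Update<u64>], mut logic: F) -> Vec<Update<(u64, u32)>>
where
    F: FnMut(&u32, &u32, &u64) -> (u64, u32),
{
    updates
        .iter()
        .map(|((key, val), time, diff)| ((*key, *val), logic(key, val, time), *diff))
        .collect()
}
```

For example, passing |_key, val, time| (*time, *val) would introduce each record at an inner coordinate equal to its value.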

pub fn filter<F>(&self, logic: F) -> Arranged<G, TraceFilter<Tr, F>>
where F: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> bool + Clone + 'static,

Filters an arranged collection.

This method produces a new arrangement backed by the same shared arrangement as self, paired with user-specified logic that can filter by key and value. The resulting collection is restricted to the key-value pairs for which the user predicate returns true.

Examples

use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;

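::timely::example(|scope| {

    // Arrange the pairs (x, x+1) by key; no key ever equals its value.
    let arranged =
        scope.new_collection_from(0 .. 10).1
             .map(|x| (x, x+1))
             .arrange_by_key();

    // Keep only pairs whose key equals their value. None qualify,
    // so the resulting collection is empty.
    arranged
        .filter(|k,v| k == v)
        .as_collection(|k,v| (*k,*v))
        .assert_empty();
});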

pub fn as_collection<D: Data, L>(&self, logic: L) -> Collection<G, D, Tr::Diff>
where L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,

Flattens the stream into a Collection.

The underlying Stream<G, Tr::Batch> is a much more efficient way to access the data, and this method should only be used when the data need to be transformed or exchanged, rather than supplied as arguments to an operator using the same key-value structure.

pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
where I: IntoIterator, I::Item: Data, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,

Extracts elements from an arrangement as a collection.

The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.
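
As an illustration of how an iterator-returning closure covers both filtering and flat mapping, here is a standard-library sketch. It is a simplified model rather than the operator itself: this flat_map_ref is a hypothetical free function over a slice of ((key, value), diff) entries, and it assumes each produced item simply inherits the diff of the entry it came from.

```rust
/// Applies `logic` to every (key, value) pair and emits each produced item
/// with the diff of the entry that produced it.
fn flat_map_ref<I, L>(arranged: &[((u32, u32), i64)], mut logic: L) -> Vec<(I::Item, i64)>
where
    I: IntoIterator,
    L: FnMut(&u32, &u32) -> I,
{
    let mut out = Vec::new();
    for ((key, val), diff) in arranged {
        for item in logic(key, val) {
            out.push((item, *diff));
        }
    }
    out
}
```

Returning an Option from the closure acts as a filter (zero or one item per entry), while returning a Vec acts as a flat map (any number of items per entry).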

pub fn flat_map_batches<I, L>( stream: &Stream<G, Tr::Batch>, logic: L ) -> Collection<G, I::Item, Tr::Diff>
where I: IntoIterator, I::Item: Data, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,

Extracts elements from a stream of batches as a collection.

The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.

This method exists for streams of batches without the corresponding arrangement. If you have the arrangement, its flat_map_ref method is equivalent to this.

pub fn lookup( &self, queries: &Stream<G, (Tr::KeyOwned, G::Timestamp)> ) -> Stream<G, (Tr::KeyOwned, Tr::ValOwned, G::Timestamp, Tr::Diff)>
where Tr::KeyOwned: ExchangeData + Hashable, Tr::ValOwned: ExchangeData, Tr::Diff: ExchangeData, Tr: 'static,

Reports values associated with keys at certain times.

This method consumes a stream of (key, time) queries and reports the corresponding stream of (key, value, time, diff) accumulations in this arrangement's trace.
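
The query semantics can be sketched with the standard library under some simplifying assumptions: timestamps are totally ordered integers, the trace is a flat slice of (key, value, time, diff) updates, and zero accumulations are omitted from the output. This lookup is a hypothetical free function, not the operator's implementation.

```rust
use std::collections::BTreeMap;

/// For each (key, time) query, accumulates the diffs of all updates to that
/// key at times less than or equal to the query time, grouped by value.
fn lookup(
    trace: &[(u32, u32, u64, i64)],
    queries: &[(u32, u64)],
) -> Vec<(u32, u32, u64, i64)> {
    let mut out = Vec::new();
    for &(query_key, query_time) in queries {
        let mut acc: BTreeMap<u32, i64> = BTreeMap::new();
        for &(key, val, time, diff) in trace {
            if key == query_key && time <= query_time {
                *acc.entry(val).or_insert(0) += diff;
            }
        }
        for (val, diff) in acc {
            // Values whose updates cancel out are not reported (an assumption).
            if diff != 0 {
                out.push((query_key, val, query_time, diff));
            }
        }
    }
    out
}
```

With partially ordered timestamps the comparison would be the lattice's less-than-or-equal relation rather than integer ordering.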

impl<G, T1> Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: TraceReader + Clone + 'static,

pub fn join_core<T2, I, L>( &self, other: &Arranged<G, T2>, result: L ) -> Collection<G, I::Item, <T1::Diff as Multiply<T2::Diff>>::Output>
where T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static, T1::Diff: Multiply<T2::Diff>, <T1::Diff as Multiply<T2::Diff>>::Output: Semigroup, I: IntoIterator, I::Item: Data, L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>) -> I + 'static,

A direct implementation of the JoinCore::join_core method.

pub fn join_core_internal_unsafe<T2, I, L, D, ROut>( &self, other: &Arranged<G, T2>, result: L ) -> Collection<G, D, ROut>
where T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static, D: Data, ROut: Semigroup, I: IntoIterator<Item = (D, G::Timestamp, ROut)>, L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>, &G::Timestamp, &T1::Diff, &T2::Diff) -> I + 'static,

A direct implementation of the JoinCore::join_core_internal_unsafe method.

impl<G, T1> Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: TraceReader + Clone + 'static,

pub fn reduce_abelian<L, T2>( &self, name: &str, logic: L ) -> Arranged<G, TraceAgent<T2>>
where T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = T1::Time> + 'static, T2::ValOwned: Data, T2::Diff: Abelian, T2::Batch: Batch, T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>) + 'static,

A direct implementation of ReduceCore::reduce_abelian.

pub fn reduce_core<L, T2>( &self, name: &str, logic: L ) -> Arranged<G, TraceAgent<T2>>
where T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = T1::Time> + 'static, T2::ValOwned: Data, T2::Batch: Batch, T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>) + 'static,

A direct implementation of ReduceCore::reduce_core.

impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where G: Scope<Timestamp = Tr::Time>, Tr: TraceReader + Clone,

pub fn leave_region(&self) -> Arranged<G, Tr>

Brings an arranged collection out of a nested region.

This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.

Trait Implementations

impl<G, Tr> Clone for Arranged<G, Tr>
where G: Scope<Timestamp = Tr::Time>, Tr: TraceReader + Clone,

fn clone(&self) -> Self

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<G, K: Data, T1, R: Semigroup> Count<G, K, R> for Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwned = K, Val<'a> = &'a (), Diff = R> + Clone + 'static,

fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2>

Count for general integer differences.

fn count(&self) -> Collection<G, (K, R), isize>

Counts the number of occurrences of each element.

impl<G, T1> CountTotal<G, <T1 as TraceReader>::KeyOwned, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: for<'a> TraceReader<Val<'a> = &'a ()> + Clone + 'static, T1::KeyOwned: ExchangeData, T1::Time: TotalOrder, T1::Diff: ExchangeData,

fn count_total_core<R2: Semigroup + From<i8>>( &self ) -> Collection<G, (T1::KeyOwned, T1::Diff), R2>

Count for general integer differences.

fn count_total(&self) -> Collection<G, (K, R), isize>

Counts the number of occurrences of each element.

impl<G, K, V, Tr> Join<G, K, V, <Tr as TraceReader>::Diff> for Arranged<G, Tr>
where G: Scope<Timestamp = Tr::Time>, Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static, K: ExchangeData + Hashable, V: Data + 'static,

fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>( &self, other: &Collection<G, (K, V2), R2>, logic: L ) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
where Tr::Diff: Multiply<R2>, <Tr::Diff as Multiply<R2>>::Output: Semigroup, L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2) -> D + 'static,

Matches pairs (key,val1) and (key,val2) based on key and then applies a function.

fn semijoin<R2: ExchangeData + Semigroup>( &self, other: &Collection<G, K, R2> ) -> Collection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
where Tr::Diff: Multiply<R2>, <Tr::Diff as Multiply<R2>>::Output: Semigroup,

Matches pairs (key, val) and key based on key, producing the former with frequencies multiplied.
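
To illustrate what "frequencies multiplied" means, here is a standard-library sketch over plain slices. It is a model under stated assumptions, not the operator: this semijoin is a hypothetical free function, diffs are i64, and records whose product is zero are dropped.

```rust
use std::collections::BTreeMap;

/// Keeps each (key, val) record whose key occurs in `right`, multiplying
/// the record's frequency by the accumulated frequency of that key.
fn semijoin(left: &[((u32, u32), i64)], right: &[(u32, i64)]) -> Vec<((u32, u32), i64)> {
    // Accumulate the frequency of each key on the right-hand side.
    let mut freq: BTreeMap<u32, i64> = BTreeMap::new();
    for &(key, diff) in right {
        *freq.entry(key).or_insert(0) += diff;
    }
    left.iter()
        .filter_map(|&((key, val), diff)| {
            let product = diff * freq.get(&key).copied().unwrap_or(0);
            (product != 0).then_some(((key, val), product))
        })
        .collect()
}
```

A record with frequency 2 matched against a key with frequency 3 therefore appears with frequency 6, and records with no matching key disappear.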

fn antijoin<R2: ExchangeData + Semigroup>( &self, other: &Collection<G, K, R2> ) -> Collection<G, (K, V), Tr::Diff>
where Tr::Diff: Multiply<R2, Output = Tr::Diff> + Abelian,

Subtracts the semijoin with other from self.

fn join<V2, R2>( &self, other: &Collection<G, (K, V2), R2> ) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
where K: ExchangeData, V2: ExchangeData, R2: ExchangeData + Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup,

Matches pairs (key,val1) and (key,val2) based on key and yields pairs (key, (val1, val2)).

impl<G, K: Data, V: Data, T1, R: Semigroup> Reduce<G, K, V, R> for Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwned = K, Val<'a> = &'a V, Diff = R> + Clone + 'static,

fn reduce_named<L, V2: Data, R2: Abelian>( &self, name: &str, logic: L ) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,

As reduce with the ability to name the operator.

fn reduce<L, V2: Data, R2: Abelian>( &self, logic: L ) -> Collection<G, (K, V2), R2>
where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,

Applies a reduction function on records grouped by key.

impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwned = K, Val<'a> = &'a (), Diff = R1> + Clone + 'static,

fn threshold_named<R2: Abelian, F: FnMut(&K, &R1) -> R2 + 'static>( &self, name: &str, thresh: F ) -> Collection<G, K, R2>

A threshold with the ability to name the operator.

fn threshold<R2: Abelian, F: FnMut(&K, &R1) -> R2 + 'static>( &self, thresh: F ) -> Collection<G, K, R2>

Transforms the multiplicity of records.

fn distinct(&self) -> Collection<G, K, isize>

Reduces the collection to one occurrence of each distinct element.
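
The relationship between threshold and distinct can be sketched with the standard library. This is a simplified model and not the operators' implementation: both functions are hypothetical, the collection is represented as a map from record to accumulated multiplicity, and the function is assumed to be applied only to records with positive multiplicity.

```rust
use std::collections::BTreeMap;

/// Applies `thresh` to each record with a positive multiplicity and keeps
/// the records whose new multiplicity is nonzero.
fn threshold<F>(counts: &BTreeMap<u32, i64>, mut thresh: F) -> BTreeMap<u32, i64>
where
    F: FnMut(&u32, &i64) -> i64,
{
    counts
        .iter()
        .filter(|(_, count)| **count > 0)
        .map(|(key, count)| (*key, thresh(key, count)))
        .filter(|(_, count)| *count != 0)
        .collect()
}

/// Modeled as a threshold that maps every present record to multiplicity 1.
fn distinct(counts: &BTreeMap<u32, i64>) -> BTreeMap<u32, i64> {
    threshold(counts, |_, _| 1)
}
```

In this model, a closure such as |_, count| count % 2 would keep only records with odd multiplicity, each with multiplicity 1.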

fn distinct_core<R2: Abelian + From<i8>>(&self) -> Collection<G, K, R2>

Distinct for general integer differences.

impl<G, K, T1> ThresholdTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where G: Scope<Timestamp = T1::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a ()> + Clone + 'static, K: ExchangeData, T1::Time: TotalOrder, T1::Diff: ExchangeData,

fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
where R2: Semigroup, F: for<'a> FnMut(T1::Key<'a>, &T1::Diff, Option<&T1::Diff>) -> Option<R2> + 'static,

Reduces the collection to one occurrence of each distinct element.

fn threshold_total<R2: Abelian, F: FnMut(&K, &R) -> R2 + 'static>( &self, thresh: F ) -> Collection<G, K, R2>

Transforms the multiplicity of records.

fn distinct_total(&self) -> Collection<G, K, isize>

Reduces the collection to one occurrence of each distinct element.

fn distinct_total_core<R2: Abelian + From<i8>>(&self) -> Collection<G, K, R2>

Distinct for general integer differences.

Auto Trait Implementations

impl<G, Tr> Freeze for Arranged<G, Tr>
where Tr: Freeze, G: Freeze,

impl<G, Tr> !RefUnwindSafe for Arranged<G, Tr>

impl<G, Tr> !Send for Arranged<G, Tr>

impl<G, Tr> !Sync for Arranged<G, Tr>

impl<G, Tr> Unpin for Arranged<G, Tr>
where Tr: Unpin, G: Unpin,

impl<G, Tr> !UnwindSafe for Arranged<G, Tr>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
where R: Region<Index = (usize, usize)>, O: OffsetContainer<usize>, T: CopyOnto<R>,

fn copy_onto( self, target: &mut ConsecutiveOffsetPairs<R, O> ) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index

Copy self into the target container, returning an index that can be used to look up the corresponding read item.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<R, T> PushInto<FlatStack<R>> for T
where R: Region + Clone + 'static, T: CopyOnto<R>,

fn push_into(self, target: &mut FlatStack<R>)

Push self into the target container.

impl<T> PushInto<Vec<T>> for T

fn push_into(self, target: &mut Vec<T>)

Push self into the target container.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> Data for T
where T: Clone + 'static,