mz_compute::typedefs

Type Alias RowRowArrangement

Source
pub type RowRowArrangement<S> = Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>;

Aliased Type§

struct RowRowArrangement<S> {
    pub stream: StreamCore<S, Vec<<TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>> as TraceReader>::Batch>>,
    pub trace: TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>>,
}

Fields§

§stream: StreamCore<S, Vec<<TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>> as TraceReader>::Batch>>

A stream containing arranged updates.

This stream contains the same batches of updates the trace itself accepts, so there should be no additional overhead to receiving these records. The batches can be navigated just as the batches in the trace, by key and by value.

§trace: TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, i64)>>>>>

A shared trace, updated by the Arrange operator and readable by others.

Implementations

Source§

impl<'a, G, Tr> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>
where G: Scope<Timestamp = <Tr as TraceReader>::Time>, Tr: TraceReader + Clone,

Source

pub fn leave_region(&self) -> Arranged<G, Tr>

Brings an arranged collection out of a nested region.

This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.

Source§

impl<G, T1> Arranged<G, T1>
where G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: TraceReader + Clone + 'static,

Source

pub fn join_core<T2, I, L>( &self, other: &Arranged<G, T2>, result: L, ) -> Collection<G, <I as IntoIterator>::Item, <<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output>
where T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static, <T1 as TraceReader>::Diff: Multiply<<T2 as TraceReader>::Diff>, <<T1 as TraceReader>::Diff as Multiply<<T2 as TraceReader>::Diff>>::Output: Semigroup + 'static, I: IntoIterator, <I as IntoIterator>::Item: Data, L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>) -> I + 'static,

A direct implementation of the JoinCore::join_core method.

Source

pub fn join_core_internal_unsafe<T2, I, L, D, ROut>( &self, other: &Arranged<G, T2>, result: L, ) -> Collection<G, D, ROut>
where T2: for<'a> TraceReader<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + Clone + 'static, D: Data, ROut: Semigroup + 'static, I: IntoIterator<Item = (D, <G as ScopeParent>::Timestamp, ROut)>, L: FnMut(<T1 as TraceReader>::Key<'_>, <T1 as TraceReader>::Val<'_>, <T2 as TraceReader>::Val<'_>, &<G as ScopeParent>::Timestamp, &<T1 as TraceReader>::Diff, &<T2 as TraceReader>::Diff) -> I + 'static,

A direct implementation of the JoinCore::join_core_internal_unsafe method.

Source§

impl<G, T1> Arranged<G, T1>
where G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: TraceReader + Clone + 'static,

Source

pub fn reduce_abelian<L, K, V, Bu, T2>( &self, name: &str, logic: L, ) -> Arranged<G, TraceAgent<T2>>
where <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static, K: Ord + 'static, V: Data, <T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>, <T2 as TraceReader>::Diff: Abelian, <T2 as TraceReader>::Batch: Batch, Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>, <Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>, L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,

A direct implementation of ReduceCore::reduce_abelian.

Source

pub fn reduce_core<L, K, V, Bu, T2>( &self, name: &str, logic: L, ) -> Arranged<G, TraceAgent<T2>>
where <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, T2: for<'a> Trace<Key<'a> = <T1 as TraceReader>::Key<'a>, Time = <T1 as TraceReader>::Time> + 'static, K: Ord + 'static, V: Data, <T2 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>, <T2 as TraceReader>::Batch: Batch, Bu: Builder<Time = <G as ScopeParent>::Timestamp, Output = <T2 as TraceReader>::Batch>, <Bu as Builder>::Input: Container + PushInto<((K, V), <T2 as TraceReader>::Time, <T2 as TraceReader>::Diff)>, L: FnMut(<T1 as TraceReader>::Key<'_>, &[(<T1 as TraceReader>::Val<'_>, <T1 as TraceReader>::Diff)], &mut Vec<(V, <T2 as TraceReader>::Diff)>, &mut Vec<(V, <T2 as TraceReader>::Diff)>) + 'static,

A direct implementation of ReduceCore::reduce_core.

Source§

impl<G, Tr> Arranged<G, Tr>
where G: Scope<Timestamp = <Tr as TraceReader>::Time>, Tr: TraceReader + Clone,

Source

pub fn enter<'a, TInner>( &self, child: &Child<'a, G, TInner>, ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
where TInner: Refines<<G as ScopeParent>::Timestamp> + Lattice + Timestamp + Clone,

Brings an arranged collection into a nested scope.

This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate with the default value. The resulting collection does not vary with the new timestamp coordinate.

Source

pub fn enter_region<'a>( &self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>, ) -> Arranged<Child<'a, G, <G as ScopeParent>::Timestamp>, Tr>

Brings an arranged collection into a nested region.

This method only applies to regions, which are subscopes with the same timestamp as their containing scope. In this case, the trace type does not need to change.

Source

pub fn enter_at<'a, TInner, F, P>( &self, child: &Child<'a, G, TInner>, logic: F, prior: P, ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner, F, P>>
where TInner: Refines<<G as ScopeParent>::Timestamp> + Lattice + Timestamp + Clone + 'static, F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>, <Tr as TraceReader>::TimeGat<'_>) -> TInner + Clone + 'static, P: FnMut(&TInner) -> <Tr as TraceReader>::Time + Clone + 'static,

Brings an arranged collection into a nested scope.

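This method produces a proxy trace handle that uses the same backing data, but acts as if the timestamps have all been extended with an additional coordinate. Unlike enter, which fills the new coordinate with the default value, the inner timestamp of each update is computed by logic from the update's key, value, and time, so the resulting collection's updates appear at the inner timestamps chosen by logic. The prior function maps an inner timestamp back to a timestamp of the underlying trace.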

Source

pub fn filter<F>(&self, logic: F) -> Arranged<G, TraceFilter<Tr, F>>
where F: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> bool + Clone + 'static,

Filters an arranged collection.

This method produces a new arrangement backed by the same shared arrangement as self, paired with user-specified logic that can filter by key and value. The resulting collection is restricted to the keys and values that return true under the user predicate.

§Examples
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;

::timely::example(|scope| {

    let arranged =
    scope.new_collection_from(0 .. 10).1
         .map(|x| (x, x+1))
         .arrange_by_key();

    arranged
        .filter(|k,v| k == v)
        .as_collection(|k,v| (*k,*v))
        .assert_empty();
});
Source

pub fn as_collection<D, L>( &self, logic: L, ) -> Collection<G, D, <Tr as TraceReader>::Diff>
where D: Data, L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> D + 'static,

Flattens the stream into a Collection.

The underlying StreamCore<G, Vec<Tr::Batch>> (the stream field) is a much more efficient way to access the data, and this method should only be used when the data need to be transformed or exchanged, rather than supplied as arguments to an operator using the same key-value structure.

Source

pub fn flat_map_ref<I, L>( &self, logic: L, ) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>
where I: IntoIterator, <I as IntoIterator>::Item: Data, L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,

Extracts elements from an arrangement as a collection.

The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.

Source

pub fn flat_map_batches<I, L>( stream: &StreamCore<G, Vec<<Tr as TraceReader>::Batch>>, logic: L, ) -> Collection<G, <I as IntoIterator>::Item, <Tr as TraceReader>::Diff>
where I: IntoIterator, <I as IntoIterator>::Item: Data, L: FnMut(<Tr as TraceReader>::Key<'_>, <Tr as TraceReader>::Val<'_>) -> I + 'static,

Extracts elements from a stream of batches as a collection.

The supplied logic may produce an iterator over output values, allowing either filtering or flat mapping as part of the extraction.

This method exists for streams of batches without the corresponding arrangement. If you have the arrangement, its flat_map_ref method is equivalent to this.

Trait Implementations

Source§

impl<G, T, R> ArrangementSize for Arranged<G, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>>>
where G: Scope<Timestamp = T>, G::Timestamp: Lattice + Ord + Columnation, T: Lattice + Timestamp, R: Semigroup + Ord + Columnation + 'static,

Source§

fn log_arrangement_size(self) -> Self

Install a logger to track the heap size of the target.
Source§

impl<G, Tr> Clone for Arranged<G, Tr>
where G: Scope<Timestamp = <Tr as TraceReader>::Time>, Tr: TraceReader + Clone,

Source§

fn clone(&self) -> Arranged<G, Tr>

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<G, K, T1, R> Count<G, K, R> for Arranged<G, T1>
where R: Data + Semigroup, G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R> + Clone + 'static, <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, K: Data,

Source§

fn count_core<R2>(&self) -> Collection<G, (K, R), R2>
where R2: Ord + Abelian + From<i8> + 'static,

Count for general integer differences. Read more
Source§

fn count(&self) -> Collection<G, (K, R)>

Counts the number of occurrences of each element. Read more
Source§

impl<G, K, T1> CountTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: for<'a> TraceReader<Val<'a> = &'a ()> + Clone + 'static, <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, <T1 as TraceReader>::Diff: for<'a> Semigroup<<T1 as TraceReader>::DiffGat<'a>> + ExchangeData, K: ExchangeData, <T1 as TraceReader>::Time: TotalOrder,

Source§

fn count_total_core<R2>( &self, ) -> Collection<G, (K, <T1 as TraceReader>::Diff), R2>
where R2: Semigroup + From<i8> + 'static,

Count for general integer differences. Read more
Source§

fn count_total(&self) -> Collection<G, (K, R)>

Counts the number of occurrences of each element. Read more
Source§

impl<G, K, V, Tr> Join<G, K, V, <Tr as TraceReader>::Diff> for Arranged<G, Tr>
where G: Scope<Timestamp = <Tr as TraceReader>::Time>, Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static, K: ExchangeData + Hashable, V: Data + 'static,

Source§

fn join_map<V2, R2, D, L>( &self, other: &Collection<G, (K, V2), R2>, logic: L, ) -> Collection<G, D, <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>
where V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, <Tr as TraceReader>::Diff: Multiply<R2>, <<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static, L: for<'a> FnMut(<Tr as TraceReader>::Key<'a>, <Tr as TraceReader>::Val<'a>, &V2) -> D + 'static,

Matches pairs (key,val1) and (key,val2) based on key and then applies a function. Read more
Source§

fn semijoin<R2>( &self, other: &Collection<G, K, R2>, ) -> Collection<G, (K, V), <<Tr as TraceReader>::Diff as Multiply<R2>>::Output>
where R2: ExchangeData + Semigroup, <Tr as TraceReader>::Diff: Multiply<R2>, <<Tr as TraceReader>::Diff as Multiply<R2>>::Output: Semigroup + 'static,

Matches pairs (key, val) and key based on key, producing the former with frequencies multiplied. Read more
Source§

fn antijoin<R2>( &self, other: &Collection<G, K, R2>, ) -> Collection<G, (K, V), <Tr as TraceReader>::Diff>
where R2: ExchangeData + Semigroup, <Tr as TraceReader>::Diff: Multiply<R2, Output = <Tr as TraceReader>::Diff> + Abelian + 'static,

Subtracts the semijoin with other from self. Read more
Source§

fn join<V2, R2>( &self, other: &Collection<G, (K, V2), R2>, ) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
where K: ExchangeData, V2: ExchangeData, R2: ExchangeData + Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup + 'static,

Matches pairs (key,val1) and (key,val2) based on key and yields pairs (key, (val1, val2)). Read more
Source§

impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
where G::Timestamp: Lattice + Ord, G: Scope, T1: TraceReader<Time = G::Timestamp> + Clone + 'static, T1::Diff: Semigroup,

Source§

fn mz_reduce_abelian<L, K, V, Bu, T2>( &self, name: &str, logic: L, ) -> Arranged<G, TraceAgent<T2>>
where T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static, K: Data, V: Data, for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>, T2::Diff: Abelian, T2::Batch: Batch, Bu: Builder<Time = G::Timestamp, Output = T2::Batch>, Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static, Arranged<G, TraceAgent<T2>>: ArrangementSize,

Applies reduce to arranged data, and returns an arrangement of output data.

Source§

impl<G, K, V, T1, R> Reduce<G, K, V, R> for Arranged<G, T1>
where V: Data, R: Ord + Semigroup + 'static, G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Diff = R> + Clone + 'static, <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, <T1 as TraceReader>::Val<'a>: for<'a> IntoOwned<'a, Owned = V>, K: Data,

Source§

fn reduce_named<L, V2, R2>( &self, name: &str, logic: L, ) -> Collection<G, (K, V2), R2>
where V2: Data, R2: Ord + Abelian + 'static, L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,

As reduce with the ability to name the operator.
Source§

fn reduce<L, V2, R2>(&self, logic: L) -> Collection<G, (K, V2), R2>
where V2: Data, R2: Ord + Abelian + 'static, L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,

Applies a reduction function on records grouped by key. Read more
Source§

impl<G: Scope, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
where G::Timestamp: Lattice + Ord, Tr: TraceReader<Time = G::Timestamp> + Clone + 'static, Tr::Diff: Semigroup,

Source§

fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>( &self, name1: &str, name2: &str, logic1: L1, logic2: L2, ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where K: Data, T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static, for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>, T1::Diff: Abelian, T1::Batch: Batch, Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>, Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>, L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static, V1: Data, T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static, for<'a, 'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>, T2::Diff: Abelian, T2::Batch: Batch, Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>, Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>, L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static, V2: Data, Arranged<G, TraceAgent<T1>>: ArrangementSize, Arranged<G, TraceAgent<T2>>: ArrangementSize,

This method produces a reduction pair based on the same input arrangement. Each reduction in the pair operates with its own logic and the two output arrangements from the reductions are produced as a result. The method is useful for reductions that need to present different output views on the same input data. An example is producing an error-free reduction output along with a separate error output indicating when the error-free output is valid.
Source§

impl<G, K, T1, R1> Threshold<G, K, R1> for Arranged<G, T1>
where R1: Semigroup, G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a (), Diff = R1> + Clone + 'static, <T1 as TraceReader>::Key<'a>: for<'a> IntoOwned<'a, Owned = K>, K: Data,

Source§

fn threshold_named<R2, F>(&self, name: &str, thresh: F) -> Collection<G, K, R2>
where R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static,

A threshold with the ability to name the operator.
Source§

fn threshold<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
where R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static,

Transforms the multiplicity of records. Read more
Source§

fn distinct(&self) -> Collection<G, K>

Reduces the collection to one occurrence of each distinct element. Read more
Source§

fn distinct_core<R2>(&self) -> Collection<G, K, R2>
where R2: Ord + Abelian + 'static + From<i8>,

Distinct for general integer differences. Read more
Source§

impl<G, K, T1> ThresholdTotal<G, K, <T1 as TraceReader>::Diff> for Arranged<G, T1>
where G: Scope<Timestamp = <T1 as TraceReader>::Time>, T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a ()> + Clone + 'static, <T1 as TraceReader>::Diff: for<'a> Semigroup<<T1 as TraceReader>::DiffGat<'a>> + ExchangeData, K: ExchangeData, <T1 as TraceReader>::Time: TotalOrder,

Source§

fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
where R2: Semigroup + 'static, F: for<'a> FnMut(<T1 as TraceReader>::Key<'a>, &<T1 as TraceReader>::Diff, Option<&<T1 as TraceReader>::Diff>) -> Option<R2> + 'static,

Transforms the multiplicity of records using thresh, producing differences in the semigroup R2; thresh returns an Option<R2>.
Source§

fn threshold_total<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
where R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static,

Transforms the multiplicity of records using the supplied thresh function. Read more
Source§

fn distinct_total(&self) -> Collection<G, K>

Reduces the collection to one occurrence of each distinct element. Read more
Source§

fn distinct_total_core<R2>(&self) -> Collection<G, K, R2>
where R2: Abelian + From<i8> + 'static,

Distinct for general integer differences. Read more
Source§

impl<S, Tr> WithStartSignal for Arranged<S, Tr>

Source§

fn with_start_signal(self, signal: StartSignal) -> Self

Delays data and progress updates until the start signal has fired. Read more