Struct differential_dataflow::collection::Collection
source · pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
pub inner: StreamCore<G, C>,
/* private fields */
}
Expand description
A mutable collection of values of type D
The Collection
type is the core abstraction in differential dataflow programs. As you write your
differential dataflow computation, you write as if the collection is a static dataset to which you
apply functional transformations, creating new collections. Once your computation is written, you
are able to mutate the collection (by inserting and removing elements); differential dataflow will
propagate changes through your functional computation and report the corresponding changes to the
output collections.
Each collection has three generic parameters. The parameter G
is for the scope in which the
collection exists; as you write more complicated programs you may wish to introduce nested scopes
(e.g. for iteration) and this parameter tracks the scope (for timely dataflow’s benefit). The D
parameter is the type of data in your collection, for example String
, or (u32, Vec<Option<()>>)
.
The R
parameter represents the types of changes that the data undergo, and is most commonly (and
defaults to) isize
, representing changes to the occurrence count of each record.
Fields§
§inner: StreamCore<G, C>
The underlying timely dataflow stream.
This field is exposed to support direct timely dataflow manipulation when required, but it is not intended to be the idiomatic way to work with the collection.
Implementations§
source§impl<G, D, R> Collection<G, D, R>where
G: Scope,
G::Timestamp: Data + Lattice,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
impl<G, D, R> Collection<G, D, R>where
G: Scope,
G::Timestamp: Data + Lattice,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
Methods which require data be arrangeable.
sourcepub fn consolidate(&self) -> Self
pub fn consolidate(&self) -> Self
Aggregates the weights of equal records into at most one record.
This method uses the type D
’s hashed()
method to partition the data. The data are
accumulated in place, each held back until their timestamp has completed.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(1 .. 10u32).1;
x.negate()
.concat(&x)
.consolidate() // <-- ensures cancellation occurs
.assert_empty();
});
sourcepub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
pub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
As consolidate
but with the ability to name the operator and specify the trace type.
sourcepub fn consolidate_stream(&self) -> Self
pub fn consolidate_stream(&self) -> Self
Aggregates the weights of equal records.
Unlike consolidate
, this method does not exchange data and does not
ensure that at most one copy of each (data, time)
pair exists in the
results. Instead, it acts on each batch of data and collapses equivalent
(data, time)
pairs found therein, suppressing any that accumulate to
zero.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(1 .. 10u32).1;
// nothing to assert, as no particular guarantees.
x.negate()
.concat(&x)
.consolidate_stream();
});
source§impl<G, D, R, T, TOuter> Collection<G, D, R>
impl<G, D, R, T, TOuter> Collection<G, D, R>
sourcepub fn enter_dynamic(&self, _level: usize) -> Self
pub fn enter_dynamic(&self, _level: usize) -> Self
Enters a dynamically created scope which has level
timestamp coordinates.
sourcepub fn leave_dynamic(&self, level: usize) -> Self
pub fn leave_dynamic(&self, level: usize) -> Self
Leaves a dynamically created scope which has level
timestamp coordinates.
source§impl<G: Scope, D, R, C> Collection<G, D, R, C>
impl<G: Scope, D, R, C> Collection<G, D, R, C>
sourcepub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C>
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C>
Creates a new Collection from a timely dataflow stream.
This method seems to be rarely used, with the as_collection
method on streams being a more
idiomatic approach to convert timely streams to collections. Also, the input::Input
trait
provides a new_collection
method which will create a new collection for you without exposing
the underlying timely stream at all.
source§impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C>
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C>
sourcepub fn concat(&self, other: &Self) -> Self
pub fn concat(&self, other: &Self) -> Self
Creates a new collection accumulating the contents of the two collections.
Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.concat(&evens)
.assert_eq(&data);
});
sourcepub fn concatenate<I>(&self, sources: I) -> Selfwhere
I: IntoIterator<Item = Self>,
pub fn concatenate<I>(&self, sources: I) -> Selfwhere
I: IntoIterator<Item = Self>,
Creates a new collection accumulating the contents of the two collections.
Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.concatenate(Some(evens))
.assert_eq(&data);
});
sourcepub fn enter_region<'a>(
&self,
child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C>
pub fn enter_region<'a>( &self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>, ) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C>
This method is a specialization of enter
to the case where the nested scope is a region.
It removes the need for an operator that adjusts the timestamp.
sourcepub fn inspect_container<F>(&self, func: F) -> Self
pub fn inspect_container<F>(&self, func: F) -> Self
Applies a supplied function to each batch of updates.
This method is analogous to inspect
, but operates on batches and reveals the timestamp of the
timely dataflow capability associated with the batch of updates. The observed batching depends
on how the system executes, and may vary run to run.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect_container(|event| println!("event: {:?}", event));
});
sourcepub fn probe(&self) -> Handle<G::Timestamp>
pub fn probe(&self) -> Handle<G::Timestamp>
Attaches a timely dataflow probe to the output of a Collection.
This probe is used to determine when the state of the Collection has stabilized and can be read out.
sourcepub fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> Self
pub fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> Self
Attaches a timely dataflow probe to the output of a Collection.
This probe is used to determine when the state of the Collection has stabilized and all updates observed. In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a computation can wait until the probe has caught up to the input before introducing more rounds of data, to avoid swamping the system.
source§impl<G: Scope, D: Clone + 'static, R: Clone + 'static> Collection<G, D, R>
impl<G: Scope, D: Clone + 'static, R: Clone + 'static> Collection<G, D, R>
sourcepub fn map<D2, L>(&self, logic: L) -> Collection<G, D2, R>
pub fn map<D2, L>(&self, logic: L) -> Collection<G, D2, R>
Creates a new collection by applying the supplied function to each input element.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
sourcepub fn map_in_place<L>(&self, logic: L) -> Collection<G, D, R>
pub fn map_in_place<L>(&self, logic: L) -> Collection<G, D, R>
Creates a new collection by applying the supplied function to each input element.
Although the name suggests in-place mutation, this function does not change the source collection,
but rather re-uses the underlying allocations in its implementation. The method is semantically
equivalent to map
, but can be more efficient.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
sourcepub fn flat_map<I, L>(&self, logic: L) -> Collection<G, I::Item, R>
pub fn flat_map<I, L>(&self, logic: L) -> Collection<G, I::Item, R>
Creates a new collection by applying the supplied function to each input element and accumulating the results.
This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be warned that if the iterators produce substantial amounts of data, they are currently fully drained before attempting to consolidate the results.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.flat_map(|x| 0 .. x);
});
sourcepub fn filter<L>(&self, logic: L) -> Collection<G, D, R>
pub fn filter<L>(&self, logic: L) -> Collection<G, D, R>
Creates a new collection containing those input records satisfying the supplied predicate.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
sourcepub fn explode<D2, R2, I, L>(
&self,
logic: L,
) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
pub fn explode<D2, R2, I, L>( &self, logic: L, ) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
Replaces each record with another, with a new difference type.
This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) and move the data into the difference component. This will allow differential dataflow to update in-place.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let nums = scope.new_collection_from(0 .. 10).1;
let x1 = nums.flat_map(|x| 0 .. x);
let x2 = nums.map(|x| (x, 9 - x))
.explode(|(x,y)| Some((x,y)));
x1.assert_eq(&x2);
});
sourcepub fn join_function<D2, R2, I, L>(
&self,
logic: L,
) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
pub fn join_function<D2, R2, I, L>( &self, logic: L, ) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
Joins each record against a collection defined by the function logic
.
This method performs what is essentially a join with the collection of records (x, logic(x))
.
Rather than materialize this second relation, logic
is applied to each record and the appropriate
modifications made to the results, namely joining timestamps and multiplying differences.
#Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// creates `x` copies of `2*x` from time `3*x` until `4*x`,
// for x from 0 through 9.
scope.new_collection_from(0 .. 10isize).1
.join_function(|x|
// data time diff
vec![(2*x, (3*x) as u64, x),
(2*x, (4*x) as u64, -x)]
);
});
sourcepub fn enter<'a, T>(
&self,
child: &Child<'a, G, T>,
) -> Collection<Child<'a, G, T>, D, R>
pub fn enter<'a, T>( &self, child: &Child<'a, G, T>, ) -> Collection<Child<'a, G, T>, D, R>
Brings a Collection into a nested scope.
§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let result = scope.region(|child| {
data.enter(child)
.leave()
});
data.assert_eq(&result);
});
sourcepub fn enter_at<'a, T, F>(
&self,
child: &Iterative<'a, G, T>,
initial: F,
) -> Collection<Iterative<'a, G, T>, D, R>
pub fn enter_at<'a, T, F>( &self, child: &Iterative<'a, G, T>, initial: F, ) -> Collection<Iterative<'a, G, T>, D, R>
Brings a Collection into a nested scope, at varying times.
The initial
function indicates the time at which each element of the Collection should appear.
§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let result = scope.iterative::<u64,_,_>(|child| {
data.enter_at(child, |x| *x)
.leave()
});
data.assert_eq(&result);
});
sourcepub fn delay<F>(&self, func: F) -> Collection<G, D, R>
pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
Delays each difference by a supplied function.
It is assumed that func
only advances timestamps; this is not verified, and things may go horribly
wrong if that assumption is incorrect. It is also critical that func
be monotonic: if two times are
ordered, they should have the same order once func
is applied to them (this is because we advance the
timely capability with the same logic, and it must remain less_equal
to all of the data timestamps).
sourcepub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
Applies a supplied function to each update.
This method is most commonly used to report information back to the user, often for debugging purposes. Any function can be used here, but be warned that the incremental nature of differential dataflow does not guarantee that it will be called as many times as you might expect.
The (data, time, diff)
triples indicate a change diff
to the frequency of data
which takes effect
at the logical time time
. When times are totally ordered (for example, usize
), these updates reflect
the changes along the sequence of collections. For partially ordered times, the mathematics are more
interesting and less intuitive, unfortunately.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect(|x| println!("error: {:?}", x));
});
sourcepub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
Applies a supplied function to each batch of updates.
This method is analogous to inspect
, but operates on batches and reveals the timestamp of the
timely dataflow capability associated with the batch of updates. The observed batching depends
on how the system executes, and may vary run to run.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
});
sourcepub fn assert_empty(&self)where
D: ExchangeData + Hashable,
R: ExchangeData + Hashable + Semigroup,
G::Timestamp: Lattice + Ord,
pub fn assert_empty(&self)where
D: ExchangeData + Hashable,
R: ExchangeData + Hashable + Semigroup,
G::Timestamp: Lattice + Ord,
Assert if the collection is ever non-empty.
Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collection could be non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
source§impl<'a, G: Scope, T, D: Clone + 'static, R: Clone + 'static> Collection<Child<'a, G, T>, D, R>
impl<'a, G: Scope, T, D: Clone + 'static, R: Clone + 'static> Collection<Child<'a, G, T>, D, R>
Methods requiring a nested scope.
sourcepub fn leave(&self) -> Collection<G, D, R>
pub fn leave(&self) -> Collection<G, D, R>
Returns the final value of a Collection from a nested scope to its containing scope.
§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let result = scope.region(|child| {
data.enter(child)
.leave()
});
data.assert_eq(&result);
});
source§impl<'a, G: Scope, D: Clone + 'static, R: Clone + 'static> Collection<Child<'a, G, G::Timestamp>, D, R>
impl<'a, G: Scope, D: Clone + 'static, R: Clone + 'static> Collection<Child<'a, G, G::Timestamp>, D, R>
Methods requiring a region as the scope.
sourcepub fn leave_region(&self) -> Collection<G, D, R>
pub fn leave_region(&self) -> Collection<G, D, R>
Returns the value of a Collection from a nested region to its containing scope.
This method is a specialization of leave
to the case that of a nested region.
It removes the need for an operator that adjusts the timestamp.
source§impl<G: Scope, D: Clone + 'static, R: Abelian + 'static> Collection<G, D, R>
impl<G: Scope, D: Clone + 'static, R: Abelian + 'static> Collection<G, D, R>
Methods requiring an Abelian difference, to support negation.
sourcepub fn negate(&self) -> Collection<G, D, R>
pub fn negate(&self) -> Collection<G, D, R>
Creates a new collection whose counts are the negation of those in the input.
This method is most commonly used with concat
to get those element in one collection but not another.
However, differential dataflow computations are still defined for all values of the difference type R
,
including negative counts.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.negate()
.concat(&data)
.assert_eq(&evens);
});
sourcepub fn assert_eq(&self, other: &Self)
pub fn assert_eq(&self, other: &Self)
Assert if the collections are ever different.
Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collections could vary. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.concat(&evens)
.assert_eq(&data);
});
Trait Implementations§
source§impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Arrange<G, Vec<((K, ()), <G as ScopeParent>::Timestamp, R)>> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Arrange<G, Vec<((K, ()), <G as ScopeParent>::Timestamp, R)>> for Collection<G, K, R>
source§fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
source§impl<G, K, V, R> Arrange<G, Vec<((K, V), <G as ScopeParent>::Timestamp, R)>> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
impl<G, K, V, R> Arrange<G, Vec<((K, V), <G as ScopeParent>::Timestamp, R)>> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
source§fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
source§impl<G: Scope, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K, V), R>
impl<G: Scope, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K, V), R>
source§fn arrange_by_key(
&self,
) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
fn arrange_by_key( &self, ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
source§fn arrange_by_key_named(
&self,
name: &str,
) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
fn arrange_by_key_named( &self, name: &str, ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
arrange_by_key
but with the ability to name the arrangement.source§impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
source§fn arrange_by_self(
&self,
) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
fn arrange_by_self( &self, ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
source§fn arrange_by_self_named(
&self,
name: &str,
) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
fn arrange_by_self_named( &self, name: &str, ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
arrange_by_self
but with the ability to name the arrangement.source§impl<G: Clone + Scope, D: Clone, R: Clone, C: Clone> Clone for Collection<G, D, R, C>
impl<G: Clone + Scope, D: Clone, R: Clone, C: Clone> Clone for Collection<G, D, R, C>
source§fn clone(&self) -> Collection<G, D, R, C>
fn clone(&self) -> Collection<G, D, R, C>
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read moresource§impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Count<G, K, R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Count<G, K, R> for Collection<G, K, R>
source§impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
source§fn count_total_core<R2: Semigroup + From<i8> + 'static>(
&self,
) -> Collection<G, (K, R), R2>
fn count_total_core<R2: Semigroup + From<i8> + 'static>( &self, ) -> Collection<G, (K, R), R2>
source§fn count_total(&self) -> Collection<G, (K, R), isize>
fn count_total(&self) -> Collection<G, (K, R), isize>
source§impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
source§fn identifiers(&self) -> Collection<G, (D, u64), R>
fn identifiers(&self) -> Collection<G, (D, u64), R>
source§impl<G: Scope, D: Ord + Data + Debug, R: Abelian + 'static> Iterate<G, D, R> for Collection<G, D, R>
impl<G: Scope, D: Ord + Data + Debug, R: Abelian + 'static> Iterate<G, D, R> for Collection<G, D, R>
source§fn iterate<F>(&self, logic: F) -> Collection<G, D, R>where
G::Timestamp: Lattice,
for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>) -> Collection<Iterative<'a, G, u64>, D, R>,
fn iterate<F>(&self, logic: F) -> Collection<G, D, R>where
G::Timestamp: Lattice,
for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>) -> Collection<Iterative<'a, G, u64>, D, R>,
logic
to the source collection until convergence. Read moresource§impl<G, K, V, R> Join<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
G::Timestamp: Lattice + Ord,
impl<G, K, V, R> Join<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
G::Timestamp: Lattice + Ord,
source§fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
&self,
other: &Collection<G, (K, V2), R2>,
logic: L,
) -> Collection<G, D, <R as Multiply<R2>>::Output>
fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>( &self, other: &Collection<G, (K, V2), R2>, logic: L, ) -> Collection<G, D, <R as Multiply<R2>>::Output>
source§fn semijoin<R2: ExchangeData + Semigroup>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
fn semijoin<R2: ExchangeData + Semigroup>( &self, other: &Collection<G, K, R2>, ) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
(key, val)
and key
based on key
, producing the former with frequencies multiplied. Read moresource§fn antijoin<R2: ExchangeData + Semigroup>(
&self,
other: &Collection<G, K, R2>,
) -> Collection<G, (K, V), R>
fn antijoin<R2: ExchangeData + Semigroup>( &self, other: &Collection<G, K, R2>, ) -> Collection<G, (K, V), R>
source§fn join<V2, R2>(
&self,
other: &Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>where
K: ExchangeData,
V2: ExchangeData,
R2: ExchangeData + Semigroup,
R: Multiply<R2>,
<R as Multiply<R2>>::Output: Semigroup + 'static,
fn join<V2, R2>(
&self,
other: &Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>where
K: ExchangeData,
V2: ExchangeData,
R2: ExchangeData + Semigroup,
R: Multiply<R2>,
<R as Multiply<R2>>::Output: Semigroup + 'static,
(key,val1)
and (key,val2)
based on key
and yields pairs (key, (val1, val2))
. Read moresource§impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
G::Timestamp: Lattice + Ord,
impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
G::Timestamp: Lattice + Ord,
source§fn join_core<Tr2, I, L>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
fn join_core<Tr2, I, L>( &self, stream2: &Arranged<G, Tr2>, result: L, ) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
source§fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
&self,
stream2: &Arranged<G, Tr2>,
result: L,
) -> Collection<G, D, ROut>
fn join_core_internal_unsafe<Tr2, I, L, D, ROut>( &self, stream2: &Arranged<G, Tr2>, result: L, ) -> Collection<G, D, ROut>
join_core
where the result
closure takes additional arguments for time
and
diff
as input and returns an iterator over (data, time, diff)
triplets. This allows for more
flexibility, but is more error-prone. Read moresource§impl<G, K, D> PrefixSum<G, K, D> for Collection<G, ((usize, K), D)>
impl<G, K, D> PrefixSum<G, K, D> for Collection<G, ((usize, K), D)>
source§fn prefix_sum<F>(&self, zero: D, combine: F) -> Self
fn prefix_sum<F>(&self, zero: D, combine: F) -> Self
source§fn prefix_sum_at<F>(
&self,
locations: Collection<G, (usize, K)>,
zero: D,
combine: F,
) -> Self
fn prefix_sum_at<F>( &self, locations: Collection<G, (usize, K)>, zero: D, combine: F, ) -> Self
location
.source§impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice + Ord,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice + Ord,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
source§fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
&self,
name: &str,
logic: L,
) -> Collection<G, (K, V2), R2>
fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>( &self, name: &str, logic: L, ) -> Collection<G, (K, V2), R2>
reduce
with the ability to name the operator.source§impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice + Ord,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>where
G: Scope,
G::Timestamp: Lattice + Ord,
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData + Semigroup,
source§fn reduce_core<L, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
V: Data,
T2: for<'a> Trace<Key<'a> = &'a K, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Batch: Batch,
Bu: Builder<Time = T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>) + 'static,
fn reduce_core<L, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
V: Data,
T2: for<'a> Trace<Key<'a> = &'a K, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Batch: Batch,
Bu: Builder<Time = T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>) + 'static,
source§fn reduce_abelian<L, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
T2: for<'a> Trace<Key<'a> = &'a K, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu: Builder<Time = T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>) + 'static,
fn reduce_abelian<L, Bu, T2>(
&self,
name: &str,
logic: L,
) -> Arranged<G, TraceAgent<T2>>where
T2: for<'a> Trace<Key<'a> = &'a K, Time = G::Timestamp> + 'static,
for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
Bu: Builder<Time = T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>) + 'static,
reduce
to arranged data, and returns an arrangement of output data. Read moresource§impl<G: Scope, K: ExchangeData + Hashable, R1: ExchangeData + Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
impl<G: Scope, K: ExchangeData + Hashable, R1: ExchangeData + Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
source§fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
&self,
name: &str,
thresh: F,
) -> Collection<G, K, R2>
fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>( &self, name: &str, thresh: F, ) -> Collection<G, K, R2>
threshold
with the ability to name the operator.source§fn threshold<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
&self,
thresh: F,
) -> Collection<G, K, R2>
fn threshold<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>( &self, thresh: F, ) -> Collection<G, K, R2>
source§fn distinct(&self) -> Collection<G, K, isize>
fn distinct(&self) -> Collection<G, K, isize>
source§fn distinct_core<R2: Ord + Abelian + 'static + From<i8>>(
&self,
) -> Collection<G, K, R2>
fn distinct_core<R2: Ord + Abelian + 'static + From<i8>>( &self, ) -> Collection<G, K, R2>
source§impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
source§fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
source§fn threshold_total<R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static>(
&self,
thresh: F,
) -> Collection<G, K, R2>
fn threshold_total<R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static>( &self, thresh: F, ) -> Collection<G, K, R2>
source§fn distinct_total(&self) -> Collection<G, K, isize>
fn distinct_total(&self) -> Collection<G, K, isize>
source§fn distinct_total_core<R2: Abelian + From<i8> + 'static>(
&self,
) -> Collection<G, K, R2>
fn distinct_total_core<R2: Abelian + From<i8> + 'static>( &self, ) -> Collection<G, K, R2>
Auto Trait Implementations§
impl<G, D, R, C> Freeze for Collection<G, D, R, C>where
G: Freeze,
impl<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> !RefUnwindSafe for Collection<G, D, R, C>
impl<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> !Send for Collection<G, D, R, C>
impl<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> !Sync for Collection<G, D, R, C>
impl<G, D, R, C> Unpin for Collection<G, D, R, C>
impl<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> !UnwindSafe for Collection<G, D, R, C>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§default unsafe fn clone_to_uninit(&self, dst: *mut T)
default unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.