pub trait ConsolidateStream<D: ExchangeData + Hashable> {
fn consolidate_stream(&self) -> Self;
}
Expand description
An extension method for consolidating weighted streams.
Required Methods§
sourcefn consolidate_stream(&self) -> Self
fn consolidate_stream(&self) -> Self
Aggregates the weights of equal records.
Unlike consolidate
, this method does not exchange data and does not
ensure that at most one copy of each (data, time)
pair exists in the
results. Instead, it acts on each batch of data and collapses equivalent
(data, time)
pairs found therein, suppressing any that accumulate to
zero.
Examples
extern crate timely;
extern crate differential_dataflow;
use differential_dataflow::input::Input;
use differential_dataflow::operators::consolidate::ConsolidateStream;
fn main() {
::timely::example(|scope| {
let x = scope.new_collection_from(1 .. 10u32).1;
// nothing to assert, as no particular guarantees.
x.negate()
.concat(&x)
.consolidate_stream();
});
}