pub trait ConsolidateStream<D: ExchangeData + Hashable> {
    fn consolidate_stream(&self) -> Self;
}

An extension method for consolidating weighted streams.

Required Methods

Aggregates the weights of equal records.

Unlike consolidate, this method does not exchange data, and it does not guarantee that at most one copy of each (data, time) pair exists in the results. Instead, it acts on each batch of data independently and collapses equivalent (data, time) pairs found within that batch, suppressing any whose weights accumulate to zero.
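The per-batch behavior described above can be sketched in plain Rust without any dataflow machinery. This is an illustrative model only, not the library's implementation: `consolidate_batch` is a hypothetical helper, and updates are assumed to be `(data, time, diff)` triples with `isize` weights.

```rust
/// Hypothetical helper, not part of differential_dataflow: collapses equal
/// (data, time) pairs within one batch and drops pairs whose weights sum to zero.
fn consolidate_batch<D: Ord, T: Ord>(mut batch: Vec<(D, T, isize)>) -> Vec<(D, T, isize)> {
    // Sort so that equal (data, time) pairs become adjacent.
    batch.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));

    let mut out: Vec<(D, T, isize)> = Vec::with_capacity(batch.len());
    for (data, time, diff) in batch {
        // Fold the weight into the previous entry when the pair matches.
        if let Some(last) = out.last_mut() {
            if last.0 == data && last.1 == time {
                last.2 += diff;
                continue;
            }
        }
        out.push((data, time, diff));
    }

    // Suppress pairs that accumulated to zero.
    out.retain(|update| update.2 != 0);
    out
}

fn main() {
    // Two separate batches, both at the assumed logical time 0.
    let batch_a = vec![("x", 0u64, 1), ("y", 0, 1), ("x", 0, -1), ("y", 0, 1)];
    let batch_b = vec![("y", 0u64, -2)];

    let a = consolidate_batch(batch_a);
    let b = consolidate_batch(batch_b);

    // Within batch_a, "x" cancels out and "y" accumulates to 2.
    assert_eq!(a, vec![("y", 0, 2)]);
    // batch_b is handled on its own, so its "y" update is not merged with batch_a's.
    assert_eq!(b, vec![("y", 0, -2)]);
}
```

In this model the two "y" updates would cancel if they were combined, but because each batch is processed separately both survive. That is the sense in which the results may still hold more than one copy of a (data, time) pair.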

Examples
extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::consolidate::ConsolidateStream;

fn main() {
    ::timely::example(|scope| {

        let x = scope.new_collection_from(1 .. 10u32).1;

        // Nothing to assert: consolidate_stream makes no particular guarantees.
        x.negate()
         .concat(&x)
         .consolidate_stream();
    });
}

Implementors