Crate differential_dataflow


Differential dataflow is a high-throughput, low-latency data-parallel programming framework.

Differential dataflow programs are written in a collection-oriented style, where you transform collections of records using traditional operations like map, filter, join, and reduce. Differential dataflow also includes the less traditional operation iterate, which allows you to repeatedly apply differential dataflow transformations to collections.
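The `iterate` operation repeatedly applies a transformation to a collection until it stops changing. As a plain-Rust sketch of that idea (not the differential dataflow API; the function name and types here are illustrative), consider closing a set of graph nodes under an edge relation until a fixed point is reached:

```rust
use std::collections::BTreeSet;

// Illustrative sketch of the `iterate` idea, not the crate's API:
// apply a rule to a collection until the collection stops changing.
// Here the rule adds every node reachable along one edge.
fn reachable(edges: &[(u32, u32)], roots: &[u32]) -> BTreeSet<u32> {
    let mut nodes: BTreeSet<u32> = roots.iter().copied().collect();
    loop {
        let next: BTreeSet<u32> = nodes
            .iter()
            .copied()
            .chain(
                edges
                    .iter()
                    .filter(|(src, _)| nodes.contains(src))
                    .map(|(_, dst)| *dst),
            )
            .collect();
        if next == nodes {
            return nodes; // fixed point: another round changes nothing
        }
        nodes = next;
    }
}

fn main() {
    let edges = [(0, 1), (1, 2), (3, 4)];
    let reach = reachable(&edges, &[0]);
    println!("reachable from 0: {:?}", reach); // 0, 1, and 2, but not 3 or 4
}
```

Differential dataflow's `iterate` expresses the same pattern, but incrementally: when the input edges change, only the affected part of the fixed point is recomputed.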

Once you have defined a differential dataflow computation, you may then add records to or remove records from its inputs; the system will automatically update the computation’s outputs with the appropriate corresponding additions and removals, and report these changes to you.
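The change model can be pictured as (data, time, diff) triples, where a collection's contents at a given time are the accumulated diffs of each record. A minimal plain-Rust sketch of that accounting (illustrative only, not the crate's internals):

```rust
use std::collections::BTreeMap;

// Minimal sketch of the update model (not the crate's internals):
// a change is a (data, time, diff) triple, and the collection's contents
// "as of" some time are the summed diffs for each record.
fn accumulate(updates: &[(&str, u64, i64)], as_of: u64) -> BTreeMap<String, i64> {
    let mut counts = BTreeMap::new();
    for &(data, time, diff) in updates {
        if time <= as_of {
            *counts.entry(data.to_string()).or_insert(0) += diff;
        }
    }
    counts.retain(|_, count| *count != 0); // fully retracted records vanish
    counts
}

fn main() {
    let updates = [("apple", 0, 1), ("banana", 0, 1), ("apple", 1, -1)];
    // At time 0 both records are present; by time 1 "apple" is retracted.
    println!("as of 0: {:?}", accumulate(&updates, 0));
    println!("as of 1: {:?}", accumulate(&updates, 1));
}
```

Differential dataflow maintains exactly this kind of bookkeeping through entire dataflows of operators, reporting the output triples rather than recomputing collections from scratch.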

Differential dataflow is built on the timely dataflow framework for data-parallel programming, which automatically parallelizes across multiple threads, processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly interoperates with other timely dataflow computations.

Differential dataflow is still very much a work in progress, with features and ergonomics in active development. It is generally improving, though.


This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts first the number of times the source coordinate occurs, and then the number of times each count occurs, giving us a sense for the distribution of degrees in the graph.

// create a degree counting differential dataflow
let (mut input, probe) = worker.dataflow(|scope| {

    // create edge input, count a few ways.
    let (input, edges) = scope.new_collection();

    // extract the source field, and then count.
    let degrs = edges.map(|(src, _dst)| src)
                     .count();

    // extract the count field, and then count them.
    let distr = degrs.map(|(_src, cnt)| cnt)
                     .count();

    // report the changes to the count collection, notice when done.
    let probe = distr.inspect(|x| println!("observed: {:?}", x))
                     .probe();

    (input, probe)
});

Now assembled, we can drive the computation like a timely dataflow computation, by pushing update records (triples of data, time, and change in count) at the input stream handle. The probe is how timely dataflow tells us that we have seen all corresponding output updates (even if there are none).

loop {
    let time = *input.epoch();
    for round in time .. time + 100 {
        input.insert((round % 13, round % 7));
        input.advance_to(round + 1);
    }
    input.flush();

    // step the worker until the probe has caught up to the input frontier.
    while probe.less_than(input.time()) {
        worker.step();
    }
}

This example should print out the 100 changes in the output, in this case each reflecting the increase of some node degree by one (typically four output changes, corresponding to the addition and deletion of the new and old counts of the old and new degrees of the affected node).
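To see why a single edge insertion typically yields four output changes, we can compute the distribution collection's records before and after one insertion and diff them by hand. This is a plain-Rust sketch (the helper name is illustrative, not part of the crate): the distribution holds (degree, count) records, and a node's degree moving from one value to another changes the counts of two degrees, replacing two records.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Sketch (plain Rust, not differential dataflow): the distribution
// collection holds (degree, count) records, i.e. for each out-degree,
// how many source nodes have that degree.
fn distr_records(edges: &[(u32, u32)]) -> BTreeSet<(usize, usize)> {
    let mut degrees: BTreeMap<u32, usize> = BTreeMap::new();
    for &(src, _dst) in edges {
        *degrees.entry(src).or_insert(0) += 1;
    }
    let mut counts: BTreeMap<usize, usize> = BTreeMap::new();
    for (_node, deg) in degrees {
        *counts.entry(deg).or_insert(0) += 1;
    }
    counts.into_iter().collect()
}

fn main() {
    let before = distr_records(&[(0, 1), (1, 2), (2, 3), (2, 4)]);
    let after  = distr_records(&[(0, 1), (1, 2), (2, 3), (2, 4), (1, 3)]);
    // Inserting (1, 3) moves node 1's degree from 1 to 2, so the counts of
    // degrees 1 and 2 both change: two records retracted, two added.
    for rec in before.difference(&after) {
        println!("{:?} changed by -1", rec);
    }
    for rec in after.difference(&before) {
        println!("{:?} changed by +1", rec);
    }
}
```

Differential dataflow produces exactly these record-level retractions and additions as its output updates, without recomputing either distribution from scratch.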



Modules

  • algorithms: Common algorithms constructed from differential dataflow operators.
  • capture: Logic related to capture and replay of differential collections.
  • collection: Types and traits associated with collections of data.
  • consolidation: Common logic for the consolidation of vectors of Semigroups.
  • difference: A type that can be treated as a difference.
  • dynamic: Types and operators for dynamically scoped iterative dataflows.
  • hashable: Traits and types related to the distribution of data.
  • input: Input sessions for simplified collection updates.
  • lattice: Partially ordered elements with a least upper bound.
  • logging: Loggers and logging events for differential dataflow.
  • operators: Specialize differential dataflow operators.
  • trace: Traits and data structures representing a collection trace.


Structs

  • Config: Configuration options for differential dataflow.


Traits

  • Data: Data type usable in differential dataflow.
  • ExchangeData: Data types exchangeable in differential dataflow.


Functions

  • configure: Introduces differential options to a timely configuration.