differential_dataflow/
lib.rs

1//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
2//!
3//! Differential dataflow programs are written in a collection-oriented style, where you transform
4//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`.
5//! Differential dataflow also includes the less traditional operation `iterate`, which allows you
6//! to repeatedly apply differential dataflow transformations to collections.
7//!
8//! Once you have defined a differential dataflow computation, you may then add records to or remove
9//! records from its inputs; the system will automatically update the computation's outputs with the
10//! appropriate corresponding additions and removals, and report these changes to you.
11//!
12//! Differential dataflow is built on the [timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
13//! framework for data-parallel programming which automatically parallelizes across multiple threads,
14//! processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly
15//! inter-operates with other timely dataflow computations.
16//!
17//! Differential dataflow is still very much a work in progress, with features and ergonomics still
18//! wildly in development. It is generally improving, though.
19//!
20//! # Examples
21//!
22//! This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts
23//! first the number of times the source coordinate occurs, and then the number of times each count
24//! occurs, giving us a sense for the distribution of degrees in the graph.
25//!
26//! ```ignore
27//! // create a degree counting differential dataflow
28//! let (mut input, probe) = worker.dataflow(|scope| {
29//!
30//!     // create edge input, count a few ways.
31//!     let (input, edges) = scope.new_collection();
32//!
33//!     // extract the source field, and then count.
34//!     let degrs = edges.map(|(src, _dst)| src)
35//!                      .count();
36//!
37//!     // extract the count field, and then count them.
38//!     let distr = degrs.map(|(_src, cnt)| cnt)
39//!                      .count();
40//!
41//!     // report the changes to the count collection, notice when done.
42//!     let probe = distr.inspect(|x| println!("observed: {:?}", x))
43//!                      .probe();
44//!
45//!     (input, probe)
46//! });
47//! ```
48//!
//! With the dataflow assembled, we can drive the computation like any other timely dataflow
//! computation, by pushing update records (triples of data, time, and change in count) at the
//! `input` handle. The `probe` is how timely dataflow tells us that we have seen all output updates
//! for a given time, which matters even when a time produces no output updates at all.
53//!
54//! ```ignore
55//! loop {
56//!     let time = input.epoch();
57//!     for round in time .. time + 100 {
58//!         input.advance_to(round);
59//!         input.insert((round % 13, round % 7));
60//!     }
61//!
62//!     input.flush();
63//!     while probe.less_than(input.time()) {
64//!        worker.step();
65//!     }
66//! }
67//! ```
68//!
//! This example should print the output changes produced by the 100 input edges inserted in each
//! iteration of the loop. Each new edge increases the degree of its source node by one, which
//! typically yields four output changes: the counts for the node's old and new degrees each have
//! their previous value retracted and their updated value added.
72
73#![forbid(missing_docs)]
74#![allow(array_into_iter)]
75
76
77use std::fmt::Debug;
78
79pub use collection::{Collection, AsCollection};
80pub use hashable::Hashable;
81pub use difference::Abelian as Diff;
82
83/// Data type usable in differential dataflow.
84///
85/// Most differential dataflow operators require the ability to cancel corresponding updates, and the
86/// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort
87/// the data, at which point we can consolidate updates for equivalent records.
88pub trait Data : timely::Data + Ord + Debug { }
89impl<T: timely::Data + Ord + Debug> Data for T { }
90
91/// Data types exchangeable in differential dataflow.
92pub trait ExchangeData : timely::ExchangeData + Ord + Debug { }
93impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }
94
95pub mod hashable;
96pub mod operators;
97pub mod algorithms;
98pub mod lattice;
99pub mod trace;
100pub mod input;
101pub mod difference;
102pub mod dynamic;
103pub mod collection;
104pub mod logging;
105pub mod consolidation;
106pub mod capture;
107pub mod containers;
108
/// Configuration options for differential dataflow.
///
/// Start from `Config::default()`, adjust it with the builder-style methods, and apply it to a
/// timely worker configuration with [`configure`].
#[derive(Debug, Clone, Default)]
pub struct Config {
    /// An amount of arrangement effort to spend each scheduling quantum.
    ///
    /// The default value of `None` will not schedule operators that maintain arrangements
    /// other than when computation is required. Setting the value to `Some(effort)` will
    /// cause these operators to reschedule themselves as long as their arrangement has not
    /// reached a compact representation, and each scheduling quantum they will perform
    /// compaction work as if `effort` records had been added to the arrangement.
    pub idle_merge_effort: Option<isize>,
}

impl Config {
    /// Assigns an amount of effort to apply to idle arrangement operators.
    ///
    /// Consumes `self` and returns it with `idle_merge_effort` replaced by `effort`, so the call
    /// can be chained directly onto `Config::default()`.
    pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
        self.idle_merge_effort = effort;
        self
    }
}
129
130/// Introduces differential options to a timely configuration.
131pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
132    if let Some(effort) = options.idle_merge_effort {
133        config.set("differential/idle_merge_effort".to_string(), effort);
134        config.set::<trace::ExertionLogic>(
135            "differential/default_exert_logic".to_string(),
136            std::sync::Arc::new(move |batches| {
137                let mut non_empty = 0;
138                for (_index, count, length) in batches {
139                    if *count > 1 { return Some(effort as usize); }
140                    if *length > 0 { non_empty += 1; }
141                    if non_empty > 1 { return Some(effort as usize); }
142                }
143                None
144            }),
145        );
146    }
147}