differential_dataflow/lib.rs
//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where you transform
//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`.
//! Differential dataflow also includes the less traditional operation `iterate`, which allows you
//! to repeatedly apply differential dataflow transformations to collections.
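//!
//! For instance, a minimal sketch of `iterate` (assuming `edges` is a collection of
//! `(source, target)` pairs in some scope; the names here are illustrative) might compute
//! transitive closure roughly like so:
//!
//! ```ignore
//! // repeatedly extend reachable pairs along edges until the collection stops changing.
//! let closure = edges.iterate(|inner| {
//!     let edges = edges.enter(&inner.scope());
//!     inner.map(|(x, y)| (y, x))
//!          .join(&edges)
//!          .map(|(_y, (x, z))| (x, z))
//!          .concat(&edges)
//!          .distinct()
//! });
//! ```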
//!
//! Once you have defined a differential dataflow computation, you may then add records to or remove
//! records from its inputs; the system will automatically update the computation's outputs with the
//! appropriate corresponding additions and removals, and report these changes to you.
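//!
//! For example, given an input handle like the one created in the example below, changes might
//! be introduced as follows (a sketch; the specific records are illustrative):
//!
//! ```ignore
//! input.insert((0, 1));    // add the record (0, 1) at the current epoch.
//! input.remove((0, 2));    // retract a previously inserted record.
//! input.advance_to(1);     // complete the epoch so its outputs can be reported.
//! input.flush();
//! ```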
//!
//! Differential dataflow is built on the [timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
//! framework for data-parallel programming which automatically parallelizes across multiple threads,
//! processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly
//! inter-operates with other timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress, with features and ergonomics
//! under active development. It is generally improving, though.
//!
//! # Examples
//!
//! This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts
//! first the number of times the source coordinate occurs, and then the number of times each count
//! occurs, giving us a sense for the distribution of degrees in the graph.
//!
//! ```ignore
//! // create a degree counting differential dataflow
//! let (mut input, probe) = worker.dataflow(|scope| {
//!
//!     // create edge input, count a few ways.
//!     let (input, edges) = scope.new_collection();
//!
//!     // extract the source field, and then count.
//!     let degrs = edges.map(|(src, _dst)| src)
//!                      .count();
//!
//!     // extract the count field, and then count them.
//!     let distr = degrs.map(|(_src, cnt)| cnt)
//!                      .count();
//!
//!     // report the changes to the count collection, notice when done.
//!     let probe = distr.inspect(|x| println!("observed: {:?}", x))
//!                      .probe();
//!
//!     (input, probe)
//! });
//! ```
//!
//! Now assembled, we can drive the computation like a timely dataflow computation, by pushing update
//! records (triples of data, time, and change in count) at the `input` stream handle. The `probe` is
//! how timely dataflow tells us that we have seen all corresponding output updates (in case there are
//! none).
//!
//! ```ignore
//! loop {
//!     let time = *input.epoch();
//!     for round in time .. time + 100 {
//!         input.advance_to(round);
//!         input.insert((round % 13, round % 7));
//!     }
//!
//!     input.flush();
//!     while probe.less_than(input.time()) {
//!         worker.step();
//!     }
//! }
//! ```
//!
//! This example should print out the 100 changes in the output, in this case each reflecting the increase
//! of some node degree by one (typically four output changes, corresponding to the addition and deletion
//! of the new and old counts of the old and new degrees of the affected node).

#![forbid(missing_docs)]
#![allow(array_into_iter)]

use std::fmt::Debug;

pub use collection::{Collection, AsCollection};
pub use hashable::Hashable;
pub use difference::Abelian as Diff;
pub use into_owned::IntoOwned;

/// Data type usable in differential dataflow.
///
/// Most differential dataflow operators require the ability to cancel corresponding updates, and the
/// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort
/// the data, at which point we can consolidate updates for equivalent records.
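///
/// For instance, a minimal sketch (with illustrative values) of how ordering lets us cancel
/// updates, using the `consolidation` module:
///
/// ```ignore
/// let mut updates = vec![(("b", 3), 1), (("a", 7), 1), (("a", 7), -1)];
/// differential_dataflow::consolidation::consolidate(&mut updates);
/// // equal records are merged and zeroed differences are dropped.
/// assert_eq!(updates, vec![(("b", 3), 1)]);
/// ```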
pub trait Data : timely::Data + Ord + Debug { }
impl<T: timely::Data + Ord + Debug> Data for T { }

/// Data types exchangeable in differential dataflow.
pub trait ExchangeData : timely::ExchangeData + Ord + Debug { }
impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }

pub mod hashable;
pub mod operators;
pub mod algorithms;
pub mod lattice;
pub mod trace;
pub mod input;
pub mod difference;
pub mod dynamic;
pub mod collection;
pub mod logging;
pub mod consolidation;
pub mod capture;
pub mod containers;
mod into_owned;

/// Configuration options for differential dataflow.
#[derive(Default)]
pub struct Config {
    /// An amount of arrangement effort to spend each scheduling quantum.
    ///
    /// The default value of `None` will not schedule operators that maintain arrangements
    /// other than when computation is required. Setting the value to `Some(effort)` will
    /// cause these operators to reschedule themselves as long as their arrangement has not
    /// reached a compact representation, and each scheduling quantum they will perform
    /// compaction work as if `effort` records had been added to the arrangement.
    pub idle_merge_effort: Option<isize>,
}

impl Config {
    /// Assign an amount of effort to apply to idle arrangement operators.
    pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
        self.idle_merge_effort = effort;
        self
    }
}

/// Introduces differential options to a timely configuration.
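///
/// A usage sketch (the thread count and effort value are illustrative): build a timely
/// configuration, enable idle merge effort, and install the options before executing.
///
/// ```ignore
/// let mut config = timely::Config::process(4);
/// let options = differential_dataflow::Config::default().idle_merge_effort(Some(1000));
/// differential_dataflow::configure(&mut config.worker, &options);
/// timely::execute(config, |worker| {
///     // construct and run differential dataflows here.
/// }).unwrap();
/// ```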
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
    if let Some(effort) = options.idle_merge_effort {
        config.set("differential/idle_merge_effort".to_string(), effort);
        config.set::<trace::ExertionLogic>(
            "differential/default_exert_logic".to_string(),
            std::sync::Arc::new(move |batches| {
                // Exert `effort` whenever some level holds multiple batches, or when more
                // than one level is non-empty; otherwise the trace is compact and we rest.
                let mut non_empty = 0;
                for (_index, count, length) in batches {
                    if *count > 1 { return Some(effort as usize); }
                    if *length > 0 { non_empty += 1; }
                    if non_empty > 1 { return Some(effort as usize); }
                }
                None
            }),
        );
    }
}