differential_dataflow/lib.rs
//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where you transform
//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`.
//! Differential dataflow also includes the less traditional operation `iterate`, which allows you
//! to repeatedly apply differential dataflow transformations to collections.
//!
//! Once you have defined a differential dataflow computation, you may then add records to or remove
//! records from its inputs; the system will automatically update the computation's outputs with the
//! corresponding additions and removals, and report these changes to you.
//!
//! Differential dataflow is built on the [timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
//! framework for data-parallel programming, which automatically parallelizes across multiple threads,
//! processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly
//! inter-operates with other timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress, with features and ergonomics still
//! under active development. It is generally improving, though.
//!
//! # Examples
//!
//! This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts
//! first the number of times each source coordinate occurs, and then the number of times each count
//! occurs, giving us a sense of the distribution of degrees in the graph.
//!
//! ```ignore
//! // create a degree counting differential dataflow
//! let (mut input, probe) = worker.dataflow(|scope| {
//!
//!     // create edge input, count a few ways.
//!     let (input, edges) = scope.new_collection();
//!
//!     // extract the source field, and then count.
//!     let degrs = edges.map(|(src, _dst)| src)
//!                      .count();
//!
//!     // extract the count field, and then count them.
//!     let distr = degrs.map(|(_src, cnt)| cnt)
//!                      .count();
//!
//!     // report the changes to the count collection, notice when done.
//!     let probe = distr.inspect(|x| println!("observed: {:?}", x))
//!                      .probe();
//!
//!     (input, probe)
//! });
//! ```
//!
//! Now assembled, we can drive the computation like a timely dataflow computation, by pushing update
//! records (triples of data, time, and change in count) at the `input` stream handle. The `probe` is
//! how timely dataflow tells us that we have seen all corresponding output updates (or that there are
//! none).
//!
//! ```ignore
//! loop {
//!     // copy the current input time out of the handle, so that we can do arithmetic on it.
//!     let time = *input.epoch();
//!     for round in time .. time + 100 {
//!         input.advance_to(round);
//!         input.insert((round % 13, round % 7));
//!     }
//!
//!     input.flush();
//!     while probe.less_than(input.time()) {
//!         worker.step();
//!     }
//! }
//! ```
//!
//! Each pass of the loop should print the output changes caused by its 100 insertions, each of which
//! increases some node's degree by one (typically four output changes, corresponding to the addition and
//! deletion of the new and old counts of the old and new degrees of the affected node).
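//!
//! To see where "typically four" comes from, here is a minimal sketch in plain Rust that uses only
//! the standard library and does not use differential dataflow at all. The helper `add_edge` and the
//! maps `degrees` and `distr` are hypothetical names chosen for this illustration; the sketch works
//! out by hand the changes one would expect the `distr` collection above to report when a single
//! edge is inserted.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Records one new edge leaving `src` and returns the resulting changes to the
//! /// degree distribution, as `((degree, number_of_nodes), diff)` pairs.
//! fn add_edge(
//!     degrees: &mut BTreeMap<u64, i64>,
//!     distr: &mut BTreeMap<i64, i64>,
//!     src: u64,
//! ) -> Vec<((i64, i64), i64)> {
//!     let mut changes = Vec::new();
//!     let degree = degrees.entry(src).or_insert(0);
//!     let (old, new) = (*degree, *degree + 1);
//!     *degree = new;
//!
//!     // One fewer node has the old degree (a brand-new node had no recorded degree).
//!     if old > 0 {
//!         let n = distr[&old];
//!         changes.push(((old, n), -1));
//!         if n > 1 {
//!             changes.push(((old, n - 1), 1));
//!             distr.insert(old, n - 1);
//!         } else {
//!             distr.remove(&old);
//!         }
//!     }
//!
//!     // One more node has the new degree.
//!     let n = distr.get(&new).copied().unwrap_or(0);
//!     if n > 0 {
//!         changes.push(((new, n), -1));
//!     }
//!     changes.push(((new, n + 1), 1));
//!     distr.insert(new, n + 1);
//!
//!     changes
//! }
//! ```
//!
//! Inserting edges from sources 0, 1, 2, 0, 1 in that order, the final call returns four changes:
//! `((1, 2), -1)`, `((1, 1), 1)`, `((2, 1), -1)`, and `((2, 2), 1)`. Fewer changes arise when a
//! count is seen for the first time or disappears entirely.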

#![forbid(missing_docs)]
#![allow(array_into_iter)]

use std::fmt::Debug;

pub use collection::{Collection, AsCollection};
pub use hashable::Hashable;
pub use difference::Abelian as Diff;

/// Data type usable in differential dataflow.
///
/// Most differential dataflow operators require the ability to cancel corresponding updates, and the
/// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort
/// the data, at which point we can consolidate updates for equivalent records.
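///
/// As a rough illustration of that idea, here is a minimal sketch that uses only the standard
/// library. The function `consolidate_sketch` is a hypothetical stand-in written for this comment,
/// not the crate's own consolidation code: it sorts `(data, diff)` updates so that equal records
/// become adjacent, sums their differences, and drops records whose differences cancel.
///
/// ```rust
/// fn consolidate_sketch<D: Ord>(mut updates: Vec<(D, isize)>) -> Vec<(D, isize)> {
///     // Sorting by the record brings updates for equal records next to each other.
///     updates.sort_by(|a, b| a.0.cmp(&b.0));
///
///     let mut result: Vec<(D, isize)> = Vec::new();
///     for (data, diff) in updates {
///         match result.last_mut() {
///             // Same record as the previous update: accumulate the difference.
///             Some(last) if last.0 == data => last.1 += diff,
///             _ => result.push((data, diff)),
///         }
///         // Drop the record if its accumulated difference has cancelled to zero.
///         if result.last().map_or(false, |last| last.1 == 0) {
///             result.pop();
///         }
///     }
///     result
/// }
/// ```
///
/// Without a total order there would be no cheap way to bring equal records together, which is
/// why `Ord` appears in the bounds below.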
pub trait Data : timely::Data + Ord + Debug { }
impl<T: timely::Data + Ord + Debug> Data for T { }

/// Data types exchangeable in differential dataflow.
pub trait ExchangeData : timely::ExchangeData + Ord + Debug { }
impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }

pub mod hashable;
pub mod operators;
pub mod algorithms;
pub mod lattice;
pub mod trace;
pub mod input;
pub mod difference;
pub mod dynamic;
pub mod collection;
pub mod logging;
pub mod consolidation;
pub mod capture;
pub mod containers;

/// Configuration options for differential dataflow.
#[derive(Default)]
pub struct Config {
    /// An amount of arrangement effort to spend each scheduling quantum.
    ///
    /// The default value of `None` will not schedule operators that maintain arrangements
    /// other than when computation is required. Setting the value to `Some(effort)` will
    /// cause these operators to reschedule themselves as long as their arrangement has not
    /// reached a compact representation, and each scheduling quantum they will perform
    /// compaction work as if `effort` records had been added to the arrangement.
    pub idle_merge_effort: Option<isize>
}

impl Config {
    /// Assign an amount of effort to apply to idle arrangement operators.
    pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
        self.idle_merge_effort = effort;
        self
    }
}

/// Introduces differential options to a timely configuration.
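///
/// The decision made by the `default_exert_logic` closure installed here can be sketched in
/// isolation using only the standard library. The free function `exert_sketch` below is
/// hypothetical and exists only for this comment; it assumes, as the destructuring in the closure
/// suggests, that each entry is an `(index, batch count, length)` triple of `usize` values.
///
/// ```rust
/// fn exert_sketch(batches: &[(usize, usize, usize)], effort: isize) -> Option<usize> {
///     let mut non_empty = 0;
///     for (_index, count, length) in batches {
///         // An entry reporting more than one batch: request more work.
///         if *count > 1 { return Some(effort as usize); }
///         // More than one non-empty entry: request more work.
///         if *length > 0 { non_empty += 1; }
///         if non_empty > 1 { return Some(effort as usize); }
///     }
///     // At most one non-empty entry remains, so no further effort is requested.
///     None
/// }
/// ```
///
/// Under these assumptions a single non-empty entry yields `None`, while two non-empty entries, or
/// one entry with a count above one, yield `Some(effort as usize)`.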
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
    if let Some(effort) = options.idle_merge_effort {
        config.set("differential/idle_merge_effort".to_string(), effort);
        config.set::<trace::ExertionLogic>(
            "differential/default_exert_logic".to_string(),
            std::sync::Arc::new(move |batches| {
                // Request `effort` while any entry reports more than one batch, or while
                // more than one entry is non-empty; otherwise request no further work.
                let mut non_empty = 0;
                for (_index, count, length) in batches {
                    if *count > 1 { return Some(effort as usize); }
                    if *length > 0 { non_empty += 1; }
                    if non_empty > 1 { return Some(effort as usize); }
                }
                None
            }),
        );
    }
}