
//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where you transform
//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`.
//! Differential dataflow also includes the less traditional operation `iterate`, which allows you
//! to repeatedly apply differential dataflow transformations to collections.
//!
//! Once you have defined a differential dataflow computation, you may then add records to or remove
//! records from its inputs; the system will automatically update the computation's outputs with the
//! appropriate corresponding additions and removals, and report these changes to you.
//!
//! Differential dataflow is built on the [timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
//! framework for data-parallel programming, which automatically parallelizes across multiple threads,
//! processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly
//! interoperates with other timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress, with features and ergonomics still
//! wildly in development. It is generally improving, though.
//!
//! # Examples
//!
//! This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts
//! first the number of times each source coordinate occurs, and then the number of times each count
//! occurs, giving us a sense of the distribution of degrees in the graph.
//!
//! ```ignore
//! // create a degree counting differential dataflow
//! let (mut input, probe) = worker.dataflow(|scope| {
//!
//!     // create edge input, count a few ways.
//!     let (input, edges) = scope.new_collection();
//!
//!     // extract the source field, and then count.
//!     let degrs = edges.map(|(src, _dst)| src)
//!                      .count();
//!
//!     // extract the count field, and then count them.
//!     let distr = degrs.map(|(_src, cnt)| cnt)
//!                      .count();
//!
//!     // report the changes to the count collection, notice when done.
//!     let probe = distr.inspect(|x| println!("observed: {:?}", x))
//!                      .probe();
//!
//!     (input, probe)
//! });
//! ```
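//!
//! For intuition, here is what the dataflow above computes for a fixed set of edges, written as a
//! batch sketch that uses only the standard library rather than differential dataflow. The function
//! name `degree_distribution` is hypothetical and it uses `usize` counts for simplicity. Unlike the
//! dataflow, it recomputes everything from scratch instead of updating its output as edges change.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! // Hypothetical batch analogue of the dataflow: count edges per source node,
//! // then count how many nodes share each degree.
//! fn degree_distribution(edges: &[(u32, u32)]) -> BTreeMap<usize, usize> {
//!     // First `count`: source node -> out-degree.
//!     let mut degrees: BTreeMap<u32, usize> = BTreeMap::new();
//!     for &(src, _dst) in edges {
//!         *degrees.entry(src).or_insert(0) += 1;
//!     }
//!     // Second `count`: out-degree -> number of nodes with that degree.
//!     let mut distribution: BTreeMap<usize, usize> = BTreeMap::new();
//!     for &degree in degrees.values() {
//!         *distribution.entry(degree).or_insert(0) += 1;
//!     }
//!     distribution
//! }
//! ```
//!
//! For the edges `(1, 2)`, `(1, 3)`, and `(2, 3)`, node `1` has degree 2 and node `2` has degree 1,
//! so the sketch returns `{1: 1, 2: 1}`.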
//!
//! With the dataflow assembled, we can drive it like any timely dataflow computation, by pushing update
//! records (triples of data, time, and change in count) into the `input` handle. The `probe` is how
//! timely dataflow tells us that we have seen all output updates for a given time, which we cannot
//! infer from the output alone because there may be none.
//!
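//! ```ignore
//! loop {
//!     // `epoch()` returns a reference to the session's current time, so copy it.
//!     let time = *input.epoch();
//!     for round in time .. time + 100 {
//!         // Insert this round's edge, then advance past `round` so that the
//!         // probe can eventually report this time as complete.
//!         input.insert((round % 13, round % 7));
//!         input.advance_to(round + 1);
//!     }
//!
//!     // Release buffered updates, then run the worker until the output reflects them.
//!     input.flush();
//!     while probe.less_than(input.time()) {
//!         worker.step();
//!     }
//! }
//! ```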
//!
//! Each round's input update increases the out-degree of one node by one, say from `d` to `d + 1`, so
//! this example should print the output changes for each of the 100 rounds: typically four per round,
//! as the records counting the nodes of degree `d` and of degree `d + 1` are each retracted and
//! re-added with their new counts.
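//!
//! The typical four output changes can be checked by hand. The following standard-library sketch
//! (the function `distribution_changes` is hypothetical, not part of this crate) compares a degree
//! distribution before and after an update and lists the resulting changes as
//! `((degree, nodes), diff)` pairs, where `-1` retracts an old record and `+1` adds a new one.
//!
//! ```rust
//! use std::collections::{BTreeMap, BTreeSet};
//!
//! // Hypothetical helper: changes to the (degree, number of nodes) collection
//! // between two snapshots, expressed as (record, diff) pairs.
//! fn distribution_changes(
//!     before: &BTreeMap<usize, usize>,
//!     after: &BTreeMap<usize, usize>,
//! ) -> Vec<((usize, usize), isize)> {
//!     let keys: BTreeSet<usize> = before.keys().chain(after.keys()).copied().collect();
//!     let mut changes = Vec::new();
//!     for degree in keys {
//!         let old = before.get(&degree).copied().unwrap_or(0);
//!         let new = after.get(&degree).copied().unwrap_or(0);
//!         if old != new {
//!             // A count of zero means no record exists, so there is nothing to retract or add.
//!             if old > 0 { changes.push(((degree, old), -1)); }
//!             if new > 0 { changes.push(((degree, new), 1)); }
//!         }
//!     }
//!     changes
//! }
//! ```
//!
//! If three nodes have degree 1 and one node has degree 2, and one of the degree-1 nodes gains an
//! edge, the sketch reports `((1, 3), -1)`, `((1, 2), 1)`, `((2, 1), -1)`, and `((2, 2), 1)`. Fewer
//! changes appear when a count drops to zero or a degree is reached for the first time, which is why
//! four is only typical.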

#![forbid(missing_docs)]
#![allow(array_into_iter)]


use std::fmt::Debug;

pub use collection::{Collection, AsCollection};
pub use hashable::Hashable;
pub use difference::Abelian as Diff;
pub use into_owned::IntoOwned;

/// Data type usable in differential dataflow.
///
/// Most differential dataflow operators require the ability to cancel corresponding updates, and the
/// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort
/// the data, at which point we can consolidate updates for equivalent records.
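///
/// As a rough illustration of that canonical form, the sketch below (a hypothetical
/// `consolidate_sketch` using only the standard library, likely simpler than what the
/// `consolidation` module actually does) sorts `(record, diff)` updates by record, sums the diffs of
/// equal records, and drops records whose diffs cancel to zero. Sorting is the step that needs `Ord`.
///
/// ```rust
/// // Hypothetical helper: bring (record, diff) updates into a canonical form.
/// fn consolidate_sketch<D: Ord>(mut updates: Vec<(D, isize)>) -> Vec<(D, isize)> {
///     // Sorting places equal records next to each other; this is why `Ord` is required.
///     updates.sort_by(|a, b| a.0.cmp(&b.0));
///     let mut result: Vec<(D, isize)> = Vec::with_capacity(updates.len());
///     for (data, diff) in updates {
///         // Fold the diff into the previous entry when the record is the same.
///         if let Some(last) = result.last_mut() {
///             if last.0 == data {
///                 last.1 += diff;
///                 continue;
///             }
///         }
///         result.push((data, diff));
///     }
///     // Records whose updates cancelled out are dropped entirely.
///     result.retain(|(_, diff)| *diff != 0);
///     result
/// }
/// ```
///
/// For example, the updates `("b", 1)`, `("a", 2)`, `("b", -1)`, `("a", 1)`, and `("c", 1)`
/// consolidate to `("a", 3)` and `("c", 1)`.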
pub trait Data : timely::Data + Ord + Debug { }
impl<T: timely::Data + Ord + Debug> Data for T { }

/// Data types exchangeable in differential dataflow.
pub trait ExchangeData : timely::ExchangeData + Ord + Debug { }
impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }

pub mod hashable;
pub mod operators;
pub mod algorithms;
pub mod lattice;
pub mod trace;
pub mod input;
pub mod difference;
pub mod dynamic;
pub mod collection;
pub mod logging;
pub mod consolidation;
pub mod capture;
pub mod containers;
mod into_owned;

/// Configuration options for differential dataflow.
#[derive(Default)]
pub struct Config {
    /// An amount of arrangement effort to spend each scheduling quantum.
    ///
    /// With the default value of `None`, operators that maintain arrangements are scheduled
    /// only when computation is required. Setting the value to `Some(effort)` will cause
    /// these operators to reschedule themselves as long as their arrangement has not reached
    /// a compact representation, and in each scheduling quantum they will perform compaction
    /// work as if `effort` records had been added to the arrangement.
    pub idle_merge_effort: Option<isize>
}

impl Config {
    /// Assign an amount of effort to apply to idle arrangement operators.
    pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
        self.idle_merge_effort = effort;
        self
    }
}

/// Introduces differential options to a timely configuration.
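///
/// When `idle_merge_effort` is set, this also installs an exertion logic under
/// `"differential/default_exert_logic"`. Its decision rule is restated below as a standalone
/// function using only the standard library; the name `idle_exert_decision` is hypothetical, and the
/// reading of each `(index, count, length)` entry comes from the variable names in the closure rather
/// than from the `trace` module's documentation. Work is requested while the arrangement does not
/// look compact: some entry has `count > 1`, or more than one entry has `length > 0`.
///
/// ```rust
/// // Hypothetical restatement of the closure installed by `configure`.
/// fn idle_exert_decision(batches: &[(usize, usize, usize)], effort: usize) -> Option<usize> {
///     let mut non_empty = 0;
///     for &(_index, count, length) in batches {
///         // Any entry with `count > 1` requests `effort` units of work.
///         if count > 1 { return Some(effort); }
///         if length > 0 { non_empty += 1; }
///         // A second entry with `length > 0` also requests work.
///         if non_empty > 1 { return Some(effort); }
///     }
///     // Otherwise no further work is requested.
///     None
/// }
/// ```
///
/// A single non-empty entry such as `[(0, 1, 100)]` yields `None`, while `[(0, 1, 100), (1, 1, 5)]`
/// yields `Some(effort)`.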
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
    if let Some(effort) = options.idle_merge_effort {
        config.set("differential/idle_merge_effort".to_string(), effort);
        config.set::<trace::ExertionLogic>(
            "differential/default_exert_logic".to_string(),
            std::sync::Arc::new(move |batches| {
                let mut non_empty = 0;
                for (_index, count, length) in batches {
                    if *count > 1 { return Some(effort as usize); }
                    if *length > 0 { non_empty += 1; }
                    if non_empty > 1 { return Some(effort as usize); }
                }
                None
            }),
        );
    }
}