//! Differential dataflow is a high-throughput, low-latency data-parallel programming framework.
//!
//! Differential dataflow programs are written in a collection-oriented style, where you transform
//! collections of records using traditional operations like `map`, `filter`, `join`, and `reduce`.
//! Differential dataflow also includes the less traditional operation `iterate`, which allows you
//! to repeatedly apply differential dataflow transformations to collections.
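//!
//! For example, `iterate` can be used to compute the set of nodes reachable from a set of root
//! nodes. The following is a sketch only: `roots` is assumed to be a collection of node
//! identifiers and `edges` a collection of `(source, target)` pairs, both defined in the
//! enclosing scope.
//!
//! ```ignore
//! // expand the set of reachable nodes along `edges` until it stops changing.
//! let reachable = roots.iterate(|inner| {
//!     // bring `edges` and `roots` into the iterative scope.
//!     let edges = edges.enter(&inner.scope());
//!     let roots = roots.enter(&inner.scope());
//!
//!     edges.semijoin(&inner)                  // edges whose source is reachable,
//!          .map(|(_source, target)| target)   // .. keep their targets,
//!          .concat(&roots)                    // .. add the roots back in,
//!          .distinct()                        // .. and retain one copy of each node.
//! });
//! ```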
//!
//! Once you have defined a differential dataflow computation, you may add records to or remove
//! records from its inputs; the system will automatically update the computation's outputs with the
//! corresponding additions and removals, and report these changes to you.
//!
//! Differential dataflow is built on the [timely dataflow](https://github.com/frankmcsherry/timely-dataflow)
//! framework for data-parallel programming, which automatically parallelizes across multiple threads,
//! processes, and computers. Furthermore, because it uses timely dataflow's primitives, it seamlessly
//! interoperates with other timely dataflow computations.
//!
//! Differential dataflow is still very much a work in progress, with its features and ergonomics
//! under active development. It is generally improving, though.
//!
//! # Examples
//!
//! This fragment creates a collection of pairs of integers, imagined as graph edges, and then counts
//! first the number of times each source coordinate occurs, and then the number of times each such
//! count occurs, giving us a sense of the degree distribution of the graph.
//!
//! ```ignore
//! // create a degree counting differential dataflow
//! let (mut input, probe) = worker.dataflow(|scope| {
//!
//!     // create edge input, count a few ways.
//!     let (input, edges) = scope.new_collection();
//!
//!     // extract the source field, and then count.
//!     let degrs = edges.map(|(src, _dst)| src)
//!                      .count();
//!
//!     // extract the count field, and then count them.
//!     let distr = degrs.map(|(_src, cnt)| cnt)
//!                      .count();
//!
//!     // report the changes to the count collection, notice when done.
//!     let probe = distr.inspect(|x| println!("observed: {:?}", x))
//!                      .probe();
//!
//!     (input, probe)
//! });
//! ```
//!
//! Once assembled, we can drive the computation like any timely dataflow computation, by pushing update
//! records (triples of data, time, and change in count) at the `input` handle. The `probe` is how
//! timely dataflow tells us when we have seen all corresponding output updates (important in case
//! there are none to observe).
//!
//! ```ignore
//! loop {
//!     let time = *input.epoch();
//!     for round in time .. time + 100 {
//!         input.advance_to(round);
//!         input.insert((round % 13, round % 7));
//!     }
//!
//!     input.flush();
//!     while probe.less_than(input.time()) {
//!         worker.step();
//!     }
//! }
//! ```
//!
//! This example should print the output changes from each of the 100 rounds, each round reflecting
//! the increase of some node's degree by one. Such a change typically produces four output updates:
//! the count of nodes with the old degree is retracted and re-added at its decremented value, and
//! the count of nodes with the new degree is retracted and re-added at its incremented value. Each
//! printed line has the form `observed: ((degree, count), time, diff)`.

#![forbid(missing_docs)]
#![allow(array_into_iter)]


use std::fmt::Debug;

pub use collection::{Collection, AsCollection};
pub use hashable::Hashable;
pub use difference::Abelian as Diff;

/// Data type usable in differential dataflow.
///
/// Most differential dataflow operators require the ability to cancel corresponding updates, and the
/// way that they do this is by putting the data in a canonical form. The `Ord` trait allows us to sort
/// the data, at which point we can consolidate updates for equivalent records.
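///
/// # Examples
///
/// A sketch: an owned type that is `Clone + 'static` (as `timely::Data` requires), and that
/// derives ordering and `Debug`, implements `Data` via the blanket implementation below. The
/// `Edge` type and `takes_data` function here are purely illustrative.
///
/// ```ignore
/// #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
/// struct Edge {
///     src: u32,
///     dst: u32,
/// }
///
/// // a function bounded by `Data` accepts `Edge` values.
/// fn takes_data<D: differential_dataflow::Data>(_record: D) { }
/// takes_data(Edge { src: 0, dst: 1 });
/// ```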
pub trait Data : timely::Data + Ord + Debug { }
impl<T: timely::Data + Ord + Debug> Data for T { }

/// Data types exchangeable in differential dataflow.
pub trait ExchangeData : timely::ExchangeData + Ord + Debug { }
impl<T: timely::ExchangeData + Ord + Debug> ExchangeData for T { }

pub mod hashable;
pub mod operators;
pub mod algorithms;
pub mod lattice;
pub mod trace;
pub mod input;
pub mod difference;
pub mod dynamic;
pub mod collection;
pub mod logging;
pub mod consolidation;
pub mod capture;

/// Configuration options for differential dataflow.
#[derive(Default)]
pub struct Config {
    /// An amount of arrangement effort to spend each scheduling quantum.
    ///
    /// With the default value of `None`, operators that maintain arrangements are scheduled only
    /// when computation requires it. Setting the value to `Some(effort)` causes these operators to
    /// reschedule themselves as long as their arrangement has not reached a compact representation,
    /// and in each scheduling quantum they will perform compaction work as if `effort` records had
    /// been added to the arrangement.
    pub idle_merge_effort: Option<isize>
}

impl Config {
    /// Assign an amount of effort to apply to idle arrangement operators.
    pub fn idle_merge_effort(mut self, effort: Option<isize>) -> Self {
        self.idle_merge_effort = effort;
        self
    }
}

/// Introduces differential options to a timely configuration.
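///
/// # Examples
///
/// A minimal sketch of applying these options before starting timely workers; the effort value
/// `Some(1000)` is illustrative rather than a recommendation.
///
/// ```ignore
/// let mut config = timely::Config::thread();
/// differential_dataflow::configure(
///     &mut config.worker,
///     &differential_dataflow::Config::default().idle_merge_effort(Some(1000)),
/// );
/// timely::execute(config, |worker| {
///     // construct and run differential dataflows here.
/// }).expect("timely execution failed");
/// ```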
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
    if let Some(effort) = options.idle_merge_effort {
        config.set("differential/idle_merge_effort".to_string(), effort);
        config.set::<trace::ExertionLogic>(
            "differential/default_exert_logic".to_string(),
            std::sync::Arc::new(move |batches| {
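                // Given per-level (index, batch count, record count) summaries, request
                // `effort` units of work if any level holds more than one batch or if
                // more than one level is non-empty; otherwise report nothing to do.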
                let mut non_empty = 0;
                for (_index, count, length) in batches {
                    if *count > 1 { return Some(effort as usize); }
                    if *length > 0 { non_empty += 1; }
                    if non_empty > 1 { return Some(effort as usize); }
                }
                None
            }),
        );
    }
}