1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//! Assign unique identifiers to records.

use timely::dataflow::Scope;

use crate::{Collection, ExchangeData, Hashable};
use crate::lattice::Lattice;
use crate::operators::*;
use crate::difference::Abelian;

/// Assign unique identifiers to elements of a collection.
///
/// This module implements the trait for `Collection<G, D, R>` where the
/// records `D` additionally implement `std::hash::Hash` and the scope's
/// timestamp implements `Lattice`.
pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {
    /// Pairs each record of the collection with a `u64` identifier.
    ///
    /// The output contains `(record, identifier)` pairs carrying the same
    /// difference type `R` as the input. In the implementation provided by
    /// this module the identifier is the hash of a `(round, record)` pair,
    /// where `round` starts at zero and is incremented for records that lose
    /// a hash collision, with the intent that distinct records end up with
    /// distinct identifiers.
    ///
    /// # Example
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::algorithms::identifiers::Identifiers;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     scope.new_collection_from(1 .. 10).1
    ///          .identifiers()
    ///          // keep only the identifiers, and retain those that occur
    ///          // more than once; there should be none.
    ///          .map(|(_data, id)| id)
    ///          .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
    ///          .assert_empty();
    /// });
    /// ```
    fn identifiers(&self) -> Collection<G, (D, u64), R>;
}

impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: ExchangeData+::std::hash::Hash,
    R: ExchangeData+Abelian,
{
    /// Pairs every record with the hash of a `(round, record)` proposal,
    /// bumping `round` for records that lose a hash collision.
    fn identifiers(&self) -> Collection<G, (D, u64), R> {

        // Overview of the approach:
        //
        // We maintain a collection of proposals `(round, record)`; the
        // identifier proposed for `record` is `(round, record).hashed()`.
        //
        // Whenever several proposals share a hash, the smallest proposal
        // (lowest round, ties broken by record) wins and keeps that hash.
        // Every other proposal loses, and must retry with its round
        // incremented by one.
        //
        // The `reduce` below does not reproduce the winners. Instead it
        // emits only corrections relative to the round-zero proposals: a
        // retraction of `(0, record)` paired with an addition of the
        // proposal the record currently holds. Collisions are expected to
        // be very rare, so carrying the winners through both the input and
        // the output of `reduce` would be unnecessary duplication.

        use crate::collection::AsCollection;

        // Every record starts out proposing round zero.
        let proposals = self.map(|record| (0, record));

        // The loop variable starts out empty and converges to the set of
        // corrections that must be applied to `proposals`.
        let corrections =
        timely::dataflow::operators::generic::operator::empty(&proposals.scope())
            .as_collection()
            .iterate(|changes| {
                // Current proposals: round-zero proposals plus corrections,
                // keyed by the hash each one proposes.
                let keyed = proposals
                    .enter(&changes.scope())
                    .concat(changes)
                    .map(|proposal| (proposal.hashed(), proposal));

                let updates = keyed.reduce(|_hash, input, output| {
                    // The first entry is the winner for this hash; all
                    // remaining entries are losers.
                    let (winner, losers) = (&input[0], &input[1..]);

                    // A winner beyond round zero must remain in the
                    // corrections, otherwise it would revert to round zero.
                    let ((round, record), count) = winner;
                    if *round > 0 {
                        output.push(((0, record.clone()), count.clone().negate()));
                        output.push(((*round, record.clone()), count.clone()));
                    }

                    // Each loser gives up its round-zero proposal and moves
                    // on to the next round.
                    for ((round, record), count) in losers {
                        output.push(((0, record.clone()), count.clone().negate()));
                        output.push(((*round+1, record.clone()), count.clone()));
                    }
                });

                // Drop the hash key, leaving just the corrected proposals.
                updates.map(|(_hash, proposal)| proposal)
            });

        // Apply the corrections to the round-zero proposals, and report each
        // record alongside the hash of its surviving proposal.
        corrections
            .concat(&proposals)
            .map(|proposal| { let id = proposal.hashed(); (proposal.1, id) })
    }
}

#[cfg(test)]
mod tests {

    /// Checks that records still receive distinct identifiers when the hash
    /// function collides for every input.
    #[test]
    fn are_unique() {

        // Genuine hash collisions are hard to provoke on demand, so rather
        // than calling `identifiers` directly this test rebuilds the same
        // dataflow around a crippled hash, `(round + num) / 10`. For the
        // inputs 1, 2, 3 every round-zero proposal hashes to 0, so the
        // collision-resolution logic is exercised, and we check that all
        // records nonetheless end up with distinct identifiers.

        use crate::input::Input;
        use crate::operators::{Threshold, Reduce};
        use crate::operators::iterate::Iterate;

        ::timely::example(|scope| {

            use crate::collection::AsCollection;

            let records = scope.new_collection_from(1 .. 4).1;

            // Every record starts out proposing round zero.
            let proposals = records.map(|record| (0, record));

            let corrections =
            timely::dataflow::operators::generic::operator::empty(&proposals.scope())
                .as_collection()
                .iterate(|changes| {
                    // Key each current proposal by its (crippled) hash.
                    let keyed = proposals
                        .enter(&changes.scope())
                        .concat(changes)
                        .map(|(round, num)| ((round + num) / 10, (round, num)));

                    let updates = keyed.reduce(|_hash, input, output| {
                        println!("Input: {:?}", input);

                        // The first entry wins this hash; the rest lose.
                        let (winner, losers) = (&input[0], &input[1..]);

                        // A winner beyond round zero stays in the corrections.
                        let ((round, record), count) = winner;
                        if *round > 0 {
                            output.push(((0, record.clone()), -*count));
                            output.push(((*round, record.clone()), *count));
                        }

                        // Losers retry with the next round.
                        for ((round, record), count) in losers {
                            output.push(((0, record.clone()), -*count));
                            output.push(((*round+1, record.clone()), *count));
                        }
                    });

                    updates
                        .inspect(|x| println!("{:?}", x))
                        .map(|(_hash, proposal)| proposal)
                });

            // Apply the corrections, compute the final identifiers, and
            // assert that no identifier occurs more than once.
            corrections
                .concat(&proposals)
                .map(|(round, num)| { (num, (round + num) / 10) })
                .map(|(_data, id)| id)
                .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
                .assert_empty();
        });
    }
}