// differential_dataflow/algorithms/identifiers.rs
//! Assign unique identifiers to records.
use timely::dataflow::Scope;
use crate::{Collection, ExchangeData, Hashable};
use crate::lattice::Lattice;
use crate::operators::*;
use crate::difference::Abelian;
/// Extension trait that pairs every record of a collection with a `u64` identifier.
///
/// The implementation for [`Collection`] in this module derives identifiers by
/// hashing and resolves hash collisions iteratively inside the dataflow.
pub trait Identifiers<G, D, R>
where
    G: Scope,
    D: ExchangeData,
    R: ExchangeData + Abelian,
{
    /// Attaches to each record an identifier that no other distinct record shares.
    ///
    /// The result contains `(record, id)` pairs carrying the record's original
    /// difference.
    ///
    /// # Example
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::algorithms::identifiers::Identifiers;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     scope.new_collection_from(1 .. 10).1
    ///          .identifiers()
    ///          // Keep only identifiers that occur more than once; there must be none.
    ///          .map(|(_data, id)| id)
    ///          .threshold(|_id, cnt| if cnt > &1 { *cnt } else { 0 })
    ///          .assert_empty();
    /// });
    /// ```
    fn identifiers(&self) -> Collection<G, (D, u64), R>;
}
impl<G, D, R> Identifiers<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: ExchangeData + ::std::hash::Hash,
    R: ExchangeData + Abelian,
{
    /// Hash-based identifiers with iterative collision resolution.
    ///
    /// Each record `r` proposes the identifier `(round, r).hashed()`, starting
    /// at `round = 0`. When several proposals share a hash, the smallest
    /// `(round, record)` keeps it (lower round first, ties broken by record),
    /// and every other proposal retries with `round + 1`. The loop runs to a
    /// fixed point, and each record's surviving hash becomes its identifier.
    fn identifiers(&self) -> Collection<G, (D, u64), R> {
        use crate::collection::AsCollection;

        // Starting point: every record proposes round zero.
        let proposals = self.map(|record| (0, record));

        // The iteration variable does not carry the proposals themselves. It
        // carries only *corrections* to `proposals`: for each record that had
        // to move, a retraction of `(0, record)` paired with an insertion of
        // `(round, record)`. Collisions are expected to be rare, so this avoids
        // keeping every winner on both the input and output side of `reduce`.
        let corrections =
            timely::dataflow::operators::generic::operator::empty(&proposals.scope())
                .as_collection()
                .iterate(|feedback| {
                    proposals
                        .enter(&feedback.scope())
                        .concat(feedback)
                        .map(|proposal| (proposal.hashed(), proposal))
                        .reduce(|_hash, candidates, changes| {
                            // Produces `-count` as a fresh value.
                            let negated = |count: &R| {
                                let mut flipped = count.clone();
                                flipped.negate();
                                flipped
                            };

                            // The first candidate, the smallest `(round, record)`,
                            // wins the bucket. A winner from a later round must keep
                            // its correction alive. A round-zero winner is already
                            // described by `proposals` and needs no change.
                            let ((round, record), count) = &candidates[0];
                            if *round > 0 {
                                changes.push(((0, record.clone()), negated(count)));
                                changes.push(((*round, record.clone()), count.clone()));
                            }

                            // Every remaining candidate moves on to the next round.
                            for ((round, record), count) in &candidates[1..] {
                                changes.push(((0, record.clone()), negated(count)));
                                changes.push(((*round + 1, record.clone()), count.clone()));
                            }
                        })
                        .map(|(_hash, proposal)| proposal)
                });

        // Apply the settled corrections and report each record with its final hash.
        corrections
            .concat(&proposals)
            .map(|proposal| {
                let id = proposal.hashed();
                (proposal.1, id)
            })
    }
}
#[cfg(test)]
mod tests {
    #[test]
    fn are_unique() {
        // It is hard to test the above method, because we would want
        // to exercise the case with hash collisions. Instead, we test
        // a version with a crippled hash function to see that even if
        // there are collisions, everyone gets a unique identifier.
        //
        // The crippled hash `(round + num) / 10` sends all three inputs
        // (1, 2, 3) to bucket 0 in round 0. The losers therefore have to
        // advance through many rounds before they reach distinct buckets.
        use crate::input::Input;
        use crate::operators::{Threshold, Reduce};
        use crate::operators::iterate::Iterate;
        ::timely::example(|scope| {
            let input = scope.new_collection_from(1 .. 4).1;
            use crate::collection::AsCollection;
            let init = input.map(|record| (0, record));
            let ids = timely::dataflow::operators::generic::operator::empty(&init.scope())
                .as_collection()
                .iterate(|diff|
                    init.enter(&diff.scope())
                        .concat(diff)
                        .map(|(round, num)| ((round + num) / 10, (round, num)))
                        .reduce(|_hash, input, output| {
                            // keep round-positive records as changes.
                            let ((round, record), count) = &input[0];
                            if *round > 0 {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round, record.clone()), *count));
                            }
                            // if any losers, increment their rounds.
                            for ((round, record), count) in input[1..].iter() {
                                output.push(((0, record.clone()), -*count));
                                output.push(((*round+1, record.clone()), *count));
                            }
                        })
                        .map(|(_hash, pair)| pair)
                )
                .concat(&init)
                .map(|(round, num)| { (num, (round + num) / 10) });

            // No identifier may be assigned to more than one record.
            ids.map(|(_data, id)| id)
                .threshold(|_id,cnt| if cnt > &1 { *cnt } else { 0 })
                .assert_empty();

            // Every input record must receive exactly one identifier. Without
            // this check, the test above would pass vacuously on an empty output.
            ids.map(|(data, _id)| data)
                .negate()
                .concat(&input)
                .assert_empty();
        });
    }
}