// differential_dataflow/algorithms/graphs/propagate.rs
//! Directed label reachability.
use std::hash::Hash;
use timely::dataflow::*;
use crate::{Collection, ExchangeData};
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};
use crate::operators::arrange::arrangement::ArrangeByKey;
/// Pushes labels along directed edges, keeping the minimum label at each node.
///
/// Every label is offered to the iteration in its first round: the delay function
/// handed to `propagate_core` is the constant `0`. Use `propagate_at` or
/// `propagate_core` to stagger the introduction of labels instead.
///
/// `edges` holds `(source, target)` pairs and `nodes` holds the initial
/// `(node, label)` assignments. The edges are arranged by key here; callers that
/// already hold such an arrangement can pass it to `propagate_core` directly.
pub fn propagate<G, N, L, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
{
    // Arrange the edges by source node, then defer to the general implementation.
    let arranged_edges = edges.arrange_by_key();
    propagate_core(&arranged_edges, nodes, |_| 0)
}
/// Pushes labels along directed edges, keeping the minimum label at each node,
/// with caller-controlled timing for when each label enters the iteration.
///
/// `logic` maps a label to a `u64`, which `propagate_core` turns into the
/// iteration round at which that label is first introduced (see `propagate_core`
/// for the exact mapping). Supplying a constant `0` reproduces `propagate`.
///
/// `edges` holds `(source, target)` pairs and `nodes` holds the initial
/// `(node, label)` assignments. The edges are arranged by key here; callers that
/// already hold such an arrangement can pass it to `propagate_core` directly.
pub fn propagate_at<G, N, L, F, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    F: Fn(&L)->u64+Clone+'static,
{
    // Arrange the edges by source node, then defer to the general implementation.
    let arranged_edges = edges.arrange_by_key();
    propagate_core(&arranged_edges, nodes, logic)
}
use crate::trace::TraceReader;
use crate::operators::arrange::arrangement::Arranged;
/// Pushes labels along directed edges, keeping the minimum label at each node.
///
/// This variant accepts edges that are already arranged by source node, so the
/// arrangement can be shared with other computations, and takes a function `logic`
/// that controls the round in which each label is introduced: a label enters the
/// iteration at round `256 * b`, where `b` is the number of significant bits of
/// `logic(label)` (that is, `64 - leading_zeros`, a value in `[0, 64]`).
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope<Timestamp=Tr::Time>,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R>+Clone+'static,
    F: Fn(&L)->u64+Clone+'static,
{
    // Conceptually this is the iterative computation sketched below. The dataflow
    // actually built is wordier, in exchange for a simpler graph and a smaller
    // memory footprint. The two differ in that (1) the sketch filters its input and
    // pretends to perform a non-monotonic computation, whereas the real version
    // starts from an initially empty, monotonic iteration variable, and (2) the real
    // version rotates the loop so that the arrangement produced by `reduce` can be
    // re-used by the join.
    //
    //     nodes.filter(|_| false)
    //          .iterate(|inner| {
    //              let edges = edges.enter(&inner.scope());
    //              let nodes = nodes.enter_at(&inner.scope(), move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
    //              inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
    //                   .concat(&nodes)
    //                   .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
    //          })

    nodes.scope().iterative::<usize,_,_>(|scope| {

        use timely::order::Product;
        use crate::operators::iterate::SemigroupVariable;
        use crate::operators::reduce::ReduceCore;
        use crate::trace::implementations::{ValBuilder, ValSpine};

        // Bring both inputs into the iterative scope. Each initial label is delayed
        // to round `256 * (significant bits of logic(label))`.
        let inner_edges = edges.enter(scope);
        let seeds = nodes.enter_at(scope, move |record| {
            let significant_bits = 64 - logic(&record.1).leading_zeros() as usize;
            256 * significant_bits
        });

        // Feedback variable carrying labels proposed along edges; it advances the
        // inner round counter by one on each trip around the loop.
        let step = Product::new(Default::default(), 1usize);
        let proposals = SemigroupVariable::new(scope, step);

        // For each node, keep only the first (smallest) of the seeded and proposed labels.
        let candidates = proposals.concat(&seeds);
        let labels = candidates
            .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
                "Propagate",
                |_node, input, output| output.push((input[0].0.clone(), R::from(1_i8))),
            );

        // Offer each node's current label to the targets of its outgoing edges.
        let pushed: Collection<_, (N, L), R> = labels
            .join_core(&inner_edges, |_src, label: &L, dst| Some((dst.clone(), label.clone())));

        proposals.set(&pushed);

        // Extract the per-node labels and return them to the enclosing scope.
        labels
            .as_collection(|node, label| (node.clone(), label.clone()))
            .leave()
    })
}