//! Sequential (non-concurrent) graph algorithms.
use std::hash::Hash;
use timely::dataflow::*;
use crate::{Collection, ExchangeData};
use crate::lattice::Lattice;
use crate::operators::*;
use crate::hashable::Hashable;
/// Colors the nodes of `edges` by running [`sequence`] with a
/// "smallest unused positive integer" rule.
///
/// Every node that appears as the source of an edge is seeded with the
/// placeholder `u32::MAX`; the returned collection is whatever `sequence`
/// produces for that seed state and these edges.
fn _color<G, N>(edges: &Collection<G, (N,N)>) -> Collection<G,(N,Option<u32>)>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
{
    // Seed each edge source with a placeholder color; `distinct` collapses
    // the duplicates produced by nodes with several outgoing edges.
    let placeholders = edges
        .map(|(src, _dst)| (src, u32::MAX))
        .distinct();

    // Let `sequence` drive the per-node color choice.
    sequence(&placeholders, edges, |_node, neighbor_colors| {
        // Count upwards from 1 (not 0, in case we ever use NonZero<u32>)
        // for as long as the k-th neighbor color equals k; the first
        // mismatch, or running off the end of the slice, leaves
        // `candidate` at the smallest positive integer not present.
        // NOTE(review): this assumes the slice is sorted and free of
        // duplicates -- confirm against the `reduce` operator's contract.
        let mut candidate = 1u32;
        for (color, _count) in neighbor_colors {
            if **color != candidate {
                break;
            }
            candidate += 1;
        }
        candidate
    })
}
/// Applies `logic` to nodes sequentially, in order of node identifiers.
///
/// The `logic` function updates a node's state as a function of its
/// neighbor states. It will only be called on complete input, i.e. once
/// none of the forward messages addressed to the node is still `None`.
///
/// Internally, this method performs a fixed-point computation in which
/// a node "fires" once all of its neighbors with lower identifier have
/// fired, and we apply `logic` to the new state of lower neighbors and
/// the old state (input) of higher neighbors.
///
/// # Arguments
///
/// * `state` - the initial `(node, value)` assignment.
/// * `edges` - `(src, dst)` pairs. An edge with `src < dst` carries the
///   newly computed state of `src` to `dst`; an edge with `src > dst`
///   carries the initial state of `src` to `dst`. Self-loops
///   (`src == dst`) match neither direction and carry nothing.
/// * `logic` - invoked with a node and the `(value, count)` slice that
///   `reduce` presents for the messages addressed to that node.
///
/// # Returns
///
/// Within each round, `(node, Some(value))` for every node whose inputs
/// were complete and `(node, None)` for every node still waiting on a
/// lower neighbor. Only nodes that are the destination of at least one
/// message can appear in a round's result.
pub fn sequence<G, N, V, F>(
    state: &Collection<G, (N,V)>,
    edges: &Collection<G, (N,N)>,
    logic: F) -> Collection<G, (N,Option<V>)>
where
    G: Scope,
    G::Timestamp: Lattice+Hash+Ord,
    N: ExchangeData+Hashable,
    V: ExchangeData,
    F: Fn(&N, &[(&V, isize)])->V+'static
{
    // Start the iteration with `None` (not yet fired) for every node.
    state
        .map(|(node, _state)| (node, None))
        .iterate(|new_state| {
            // Immutable content: bring edges and initial state into the loop scope.
            let edges = edges.enter(&new_state.scope());
            let old_state = state.enter(&new_state.scope());

            // Break edges into forward (low -> high) and reverse (high -> low) directions.
            let forward = edges.filter(|(src, dst)| src < dst);
            let reverse = edges.filter(|(src, dst)| src > dst);

            // New state travels along forward edges, keyed by the destination node.
            let new_messages = new_state
                .join_map(&forward, |_src, value, dst| (dst.clone(), value.clone()));

            // Destinations with at least one lower neighbor that has not fired yet.
            let incomplete = new_messages
                .filter(|(_dst, value)| value.is_none())
                .map(|(dst, _value)| dst)
                .distinct();

            // Keep only the forward messages that carry a value, unwrapping
            // the `Option` in the same step so no panic path remains.
            let new_messages = new_messages
                .flat_map(|(dst, value)| value.map(|value| (dst, value)));

            // Old (initial) state travels along reverse edges.
            let old_messages = old_state
                .join_map(&reverse, |_src, value, dst| (dst.clone(), value.clone()));

            // Merge both message kinds, suppressing every node that is still
            // waiting on an input; such nodes are re-emitted below as `None`.
            new_messages
                .concat(&old_messages)
                .antijoin(&incomplete)
                .reduce(move |node, inputs, output| output.push((Some(logic(node, inputs)), 1)))
                .concat(&incomplete.map(|node| (node, None)))
        })
}