differential_dataflow/algorithms/graphs/
scc.rs

1//! Strongly connected component structure.
2
3use std::mem;
4use std::hash::Hash;
5
6use timely::dataflow::*;
7
8use crate::{Collection, ExchangeData};
9use crate::operators::*;
10use crate::lattice::Lattice;
11use crate::difference::{Abelian, Multiply};
12
13use super::propagate::propagate;
14
15/// Iteratively removes nodes with no in-edges.
16pub fn trim<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
17where
18    G: Scope,
19    G::Timestamp: Lattice+Ord,
20    N: ExchangeData+Hash,
21    R: ExchangeData + Abelian,
22    R: Multiply<R, Output=R>,
23    R: From<i8>,
24{
25    graph.iterate(|edges| {
26        // keep edges from active edge destinations.
27        let active =
28        edges.map(|(_src,dst)| dst)
29             .threshold(|_,c| if c.is_zero() { R::from(0_i8) } else { R::from(1_i8) });
30
31        graph.enter(&edges.scope())
32             .semijoin(&active)
33    })
34}
35
36/// Returns the subset of edges in the same strongly connected component.
37pub fn strongly_connected<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
38where
39    G: Scope,
40    G::Timestamp: Lattice+Ord,
41    N: ExchangeData+Hash,
42    R: ExchangeData + Abelian,
43    R: Multiply<R, Output=R>,
44    R: From<i8>
45{
46    graph.iterate(|inner| {
47        let edges = graph.enter(&inner.scope());
48        let trans = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1));
49        trim_edges(&trim_edges(inner, &edges), &trans)
50    })
51}
52
53fn trim_edges<G, N, R>(cycle: &Collection<G, (N,N), R>, edges: &Collection<G, (N,N), R>)
54    -> Collection<G, (N,N), R>
55where
56    G: Scope,
57    G::Timestamp: Lattice+Ord,
58    N: ExchangeData+Hash,
59    R: ExchangeData + Abelian,
60    R: Multiply<R, Output=R>,
61    R: From<i8>
62{
63    let nodes = edges.map_in_place(|x| x.0 = x.1.clone())
64                     .consolidate();
65
66    // NOTE: With a node -> int function, can be improved by:
67    // let labels = propagate_at(&cycle, &nodes, |x| *x as u64);
68    let labels = propagate(cycle, &nodes);
69
70    edges.join_map(&labels, |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
71         .join_map(&labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
72         .filter(|(_,(l1,l2))| l1 == l2)
73         .map(|((x1,x2),_)| (x2,x1))
74}