1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//! Strongly connected component structure.

use std::mem;
use std::hash::Hash;

use timely::dataflow::*;

use crate::{Collection, ExchangeData};
use crate::operators::*;
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};

use super::propagate::propagate;

/// Iteratively prunes edges that leave a node with no incoming edges.
///
/// Each round collects the destinations of the edges that are still present
/// and keeps only those edges of `graph` whose source is one of these
/// destinations. The rounds repeat inside `iterate` until the edge set stops
/// changing.
pub fn trim<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData + Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>,
{
    graph.iterate(|remaining| {
        // Destinations of the surviving edges; any non-zero accumulated
        // weight is normalized to one so the semijoin below does not scale
        // edge weights by the in-degree.
        let with_in_edges = remaining
            .map(|(_from, to)| to)
            .threshold(|_node, weight| {
                if weight.is_zero() {
                    R::from(0_i8)
                } else {
                    R::from(1_i8)
                }
            });

        // Restrict the original edges (keyed by source) to those sources.
        let all_edges = graph.enter(&remaining.scope());
        all_edges.semijoin(&with_in_edges)
    })
}

/// Restricts `graph` to the edges whose endpoints share a strongly connected
/// component.
///
/// Within an iterative scope, the candidate edge set is refined by two calls
/// to `trim_edges`: one against the edges as given and one against the same
/// edges with source and destination exchanged.
pub fn strongly_connected<G, N, R>(graph: &Collection<G, (N,N), R>) -> Collection<G, (N,N), R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData + Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>
{
    graph.iterate(|candidates| {
        let forward = graph.enter(&candidates.scope());
        // Same edges, with each pair's endpoints swapped in place.
        let reverse = forward.map_in_place(|edge| mem::swap(&mut edge.0, &mut edge.1));

        // `trim_edges` emits its surviving edges flipped relative to the edge
        // collection it is given, so feeding the first pass into a second
        // pass over the reversed edges restores the original orientation.
        let first_pass = trim_edges(candidates, &forward);
        trim_edges(&first_pass, &reverse)
    })
}

/// Keeps the edges of `edges` whose two endpoints receive equal labels, and
/// emits each surviving edge with its endpoints exchanged.
///
/// Every edge destination is seeded with itself as its label, and the labels
/// are then handed to `propagate` together with `cycle` as the edge set.
/// NOTE(review): `propagate` is defined in a sibling module; presumably it
/// spreads labels along `cycle` and settles on one label per node — confirm
/// there.
fn trim_edges<G, N, R>(cycle: &Collection<G, (N,N), R>, edges: &Collection<G, (N,N), R>)
    -> Collection<G, (N,N), R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData + Abelian,
    R: Multiply<R, Output=R>,
    R: From<i8>
{
    // One `(dst, dst)` record per edge, with duplicates accumulated together.
    let seeds = edges
        .map_in_place(|pair| pair.0 = pair.1.clone())
        .consolidate();

    // NOTE: With a node -> int function, can be improved by:
    // let labels = propagate_at(&cycle, &seeds, |x| *x as u64);
    let labels = propagate(cycle, &seeds);

    // Look up the label of each edge's source, re-keying the result by destination.
    let with_src_label = edges.join_map(&labels, |src, dst, src_label| {
        (dst.clone(), (src.clone(), src_label.clone()))
    });

    // Look up the label of each edge's destination.
    let with_both_labels = with_src_label.join_map(&labels, |dst, (src, src_label), dst_label| {
        ((src.clone(), dst.clone()), (src_label.clone(), dst_label.clone()))
    });

    // Retain edges whose endpoint labels agree, and output them as `(dst, src)`.
    with_both_labels
        .filter(|(_edge, (src_label, dst_label))| src_label == dst_label)
        .map(|((src, dst), _labels)| (dst, src))
}