differential_dataflow/algorithms/graphs/
bijkstra.rs1use std::hash::Hash;
4
5use timely::order::Product;
6use timely::dataflow::*;
7
8use crate::{Collection, ExchangeData};
9use crate::operators::*;
10use crate::lattice::Lattice;
11use crate::operators::iterate::Variable;
12
13pub fn bidijkstra<G, N>(edges: &Collection<G, (N,N)>, goals: &Collection<G, (N,N)>) -> Collection<G, ((N,N), u32)>
24where
25 G: Scope,
26 G::Timestamp: Lattice+Ord,
27 N: ExchangeData+Hash,
28{
29 use crate::operators::arrange::arrangement::ArrangeByKey;
30 let forward = edges.arrange_by_key();
31 let reverse = edges.map(|(x,y)| (y,x)).arrange_by_key();
32 bidijkstra_arranged(&forward, &reverse, goals)
33}
34
35use crate::trace::TraceReader;
36use crate::operators::arrange::Arranged;
37
38pub fn bidijkstra_arranged<G, N, Tr>(
40 forward: &Arranged<G, Tr>,
41 reverse: &Arranged<G, Tr>,
42 goals: &Collection<G, (N,N)>
43) -> Collection<G, ((N,N), u32)>
44where
45 G: Scope<Timestamp=Tr::Time>,
46 N: ExchangeData+Hash,
47 Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
48{
49 forward
50 .stream
51 .scope().iterative::<u64,_,_>(|inner| {
52
53 let forward_edges = forward.enter(inner);
54 let reverse_edges = reverse.enter(inner);
55
56 let forward = Variable::new_from(goals.map(|(x,_)| (x.clone(),(x.clone(),0))).enter(inner), Product::new(Default::default(), 1));
62 let reverse = Variable::new_from(goals.map(|(_,y)| (y.clone(),(y.clone(),0))).enter(inner), Product::new(Default::default(), 1));
63
64 forward.map(|_| ()).consolidate().inspect(|x| println!("forward: {:?}", x));
65 reverse.map(|_| ()).consolidate().inspect(|x| println!("reverse: {:?}", x));
66
67 let goals = goals.enter(inner);
68 let reached =
76 forward
77 .join_map(&reverse, |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
78 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
79 .semijoin(&goals);
80
81 let active =
82 reached
83 .negate()
84 .map(|(srcdst,_)| srcdst)
85 .concat(&goals)
86 .consolidate();
87
88 let forward_active = active.map(|(x,_y)| x).distinct();
90 let forward_next =
91 forward
92 .map(|(med, (src, dist))| (src, (med, dist)))
93 .semijoin(&forward_active)
94 .map(|(src, (med, dist))| (med, (src, dist)))
95 .join_core(&forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
96 .concat(&forward)
97 .map(|(next, (src, dist))| ((next, src), dist))
98 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
99 .map(|((next, src), dist)| (next, (src, dist)));
100
101 forward_next.map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
102
103 forward.set(&forward_next);
104
105 let reverse_active = active.map(|(_x,y)| y).distinct();
107 let reverse_next =
108 reverse
109 .map(|(med, (rev, dist))| (rev, (med, dist)))
110 .semijoin(&reverse_active)
111 .map(|(rev, (med, dist))| (med, (rev, dist)))
112 .join_core(&reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
113 .concat(&reverse)
114 .map(|(next, (rev, dist))| ((next, rev), dist))
115 .reduce(|_key, s, t| t.push((*s[0].0, 1)))
116 .map(|((next,rev), dist)| (next, (rev, dist)));
117
118 reverse_next.map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
119
120 reverse.set(&reverse_next);
121
122 reached.leave()
123 })
124}