differential_dogs3/operators/
lookup_map.rs1use std::collections::HashMap;
2
3use timely::PartialOrder;
4use timely::dataflow::Scope;
5use timely::dataflow::channels::pact::{Pipeline, Exchange};
6use timely::dataflow::operators::Operator;
7use timely::progress::Antichain;
8
9use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable};
10use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
11use differential_dataflow::operators::arrange::Arranged;
12use differential_dataflow::trace::{Cursor, TraceReader};
13use differential_dataflow::trace::implementations::BatchContainer;
14
15pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
21 prefixes: &VecCollection<G, D, R>,
22 mut arrangement: Arranged<G, Tr>,
23 key_selector: F,
24 mut output_func: S,
25 supplied_key0: K,
26 supplied_key1: K,
27 supplied_key2: K,
28) -> VecCollection<G, DOut, ROut>
29where
30 G: Scope<Timestamp=Tr::Time>,
31 Tr: for<'a> TraceReader<
32 KeyOwn = K,
33 Time: std::hash::Hash,
34 Diff : Semigroup<Tr::DiffGat<'a>>+Monoid+ExchangeData,
35 >+Clone+'static,
36 K: Hashable + Ord + 'static,
37 F: FnMut(&D, &mut K)+Clone+'static,
38 D: ExchangeData,
39 R: ExchangeData+Monoid,
40 DOut: Clone+'static,
41 ROut: Monoid + 'static,
42 S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
43{
44 arrangement.trace.set_physical_compaction(Antichain::new().borrow());
46 let mut propose_trace = Some(arrangement.trace);
47 let propose_stream = arrangement.stream;
48
49 let mut stash = HashMap::new();
50 let mut logic1 = key_selector.clone();
51 let mut logic2 = key_selector.clone();
52
53 let mut key: K = supplied_key0;
54 let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
55 logic1(&update.0, &mut key);
56 key.hashed().into()
57 });
58
59 let mut key1: K = supplied_key1;
60 let mut key2: K = supplied_key2;
61
62 prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |(input1, frontier1), (input2, frontier2), output| {
63
64 input1.for_each(|capability, data| {
66 stash.entry(capability.retain())
67 .or_insert(Vec::new())
68 .extend(data.drain(..))
69 });
70
71 input2.for_each(|_, _| { });
74
75 if let Some(ref mut trace) = propose_trace {
76
77 for (capability, prefixes) in stash.iter_mut() {
78
79 if !frontier2.less_equal(capability.time()) {
82
83 let mut session = output.session(capability);
84
85 prefixes.sort_by(|x,y| {
87 logic2(&x.0, &mut key1);
88 logic2(&y.0, &mut key2);
89 key1.cmp(&key2)
90 });
91
92 let (mut cursor, storage) = trace.cursor();
93 let mut key_con = Tr::KeyContainer::with_capacity(1);
95 for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
96 if !frontier2.less_equal(time) {
97 logic2(prefix, &mut key1);
98 key_con.clear(); key_con.push_own(&key1);
99 cursor.seek_key(&storage, key_con.index(1));
100 if cursor.get_key(&storage) == Some(key_con.index(1)) {
101 while let Some(value) = cursor.get_val(&storage) {
102 let mut count = Tr::Diff::zero();
103 cursor.map_times(&storage, |t, d| {
104 if Tr::owned_time(t).less_equal(time) { count.plus_equals(&d); }
105 });
106 if !count.is_zero() {
107 let (dout, rout) = output_func(prefix, diff, value, &count);
108 if !rout.is_zero() {
109 session.give((dout, time.clone(), rout));
110 }
111 }
112 cursor.step_val(&storage);
113 }
114 cursor.rewind_vals(&storage);
115 }
116 *diff = R::zero();
117 }
118 }
119
120 prefixes.retain(|ptd| !ptd.2.is_zero());
121 }
122 }
123
124 }
125
126 stash.retain(|_,prefixes| !prefixes.is_empty());
128
129 let mut frontier = timely::progress::frontier::Antichain::new();
131 for time in frontier1.frontier().to_vec() {
132 frontier.insert(time);
133 }
134 for key in stash.keys() {
135 frontier.insert(key.time().clone());
136 }
137 propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
138
139 if frontier1.is_empty() && stash.is_empty() {
140 propose_trace = None;
141 }
142
143 }).as_collection()
144}