differential_dogs3/operators/lookup_map.rs

1use std::collections::HashMap;
2
3use timely::PartialOrder;
4use timely::dataflow::Scope;
5use timely::dataflow::channels::pact::{Pipeline, Exchange};
6use timely::dataflow::operators::Operator;
7use timely::progress::Antichain;
8
9use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
10use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
11use differential_dataflow::operators::arrange::Arranged;
12use differential_dataflow::trace::{Cursor, TraceReader};
13use differential_dataflow::trace::cursor::IntoOwned;
14
15/// Proposes extensions to a stream of prefixes.
16///
17/// This method takes a stream of prefixes and for each determines a
18/// key with `key_selector` and then proposes all pair af the prefix
19/// and values associated with the key in `arrangement`.
20pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
21    prefixes: &Collection<G, D, R>,
22    mut arrangement: Arranged<G, Tr>,
23    key_selector: F,
24    mut output_func: S,
25    supplied_key0: K,
26    supplied_key1: K,
27    supplied_key2: K,
28) -> Collection<G, DOut, ROut>
29where
30    G: Scope<Timestamp=Tr::Time>,
31    Tr: TraceReader+Clone+'static,
32    for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
33    for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
34    K: Hashable + Ord + 'static,
35    Tr::Diff: Monoid+ExchangeData,
36    F: FnMut(&D, &mut K)+Clone+'static,
37    D: ExchangeData,
38    R: ExchangeData+Monoid,
39    DOut: Clone+'static,
40    ROut: Monoid + 'static,
41    S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
42{
43    // No need to block physical merging for this operator.
44    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
45    let mut propose_trace = Some(arrangement.trace);
46    let propose_stream = arrangement.stream;
47
48    let mut stash = HashMap::new();
49    let mut logic1 = key_selector.clone();
50    let mut logic2 = key_selector.clone();
51
52    let mut key: K = supplied_key0;
53    let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
54        logic1(&update.0, &mut key);
55        key.hashed().into()
56    });
57
58    let mut key1: K = supplied_key1;
59    let mut key2: K = supplied_key2;
60
61    prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {
62
63        // drain the first input, stashing requests.
64        input1.for_each(|capability, data| {
65            stash.entry(capability.retain())
66                 .or_insert(Vec::new())
67                 .extend(data.drain(..))
68        });
69
70        // Drain input batches; although we do not observe them, we want access to the input
71        // to observe the frontier and to drive scheduling.
72        input2.for_each(|_, _| { });
73
74        if let Some(ref mut trace) = propose_trace {
75
76            for (capability, prefixes) in stash.iter_mut() {
77
78                // defer requests at incomplete times.
79                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
80                if !input2.frontier.less_equal(capability.time()) {
81
82                    let mut session = output.session(capability);
83
84                    // sort requests for in-order cursor traversal. could consolidate?
85                    prefixes.sort_by(|x,y| {
86                        logic2(&x.0, &mut key1);
87                        logic2(&y.0, &mut key2);
88                        key1.cmp(&key2)
89                    });
90
91                    let (mut cursor, storage) = trace.cursor();
92
93                    for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
94                        if !input2.frontier.less_equal(time) {
95                            logic2(prefix, &mut key1);
96                            use differential_dataflow::trace::cursor::IntoOwned;
97                            cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
98                            if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
99                                while let Some(value) = cursor.get_val(&storage) {
100                                    let mut count = Tr::Diff::zero();
101                                    cursor.map_times(&storage, |t, d| {
102                                        if t.into_owned().less_equal(time) { count.plus_equals(&d); }
103                                    });
104                                    if !count.is_zero() {
105                                        let (dout, rout) = output_func(prefix, diff, value, &count);
106                                        if !rout.is_zero() {
107                                            session.give((dout, time.clone(), rout));
108                                        }
109                                    }
110                                    cursor.step_val(&storage);
111                                }
112                                cursor.rewind_vals(&storage);
113                            }
114                            *diff = R::zero();
115                        }
116                    }
117
118                    prefixes.retain(|ptd| !ptd.2.is_zero());
119                }
120            }
121
122        }
123
124        // drop fully processed capabilities.
125        stash.retain(|_,prefixes| !prefixes.is_empty());
126
127        // The logical merging frontier depends on both input1 and stash.
128        let mut frontier = timely::progress::frontier::Antichain::new();
129        for time in input1.frontier().frontier().to_vec() {
130            frontier.insert(time);
131        }
132        for key in stash.keys() {
133            frontier.insert(key.time().clone());
134        }
135        propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
136
137        if input1.frontier().is_empty() && stash.is_empty() {
138            propose_trace = None;
139        }
140
141    }).as_collection()
142}