1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use std::collections::HashMap;

use timely::PartialOrder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::trace::cursor::IntoOwned;

/// Proposes extensions to a stream of prefixes.
///
/// This method takes a stream of prefixes and for each determines a
/// key with `key_selector` and then proposes all pair af the prefix
/// and values associated with the key in `arrangement`.
pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
    prefixes: &Collection<G, D, R>,
    mut arrangement: Arranged<G, Tr>,
    key_selector: F,
    mut output_func: S,
    supplied_key0: K,
    supplied_key1: K,
    supplied_key2: K,
) -> Collection<G, DOut, ROut>
where
    G: Scope<Timestamp=Tr::Time>,
    Tr: TraceReader+Clone+'static,
    for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
    for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
    K: Hashable + Ord + 'static,
    Tr::Diff: Monoid+ExchangeData,
    F: FnMut(&D, &mut K)+Clone+'static,
    D: ExchangeData,
    R: ExchangeData+Monoid,
    DOut: Clone+'static,
    ROut: Monoid + 'static,
    S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
    // No need to block physical merging for this operator.
    arrangement.trace.set_physical_compaction(Antichain::new().borrow());
    let mut propose_trace = Some(arrangement.trace);
    let propose_stream = arrangement.stream;

    let mut stash = HashMap::new();
    let mut logic1 = key_selector.clone();
    let mut logic2 = key_selector.clone();

    let mut buffer = Vec::new();

    let mut key: K = supplied_key0;
    let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
        logic1(&update.0, &mut key);
        key.hashed().into()
    });

    let mut key1: K = supplied_key1;
    let mut key2: K = supplied_key2;

    prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {

        // drain the first input, stashing requests.
        input1.for_each(|capability, data| {
            data.swap(&mut buffer);
            stash.entry(capability.retain())
                 .or_insert(Vec::new())
                 .extend(buffer.drain(..))
        });

        // Drain input batches; although we do not observe them, we want access to the input
        // to observe the frontier and to drive scheduling.
        input2.for_each(|_, _| { });

        if let Some(ref mut trace) = propose_trace {

            for (capability, prefixes) in stash.iter_mut() {

                // defer requests at incomplete times.
                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
                if !input2.frontier.less_equal(capability.time()) {

                    let mut session = output.session(capability);

                    // sort requests for in-order cursor traversal. could consolidate?
                    prefixes.sort_by(|x,y| {
                        logic2(&x.0, &mut key1);
                        logic2(&y.0, &mut key2);
                        key1.cmp(&key2)
                    });

                    let (mut cursor, storage) = trace.cursor();

                    for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
                        if !input2.frontier.less_equal(time) {
                            logic2(prefix, &mut key1);
                            use differential_dataflow::trace::cursor::IntoOwned;
                            cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
                            if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
                                while let Some(value) = cursor.get_val(&storage) {
                                    let mut count = Tr::Diff::zero();
                                    cursor.map_times(&storage, |t, d| {
                                        if t.into_owned().less_equal(time) { count.plus_equals(&d); }
                                    });
                                    if !count.is_zero() {
                                        let (dout, rout) = output_func(prefix, diff, value, &count);
                                        if !rout.is_zero() {
                                            session.give((dout, time.clone(), rout));
                                        }
                                    }
                                    cursor.step_val(&storage);
                                }
                                cursor.rewind_vals(&storage);
                            }
                            *diff = R::zero();
                        }
                    }

                    prefixes.retain(|ptd| !ptd.2.is_zero());
                }
            }

        }

        // drop fully processed capabilities.
        stash.retain(|_,prefixes| !prefixes.is_empty());

        // The logical merging frontier depends on both input1 and stash.
        let mut frontier = timely::progress::frontier::Antichain::new();
        for time in input1.frontier().frontier().to_vec() {
            frontier.insert(time);
        }
        for key in stash.keys() {
            frontier.insert(key.time().clone());
        }
        propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

        if input1.frontier().is_empty() && stash.is_empty() {
            propose_trace = None;
        }

    }).as_collection()
}