use std::collections::HashMap;
use timely::PartialOrder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;
use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{IsZero, Semigroup, Monoid};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};
use differential_dataflow::trace::cursor::IntoOwned;
pub fn lookup_map<G, D, K, R, Tr, F, DOut, ROut, S>(
prefixes: &Collection<G, D, R>,
mut arrangement: Arranged<G, Tr>,
key_selector: F,
mut output_func: S,
supplied_key0: K,
supplied_key1: K,
supplied_key2: K,
) -> Collection<G, DOut, ROut>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>,
for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
K: Hashable + Ord + 'static,
Tr::Diff: Monoid+ExchangeData,
F: FnMut(&D, &mut K)+Clone+'static,
D: ExchangeData,
R: ExchangeData+Monoid,
DOut: Clone+'static,
ROut: Monoid + 'static,
S: FnMut(&D, &R, Tr::Val<'_>, &Tr::Diff)->(DOut, ROut)+'static,
{
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
let mut propose_trace = Some(arrangement.trace);
let propose_stream = arrangement.stream;
let mut stash = HashMap::new();
let mut logic1 = key_selector.clone();
let mut logic2 = key_selector.clone();
let mut buffer = Vec::new();
let mut key: K = supplied_key0;
let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
logic1(&update.0, &mut key);
key.hashed().into()
});
let mut key1: K = supplied_key1;
let mut key2: K = supplied_key2;
prefixes.inner.binary_frontier(&propose_stream, exchange, Pipeline, "LookupMap", move |_,_| move |input1, input2, output| {
input1.for_each(|capability, data| {
data.swap(&mut buffer);
stash.entry(capability.retain())
.or_insert(Vec::new())
.extend(buffer.drain(..))
});
input2.for_each(|_, _| { });
if let Some(ref mut trace) = propose_trace {
for (capability, prefixes) in stash.iter_mut() {
if !input2.frontier.less_equal(capability.time()) {
let mut session = output.session(capability);
prefixes.sort_by(|x,y| {
logic2(&x.0, &mut key1);
logic2(&y.0, &mut key2);
key1.cmp(&key2)
});
let (mut cursor, storage) = trace.cursor();
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
if !input2.frontier.less_equal(time) {
logic2(prefix, &mut key1);
use differential_dataflow::trace::cursor::IntoOwned;
cursor.seek_key(&storage, IntoOwned::borrow_as(&key1));
if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = Tr::Diff::zero();
cursor.map_times(&storage, |t, d| {
if t.into_owned().less_equal(time) { count.plus_equals(&d); }
});
if !count.is_zero() {
let (dout, rout) = output_func(prefix, diff, value, &count);
if !rout.is_zero() {
session.give((dout, time.clone(), rout));
}
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff = R::zero();
}
}
prefixes.retain(|ptd| !ptd.2.is_zero());
}
}
}
stash.retain(|_,prefixes| !prefixes.is_empty());
let mut frontier = timely::progress::frontier::Antichain::new();
for time in input1.frontier().frontier().to_vec() {
frontier.insert(time);
}
for key in stash.keys() {
frontier.insert(key.time().clone());
}
propose_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
if input1.frontier().is_empty() && stash.is_empty() {
propose_trace = None;
}
}).as_collection()
}