differential_dogs3/operators/
validate.rs
1use std::hash::Hash;
2
3use timely::dataflow::Scope;
4
5use differential_dataflow::{ExchangeData, Collection};
6use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
7use differential_dataflow::operators::arrange::Arranged;
8use differential_dataflow::trace::TraceReader;
9use differential_dataflow::trace::cursor::IntoOwned;
10
11pub fn validate<G, K, V, Tr, F, P>(
17 extensions: &Collection<G, (P, V), Tr::Diff>,
18 arrangement: Arranged<G, Tr>,
19 key_selector: F,
20) -> Collection<G, (P, V), Tr::Diff>
21where
22 G: Scope<Timestamp=Tr::Time>,
23 Tr: TraceReader+Clone+'static,
24 for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>,
25 for<'a> Tr::Diff : Semigroup<Tr::DiffGat<'a>>,
26 K: Ord+Hash+Clone+Default + 'static,
27 V: ExchangeData+Hash+Default,
28 Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
29 F: Fn(&P)->K+Clone+'static,
30 P: ExchangeData,
31{
32 crate::operators::lookup_map(
33 extensions,
34 arrangement,
35 move |(pre,val),key| { *key = (key_selector(pre), val.clone()); },
36 |(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()),
37 Default::default(),
38 Default::default(),
39 Default::default(),
40 )
41}