differential_dogs3/operators/
propose.rs1use timely::dataflow::Scope;
2
3use differential_dataflow::{ExchangeData, VecCollection, Hashable};
4use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
5use differential_dataflow::operators::arrange::Arranged;
6use differential_dataflow::trace::TraceReader;
7
8pub fn propose<G, Tr, K, F, P, V>(
17 prefixes: &VecCollection<G, P, Tr::Diff>,
18 arrangement: Arranged<G, Tr>,
19 key_selector: F,
20) -> VecCollection<G, (P, V), Tr::Diff>
21where
22 G: Scope<Timestamp=Tr::Time>,
23 Tr: for<'a> TraceReader<
24 KeyOwn = K,
25 ValOwn = V,
26 Time: std::hash::Hash,
27 Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData+Semigroup<Tr::DiffGat<'a>>,
28 >+Clone+'static,
29 K: Hashable + Default + Ord + 'static,
30 F: Fn(&P)->K+Clone+'static,
31 P: ExchangeData,
32 V: Clone + 'static,
33{
34 crate::operators::lookup_map(
35 prefixes,
36 arrangement,
37 move |p: &P, k: &mut K | { *k = key_selector(p); },
38 move |prefix, diff, value, sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone().multiply(sum)),
39 Default::default(),
40 Default::default(),
41 Default::default(),
42 )
43}
44
45pub fn propose_distinct<G, Tr, K, F, P, V>(
51 prefixes: &VecCollection<G, P, Tr::Diff>,
52 arrangement: Arranged<G, Tr>,
53 key_selector: F,
54) -> VecCollection<G, (P, V), Tr::Diff>
55where
56 G: Scope<Timestamp=Tr::Time>,
57 Tr: for<'a> TraceReader<
58 KeyOwn = K,
59 ValOwn = V,
60 Time: std::hash::Hash,
61 Diff : Semigroup<Tr::DiffGat<'a>>+Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
62 >+Clone+'static,
63 K: Hashable + Default + Ord + 'static,
64 F: Fn(&P)->K+Clone+'static,
65 P: ExchangeData,
66 V: Clone + 'static,
67{
68 crate::operators::lookup_map(
69 prefixes,
70 arrangement,
71 move |p: &P, k: &mut K| { *k = key_selector(p); },
72 move |prefix, diff, value, _sum| ((prefix.clone(), Tr::owned_val(value)), diff.clone()),
73 Default::default(),
74 Default::default(),
75 Default::default(),
76 )
77}