differential_dogs3/operators/
validate.rs

1use std::hash::Hash;
2
3use timely::dataflow::Scope;
4
5use differential_dataflow::{ExchangeData, VecCollection};
6use differential_dataflow::difference::{Semigroup, Monoid, Multiply};
7use differential_dataflow::operators::arrange::Arranged;
8use differential_dataflow::trace::TraceReader;
9
10/// Proposes extensions to a stream of prefixes.
11///
12/// This method takes a stream of prefixes and for each determines a
13/// key with `key_selector` and then proposes all pair af the prefix
14/// and values associated with the key in `arrangement`.
15pub fn validate<G, K, V, Tr, F, P>(
16    extensions: &VecCollection<G, (P, V), Tr::Diff>,
17    arrangement: Arranged<G, Tr>,
18    key_selector: F,
19) -> VecCollection<G, (P, V), Tr::Diff>
20where
21    G: Scope<Timestamp=Tr::Time>,
22    Tr: for<'a> TraceReader<
23        KeyOwn = (K, V),
24        Time: std::hash::Hash,
25        Diff : Semigroup<Tr::DiffGat<'a>>+Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
26    >+Clone+'static,
27    K: Ord+Hash+Clone+Default + 'static,
28    V: ExchangeData+Hash+Default,
29    F: Fn(&P)->K+Clone+'static,
30    P: ExchangeData,
31{
32    crate::operators::lookup_map(
33        extensions,
34        arrangement,
35        move |(pre,val),key| { *key = (key_selector(pre), val.clone()); },
36        |(pre,val),r,_,_| ((pre.clone(), val.clone()), r.clone()),
37        Default::default(),
38        Default::default(),
39        Default::default(),
40    )
41}