differential_dogs3/
lib.rs

1use std::hash::Hash;
2
3use timely::dataflow::Scope;
4use timely::progress::Timestamp;
5use timely::dataflow::operators::Partition;
6use timely::dataflow::operators::Concatenate;
7
8use differential_dataflow::{ExchangeData, Collection, AsCollection};
9use differential_dataflow::operators::Threshold;
10use differential_dataflow::difference::{Monoid, Multiply};
11use differential_dataflow::lattice::Lattice;
12use differential_dataflow::operators::arrange::TraceAgent;
13use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey};
14
15pub mod altneu;
16pub mod calculus;
17pub mod operators;
18
19/// A type capable of extending a stream of prefixes.
20///
21/**
22    Implementors of `PrefixExtension` provide types and methods for extending a differential dataflow collection,
23    via the three methods `count`, `propose`, and `validate`.
24**/
25pub trait PrefixExtender<G: Scope, R: Monoid+Multiply<Output = R>> {
26    /// The required type of prefix to extend.
27    type Prefix;
28    /// The type to be produced as extension.
29    type Extension;
30    /// Annotates prefixes with the number of extensions the relation would propose.
31    fn count(&mut self, prefixes: &Collection<G, (Self::Prefix, usize, usize), R>, index: usize) -> Collection<G, (Self::Prefix, usize, usize), R>;
32    /// Extends each prefix with corresponding extensions.
33    fn propose(&mut self, prefixes: &Collection<G, Self::Prefix, R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;
34    /// Restricts proposed extensions by those the extender would have proposed.
35    fn validate(&mut self, extensions: &Collection<G, (Self::Prefix, Self::Extension), R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;
36}
37
38pub trait ProposeExtensionMethod<G: Scope, P: ExchangeData+Ord, R: Monoid+Multiply<Output = R>> {
39    fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>;
40    fn extend<E: ExchangeData+Ord>(&self, extenders: &mut [&mut dyn PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>;
41}
42
43impl<G, P, R> ProposeExtensionMethod<G, P, R> for Collection<G, P, R>
44where
45    G: Scope,
46    P: ExchangeData+Ord,
47    R: Monoid+Multiply<Output = R>+'static,
48{
49    fn propose_using<PE>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>
50    where
51        PE: PrefixExtender<G, R, Prefix=P>
52    {
53        extender.propose(self)
54    }
55    fn extend<E>(&self, extenders: &mut [&mut dyn PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>
56    where
57        E: ExchangeData+Ord
58    {
59
60        if extenders.len() == 1 {
61            extenders[0].propose(&self.clone())
62        }
63        else {
64            let mut counts = self.map(|p| (p, 1 << 31, 0));
65            for (index,extender) in extenders.iter_mut().enumerate() {
66                counts = extender.count(&counts, index);
67            }
68
69            let parts = counts.inner.partition(extenders.len() as u64, |((p, _, i),t,d)| (i as u64, (p,t,d)));
70
71            let mut results = Vec::new();
72            for (index, nominations) in parts.into_iter().enumerate() {
73                let mut extensions = extenders[index].propose(&nominations.as_collection());
74                for other in (0..extenders.len()).filter(|&x| x != index) {
75                    extensions = extenders[other].validate(&extensions);
76                }
77
78                results.push(extensions.inner);    // save extensions
79            }
80
81            self.scope().concatenate(results).as_collection()
82        }
83    }
84}
85
86pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P, E> {
87    fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R>;
88}
89
90impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
91    fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R> {
92        extender.validate(self)
93    }
94}
95
96// These are all defined here so that users can be assured a common layout.
97use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
98type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K,V,T,R>>;
99type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K,T,R>>;
100
101pub struct CollectionIndex<K, V, T, R>
102where
103    K: ExchangeData,
104    V: ExchangeData,
105    T: Lattice+ExchangeData+Timestamp,
106    R: Monoid+Multiply<Output = R>+ExchangeData,
107{
108    /// A trace of type (K, ()), used to count extensions for each prefix.
109    count_trace: TraceKeyHandle<K, T, isize>,
110
111    /// A trace of type (K, V), used to propose extensions for each prefix.
112    propose_trace: TraceValHandle<K, V, T, R>,
113
114    /// A trace of type ((K, V), ()), used to validate proposed extensions.
115    validate_trace: TraceKeyHandle<(K, V), T, R>,
116}
117
118impl<K, V, T, R> Clone for CollectionIndex<K, V, T, R>
119where
120    K: ExchangeData+Hash,
121    V: ExchangeData+Hash,
122    T: Lattice+ExchangeData+Timestamp,
123    R: Monoid+Multiply<Output = R>+ExchangeData,
124{
125    fn clone(&self) -> Self {
126        CollectionIndex {
127            count_trace: self.count_trace.clone(),
128            propose_trace: self.propose_trace.clone(),
129            validate_trace: self.validate_trace.clone(),
130        }
131    }
132}
133
134impl<K, V, T, R> CollectionIndex<K, V, T, R>
135where
136    K: ExchangeData+Hash,
137    V: ExchangeData+Hash,
138    T: Lattice+ExchangeData+Timestamp,
139    R: Monoid+Multiply<Output = R>+ExchangeData,
140{
141
142    pub fn index<G: Scope<Timestamp = T>>(collection: &Collection<G, (K, V), R>) -> Self {
143        // We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation.
144        // counts and validate can share the base arrangement
145        let arranged = collection.arrange_by_self();
146        let counts = arranged
147            .distinct()
148            .map(|(k, _v)| k)
149            .arrange_by_self()
150            .trace;
151        let propose = collection.arrange_by_key().trace;
152        let validate = arranged.trace;
153
154        CollectionIndex {
155            count_trace: counts,
156            propose_trace: propose,
157            validate_trace: validate,
158        }
159    }
160    pub fn extend_using<P, F: Fn(&P)->K+Clone>(&self, logic: F) -> CollectionExtender<K, V, T, R, P, F> {
161        CollectionExtender {
162            phantom: std::marker::PhantomData,
163            indices: self.clone(),
164            key_selector: logic,
165        }
166    }
167}
168
169pub struct CollectionExtender<K, V, T, R, P, F>
170where
171    K: ExchangeData,
172    V: ExchangeData,
173    T: Lattice+ExchangeData+Timestamp,
174    R: Monoid+Multiply<Output = R>+ExchangeData,
175    F: Fn(&P)->K+Clone,
176{
177    phantom: std::marker::PhantomData<P>,
178    indices: CollectionIndex<K, V, T, R>,
179    key_selector: F,
180}
181
182impl<G, K, V, R, P, F> PrefixExtender<G, R> for CollectionExtender<K, V, G::Timestamp, R, P, F>
183where
184    G: Scope,
185    K: ExchangeData+Hash+Default,
186    V: ExchangeData+Hash+Default,
187    P: ExchangeData,
188    G::Timestamp: Lattice+ExchangeData,
189    R: Monoid+Multiply<Output = R>+ExchangeData,
190    F: Fn(&P)->K+Clone+'static,
191{
192    type Prefix = P;
193    type Extension = V;
194
195    fn count(&mut self, prefixes: &Collection<G, (P, usize, usize), R>, index: usize) -> Collection<G, (P, usize, usize), R> {
196        let counts = self.indices.count_trace.import(&prefixes.scope());
197        operators::count::count(prefixes, counts, self.key_selector.clone(), index)
198    }
199
200    fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
201        let propose = self.indices.propose_trace.import(&prefixes.scope());
202        operators::propose::propose(prefixes, propose, self.key_selector.clone())
203    }
204
205    fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
206        let validate = self.indices.validate_trace.import(&extensions.scope());
207        operators::validate::validate(extensions, validate, self.key_selector.clone())
208    }
209}