differential_dataflow/operators/negate.rs
1//! Negate the diffs of collections and streams.
2
3use timely::Data;
4use timely::dataflow::{Scope, Stream, StreamCore};
5use timely::dataflow::operators::Map;
6
7use crate::{AsCollection, Collection};
8use crate::difference::Abelian;
9
/// Types whose update differences can be negated.
pub trait Negate<G, C> {
    /// Produces a new collection in which every count is the negation of the
    /// corresponding count in the input.
    ///
    /// The most common use is together with `concat`, to obtain the elements that are
    /// present in one collection but not in another. Negative counts are not special,
    /// though: differential dataflow computations remain defined for all values of the
    /// difference type `R`, including negative ones.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let data = scope.new_collection_from(1 .. 10).1;
    ///
    ///     let odds = data.filter(|x| x % 2 == 1);
    ///     let evens = data.filter(|x| x % 2 == 0);
    ///
    ///     odds.negate()
    ///         .concat(&data)
    ///         .assert_eq(&evens);
    /// });
    /// ```
    fn negate(&self) -> Self;
}
37
38impl<G, D, R, C> Negate<G, C> for Collection<G, D, R, C>
39where
40 G: Scope,
41 C: Clone,
42 StreamCore<G, C>: Negate<G, C>,
43{
44 fn negate(&self) -> Self {
45 self.inner.negate().as_collection()
46 }
47}
48
49impl<G: Scope, D: Data, T: Data, R: Data + Abelian> Negate<G, Vec<(D, T, R)>> for Stream<G, (D, T, R)> {
50 fn negate(&self) -> Self {
51 self.map_in_place(|x| x.2.negate())
52 }
53}
54