1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
//! A type that can be treated as a difference.
//!
//! Differential dataflow most commonly tracks the counts associated with records in a multiset, but it
//! generalizes to tracking any map from the records to an Abelian group. The most common generalization
//! is when we maintain both a count and another accumulation, for example height. The differential
//! dataflow collections would then track for each record the total of counts and heights, which allows
//! us to track something like the average.

use crate::Data;

#[deprecated]
pub use self::Abelian as Diff;

/// A type with addition and a test for zero.
///
/// These traits are currently the minimal requirements for a type to be a "difference" in differential
/// dataflow. Addition allows differential dataflow to compact multiple updates to the same data, and
/// the test for zero allows differential dataflow to retire updates that have no effect. There is no
/// requirement that the test for zero ever return true, and the zero value does not need to inhabit the
/// type.
///
/// There is a light presumption of commutativity here, in that while we will largely perform addition
/// in order of timestamps, for many types of timestamps there is no total order and consequently no
/// obvious order to respect. Non-commutative semigroups should be used with care.
pub trait Semigroup : ::std::marker::Sized + Data + Clone {
    /// The method of `std::ops::AddAssign`, for types that do not implement `AddAssign`.
    fn plus_equals(&mut self, rhs: &Self);
    /// Returns true if the element is the additive identity.
    ///
    /// This is primarily used by differential dataflow to know when it is safe to delete an update.
    /// When a difference accumulates to zero, the difference has no effect on any accumulation and can
    /// be removed.
    ///
    /// A semigroup is not obligated to have a zero element, and this method could always return
    /// false in such a setting.
    fn is_zero(&self) -> bool;
}

/// A semigroup with an explicit zero element.
pub trait Monoid : Semigroup {
    /// A zero element under the semigroup addition operator.
    fn zero() -> Self;
}

/// A `Monoid` with negation.
///
/// This trait extends the requirements of `Semigroup` to include a negation operator.
/// Several differential dataflow operators require negation in order to retract prior outputs, but
/// not quite as many as you might imagine.
pub trait Abelian : Monoid {
    /// The method of `std::ops::Neg`, for types that do not implement `Neg`.
    fn negate(self) -> Self;
}

/// A replacement for `std::ops::Mul` for types that do not implement it.
pub trait Multiply<Rhs = Self> {
    /// Output type per the `Mul` trait.
    type Output;
    /// Core method per the `Mul` trait.
    fn multiply(self, rhs: &Rhs) -> Self::Output;
}

/// Implementation for built-in signed integers.
macro_rules! builtin_implementation {
    ($t:ty) => {
        impl Semigroup for $t {
            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
            #[inline] fn is_zero(&self) -> bool { self == &0 }
        }

        impl Monoid for $t {
            #[inline] fn zero() -> Self { 0 }
        }

        impl Multiply<Self> for $t {
            type Output = Self;
            fn multiply(self, rhs: &Self) -> Self { self * rhs}
        }
    };
}

macro_rules! builtin_abelian_implementation {
    ($t:ty) => {
        impl Abelian for $t {
            #[inline] fn negate(self) -> Self { -self }
        }
    };
}

builtin_implementation!(i8);
builtin_implementation!(i16);
builtin_implementation!(i32);
builtin_implementation!(i64);
builtin_implementation!(i128);
builtin_implementation!(isize);
builtin_implementation!(u8);
builtin_implementation!(u16);
builtin_implementation!(u32);
builtin_implementation!(u64);
builtin_implementation!(u128);
builtin_implementation!(usize);

builtin_abelian_implementation!(i8);
builtin_abelian_implementation!(i16);
builtin_abelian_implementation!(i32);
builtin_abelian_implementation!(i64);
builtin_abelian_implementation!(i128);
builtin_abelian_implementation!(isize);

/// Implementations for wrapping signed integers, which have a different zero.
macro_rules! wrapping_implementation {
    ($t:ty) => {
        impl Semigroup for $t {
            #[inline] fn plus_equals(&mut self, rhs: &Self) { *self += rhs; }
            #[inline] fn is_zero(&self) -> bool { self == &std::num::Wrapping(0) }
        }

        impl Monoid for $t {
            #[inline] fn zero() -> Self { std::num::Wrapping(0) }
        }

        impl Abelian for $t {
            #[inline] fn negate(self) -> Self { -self }
        }

        impl Multiply<Self> for $t {
            type Output = Self;
            fn multiply(self, rhs: &Self) -> Self { self * rhs}
        }
    };
}

wrapping_implementation!(std::num::Wrapping<i8>);
wrapping_implementation!(std::num::Wrapping<i16>);
wrapping_implementation!(std::num::Wrapping<i32>);
wrapping_implementation!(std::num::Wrapping<i64>);
wrapping_implementation!(std::num::Wrapping<i128>);
wrapping_implementation!(std::num::Wrapping<isize>);


pub use self::present::Present;
mod present {
    use abomonation_derive::Abomonation;
    use serde::{Deserialize, Serialize};

    /// A zero-sized difference that indicates the presence of a record.
    ///
    /// This difference type has no negation, and present records cannot be retracted.
    /// Addition and multiplication maintain presence, and zero does not inhabit the type.
    ///
    /// The primary feature of this type is that it has zero size, which reduces the overhead
    /// of differential dataflow's representations for settings where collections either do
    /// not change, or for which records are only added (for example, derived facts in Datalog).
    #[derive(Abomonation, Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
    pub struct Present;

    impl<T: Clone> super::Multiply<T> for Present {
        type Output = T;
        fn multiply(self, rhs: &T) -> T {
            rhs.clone()
        }
    }

    impl super::Semigroup for Present {
        fn plus_equals(&mut self, _rhs: &Self) { }
        fn is_zero(&self) -> bool { false }
    }
}

// Pair implementations.
mod tuples {

    use super::{Semigroup, Monoid, Abelian, Multiply};

    /// Implementations for tuples. The two arguments must have the same length.
    macro_rules! tuple_implementation {
        ( ($($name:ident)*), ($($name2:ident)*) ) => (
            impl<$($name: Semigroup),*> Semigroup for ($($name,)*) {
                #[allow(non_snake_case)]
                #[inline] fn plus_equals(&mut self, rhs: &Self) {
                    let ($(ref mut $name,)*) = *self;
                    let ($(ref $name2,)*) = *rhs;
                    $($name.plus_equals($name2);)*
                }
                #[allow(unused_mut)]
                #[allow(non_snake_case)]
                #[inline] fn is_zero(&self) -> bool {
                    let mut zero = true;
                    let ($(ref $name,)*) = *self;
                    $( zero &= $name.is_zero(); )*
                    zero
                }
            }

            impl<$($name: Monoid),*> Monoid for ($($name,)*) {
                #[allow(non_snake_case)]
                #[inline] fn zero() -> Self {
                    ( $($name::zero(), )* )
                }
            }

            impl<$($name: Abelian),*> Abelian for ($($name,)*) {
                #[allow(non_snake_case)]
                #[inline] fn negate(self) -> Self {
                    let ($($name,)*) = self;
                    ( $($name.negate(), )* )
                }
            }

            impl<T, $($name: Multiply<T>),*> Multiply<T> for ($($name,)*) {
                type Output = ($(<$name as Multiply<T>>::Output,)*);
                #[allow(unused_variables)]
                #[allow(non_snake_case)]
                #[inline] fn multiply(self, rhs: &T) -> Self::Output {
                    let ($($name,)*) = self;
                    ( $($name.multiply(rhs), )* )
                }
            }
        )
    }

    tuple_implementation!((), ());
    tuple_implementation!((A1), (A2));
    tuple_implementation!((A1 B1), (A2 B2));
    tuple_implementation!((A1 B1 C1), (A2 B2 C2));
    tuple_implementation!((A1 B1 C1 D1), (A2 B2 C2 D2));
}

// Vector implementations
mod vector {

    use super::{Semigroup, Monoid, Abelian, Multiply};

    impl<R: Semigroup> Semigroup for Vec<R> {
        fn plus_equals(&mut self, rhs: &Self) {
            // Ensure sufficient length to receive addition.
            while self.len() < rhs.len() {
                let element = &rhs[self.len()];
                self.push(element.clone());
            }

            // As other is not longer, apply updates without tests.
            for (index, update) in rhs.iter().enumerate() {
                self[index].plus_equals(update);
            }
        }
        fn is_zero(&self) -> bool {
            self.iter().all(|x| x.is_zero())
        }
    }

    impl<R: Monoid> Monoid for Vec<R> {
        fn zero() -> Self {
            Self::new()
        }
    }

    impl<R: Abelian> Abelian for Vec<R> {
        fn negate(mut self) -> Self {
            for update in self.iter_mut() {
                *update = update.clone().negate();
            }
            self
        }
    }

    impl<T, R: Multiply<T>> Multiply<T> for Vec<R> {
        type Output = Vec<<R as Multiply<T>>::Output>;
        fn multiply(self, rhs: &T) -> Self::Output {
            self.into_iter()
                .map(|x| x.multiply(rhs))
                .collect()
        }
    }
}