1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//! Traits and implementations for differentiating and integrating collections.
//!
//! The `Differentiate` and `Integrate` traits allow us to move between standard differential
//! dataflow collections, and collections that describe their instantaneous change. The first
//! trait converts a collection to one that contains each change at the moment it occurs, but
//! then immediately retracts it. The second trait takes such a representation and recreates
//! the collection from its instantaneous changes.
//!
//! These two traits together allow us to build dataflows that maintain computations over inputs
//! that are the instantaneous changes, and then to reconstruct collections from them. The
//! clearest use case for this is "delta query" implementations of relational joins, where linearity
//! allows us to write dataflows based on instantaneous changes, whose "accumulated state" is
//! almost everywhere empty (and so has a low memory footprint, if the system works as planned).

use timely::dataflow::Scope;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::{Filter, Map};
use differential_dataflow::{AsCollection, Collection, Data};
use differential_dataflow::difference::Abelian;

use crate::altneu::AltNeu;

/// Turns a collection into one that reports each change only at the moment it happens.
///
/// The result lives in a child scope whose timestamps are `AltNeu<G::Timestamp>`.
pub trait Differentiate<G, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian,
{
    /// Returns the instantaneous changes of `self` as a collection in `child`.
    fn differentiate<'a>(
        &self,
        child: &Child<'a, G, AltNeu<G::Timestamp>>,
    ) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
}

/// Rebuilds a collection from a stream of its instantaneous changes.
pub trait Integrate<G, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian,
{
    /// Returns the collection, in scope `G`, reconstructed from the changes held in `self`.
    fn integrate(&self) -> Collection<G, D, R>;
}

impl<G: Scope, D: Data, R: Abelian + 'static> Differentiate<G, D, R> for Collection<G, D, R> {
    /// Enters `child` and pairs every update with a retraction.
    ///
    /// Each `(data, time, diff)` seen after entering is emitted unchanged, followed by
    /// `(data, AltNeu::neu(time.time), -diff)`.
    fn differentiate<'a>(
        &self,
        child: &Child<'a, G, AltNeu<G::Timestamp>>,
    ) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
        let entered = self.enter(child);
        let paired = entered.inner.flat_map(|(datum, alt_time, change)| {
            // The retraction reuses the data and the underlying time, with the sign flipped.
            let retraction = {
                let mut negated = change.clone();
                negated.negate();
                (datum.clone(), AltNeu::neu(alt_time.time.clone()), negated)
            };
            let insertion = (datum, alt_time, change);
            // Emit the original update first, then its retraction.
            std::iter::once(insertion).chain(std::iter::once(retraction))
        });
        paired.as_collection()
    }
}

impl<'a, G: Scope, D: Data, R: Abelian + 'static> Integrate<G, D, R>
    for Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
{
    /// Drops every update whose timestamp has `neu` set, then leaves the child scope.
    fn integrate(&self) -> Collection<G, D, R> {
        // Only the non-`neu` updates survive; the `neu` ones are the retractions to discard.
        let alt_updates = self.inner.filter(|update| !update.1.neu);
        alt_updates.as_collection().leave()
    }
}