differential_dogs3/
calculus.rs

1//! Traits and implementations for differentiating and integrating collections.
2//!
//! The `Differentiate` and `Integrate` traits allow us to move between standard differential
//! dataflow collections, and collections that describe their instantaneous change. The first
//! trait converts a collection to one that contains each change at the moment it occurs, but
//! then immediately retracts it. The second trait takes such a representation and recreates
//! the collection from its instantaneous changes.
8//!
//! These two traits together allow us to build dataflows that maintain computations over inputs
//! that are the instantaneous changes, and then to reconstruct collections from them. The
//! clearest use case for this is "delta query" implementations of relational joins, where linearity
//! allows us to write dataflows based on instantaneous changes, whose "accumulated state" is
//! almost everywhere empty (and so has a low memory footprint, if the system works as planned).
14
15use timely::dataflow::Scope;
16use timely::dataflow::scopes::Child;
17use timely::dataflow::operators::{Filter, Map};
18use differential_dataflow::{AsCollection, Collection, Data};
19use differential_dataflow::difference::Abelian;
20
21use crate::altneu::AltNeu;
22
23/// Produce a collection containing the changes at the moments they happen.
24pub trait Differentiate<G: Scope, D: Data, R: Abelian> {
25    fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
26}
27
28/// Collect instantaneous changes back in to a collection.
29pub trait Integrate<G: Scope, D: Data, R: Abelian> {
30    fn integrate(&self) -> Collection<G, D, R>;
31}
32
33impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
34where
35    G: Scope,
36    D: Data,
37    R: Abelian + 'static,
38{
39    // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff).
40    fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
41        self.enter(child)
42            .inner
43            .flat_map(|(data, time, diff)| {
44                let mut neg_diff = diff.clone();
45                neg_diff.negate();
46                let neu = (data.clone(), AltNeu::neu(time.time.clone()), neg_diff);
47                let alt = (data, time, diff);
48                Some(alt).into_iter().chain(Some(neu))
49            })
50            .as_collection()
51    }
52}
53
54impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
55where
56    G: Scope,
57    D: Data,
58    R: Abelian + 'static,
59{
60    // We discard each `neu` variant and strip off the `alt` wrapper.
61    fn integrate(&self) -> Collection<G, D, R> {
62        self.inner
63            .filter(|(_d,t,_r)| !t.neu)
64            .as_collection()
65            .leave()
66    }
67}