differential_dogs3/calculus.rs
1//! Traits and implementations for differentiating and integrating collections.
2//!
//! The `Differentiate` and `Integrate` traits allow us to move between standard differential
//! dataflow collections and collections that describe their instantaneous changes. The first
//! trait converts a collection to one that contains each change at the moment it occurs, but
//! then immediately retracts it. The second trait takes such a representation and recreates
//! the collection from its instantaneous changes.
8//!
//! These two traits together allow us to build dataflows that maintain computations over inputs
//! that are instantaneous changes, and then to reconstruct collections from them. The clearest
//! use case for this is "delta query" implementations of relational joins, where linearity
//! allows us to write dataflows based on instantaneous changes, whose "accumulated state" is
//! almost everywhere empty (and so has a low memory footprint, if the system works as planned).
14
15use timely::dataflow::Scope;
16use timely::dataflow::scopes::Child;
17use timely::dataflow::operators::{Filter, Map};
18use differential_dataflow::{AsCollection, Collection, Data};
19use differential_dataflow::difference::Abelian;
20
21use crate::altneu::AltNeu;
22
23/// Produce a collection containing the changes at the moments they happen.
24pub trait Differentiate<G: Scope, D: Data, R: Abelian> {
25 fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
26}
27
28/// Collect instantaneous changes back in to a collection.
29pub trait Integrate<G: Scope, D: Data, R: Abelian> {
30 fn integrate(&self) -> Collection<G, D, R>;
31}
32
33impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
34where
35 G: Scope,
36 D: Data,
37 R: Abelian + 'static,
38{
39 // For each (data, Alt(time), diff) we add a (data, Neu(time), -diff).
40 fn differentiate<'a>(&self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
41 self.enter(child)
42 .inner
43 .flat_map(|(data, time, diff)| {
44 let mut neg_diff = diff.clone();
45 neg_diff.negate();
46 let neu = (data.clone(), AltNeu::neu(time.time.clone()), neg_diff);
47 let alt = (data, time, diff);
48 Some(alt).into_iter().chain(Some(neu))
49 })
50 .as_collection()
51 }
52}
53
54impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
55where
56 G: Scope,
57 D: Data,
58 R: Abelian + 'static,
59{
60 // We discard each `neu` variant and strip off the `alt` wrapper.
61 fn integrate(&self) -> Collection<G, D, R> {
62 self.inner
63 .filter(|(_d,t,_r)| !t.neu)
64 .as_collection()
65 .leave()
66 }
67}