1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
//! Traits and implementations for differentiating and integrating collections.
//!
//! The `Differentiate` and `Integrate` traits allow us to move between standard differential
//! dataflow collections and collections that describe their instantaneous changes. The first
//! trait converts a collection into one that contains each change at the moment it occurs and
//! then immediately retracts it. The second trait takes such a representation and recreates
//! the collection from its instantaneous changes.
//!
//! Together, these two traits allow us to build dataflows that maintain computations over inputs
//! that are instantaneous changes, and then to reconstruct collections from them. The clearest
//! use case is "delta query" implementations of relational joins, where linearity allows us to
//! write dataflows based on instantaneous changes whose "accumulated state" is almost
//! everywhere empty (and so has a low memory footprint, if the system works as planned).
use timely::dataflow::Scope;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::{Filter, Map};
use differential_dataflow::{AsCollection, Collection, Data};
use differential_dataflow::difference::Abelian;
use crate::altneu::AltNeu;
/// Re-expresses a collection as its instantaneous changes.
///
/// The result lives in `child`, a scope nested in `G` whose timestamps are
/// `AltNeu<G::Timestamp>`.
pub trait Differentiate<G, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian,
{
    /// Produces, inside `child`, a collection containing the changes at the moments they happen.
    fn differentiate<'a>(
        &self,
        child: &Child<'a, G, AltNeu<G::Timestamp>>,
    ) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
}
/// Collects instantaneous changes back into an ordinary collection in the outer scope `G`.
pub trait Integrate<G, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian,
{
    /// Reconstructs the collection, in scope `G`, from its instantaneous changes.
    fn integrate(&self) -> Collection<G, D, R>;
}
impl<G, D, R> Differentiate<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian + 'static,
{
    /// Enters `self` into `child` and pairs every update `(data, Alt(t), diff)` with the
    /// retraction `(data, Neu(t), -diff)`, so the two cancel once both are accumulated.
    fn differentiate<'a>(
        &self,
        child: &Child<'a, G, AltNeu<G::Timestamp>>,
    ) -> Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
        let entered = self.enter(child);
        entered
            .inner
            .flat_map(|(record, alt_time, delta)| {
                // The retraction needs its own copies of the record and diff, and a `Neu`
                // timestamp at the same underlying time as the incoming `Alt` timestamp.
                let mut negated = delta.clone();
                negated.negate();
                let neu_time = AltNeu::neu(alt_time.time.clone());
                let retraction = (record.clone(), neu_time, negated);
                // Emit the original update first, then its retraction.
                std::iter::once((record, alt_time, delta)).chain(std::iter::once(retraction))
            })
            .as_collection()
    }
}
impl<'a, G, D, R> Integrate<G, D, R> for Collection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
where
    G: Scope,
    D: Data,
    R: Abelian + 'static,
{
    /// Keeps only updates whose timestamp is not a `Neu` variant and moves them to scope `G`.
    ///
    /// Leaving the child scope strips the `AltNeu` wrapper from the remaining timestamps.
    fn integrate(&self) -> Collection<G, D, R> {
        let alt_only = self.inner.filter(|(_, time, _)| !time.neu);
        alt_only.as_collection().leave()
    }
}