differential_dataflow/trace/
description.rs

1//! Descriptions of intervals of partially ordered times.
2//!
3//! A description provides what intends to be an unambiguous characterization of a batch of
4//! updates. We do assume that these updates are in the context of a known computation and
5//! known input, so there is a well-defined "correct answer" for the full set of updates.
6//!
7//! ```ignore
8//!      full = { (data, time, diff) }
9//! ```
10//!
11//! Our aim with a description is to specify a subset of these updates unambiguously.
12//!
13//! Each description contains three frontiers, sets of mutually incomparable partially ordered
14//! times. The first two frontiers are `lower` and `upper`, and they indicate the subset of
15//! `full` represented in the batch: those updates whose times are greater or equal to some
16//! element of `lower` but not greater or equal to any element of `upper`.
17//!
18//! ```ignore
19//!     subset = { (data, time, diff) in full | lower.any(|t| t.le(time)) &&
20//!                                            !upper.any(|t| t.le(time)) }
21//! ```
22//!
23//! The third frontier `since` is used to indicate that the times presented by the batch may
24//! no longer reflect the values in `subset` above. Although the updates are precisely those
25//! bound by `lower` and `upper`, we may have *advanced* some of the times.
26//!
27//! The guarantee provided by a batch is that for any time greater or equal to some element of
28//! `since`, the accumulated weight of batch updates before that time is identical to the accumulated
29//! weights of updates from `full` at times greater or equal to an element of `lower`, greater
30//! or equal to no element of `upper`, and less or equal to the query time.
31//!
32//! ```ignore
33//!     for all times t1:
34//!
35//!        if since.any(|t2| t2.less_equal(t1)) then:
36//!
37//!            for all data:
38//!
39//!                sum x where (data, t2, x) in batch and t2.less_equal(t1)
40//!            ==
41//!                sum x where (data, t2, x) in full and  t2.less_equal(t1)
42//!                                                  and  lower.any(|t3| t3.less_equal(t2))
43//!                                                  and !upper.any(|t3| t3.less_equal(t2))
44//! ```
45//!
46//! Very importantly, this equality does not make any other guarantees about the contents of
47//! the batch when one iterates through it. There are some consequences of the math that can
48//! be relied upon, though.
49//!
50//! The most important consequence is that when `since <= lower` the contents of the batch
51//! must be identical to the updates in `subset`. If it is ever the case that `since` is
52//! in advance of `lower`, consumers of the batch must take care that they not use the times
53//! observed in the batch without ensuring that they are appropriately advanced (typically by
54//! `since`). Failing to do so may produce updates that are not in advance of `since`, which
55//! will often be a logic bug, as `since` does not advance without a corresponding advance in
56//! times at which data may possibly be sent.
57
58use timely::{PartialOrder, progress::Antichain};
59use serde::{Serialize, Deserialize};
60
61/// Describes an interval of partially ordered times.
62///
63/// A `Description` indicates a set of partially ordered times, and a moment at which they are
64/// observed. The `lower` and `upper` frontiers bound the times contained within, and the `since`
65/// frontier indicates a moment at which the times were observed. If `since` is strictly in
66/// advance of `lower`, the contained times may be "advanced" to times which appear equivalent to
67/// any time after `since`.
68#[derive(Clone, Debug, Serialize, Deserialize)]
69pub struct Description<Time> {
70    /// lower frontier of contained updates.
71    lower: Antichain<Time>,
72    /// upper frontier of contained updates.
73    upper: Antichain<Time>,
74    /// frontier used for update compaction.
75    since: Antichain<Time>,
76}
77
78impl<Time: PartialOrder+Clone> Description<Time> {
79    /// Returns a new description from its component parts.
80    pub fn new(lower: Antichain<Time>, upper: Antichain<Time>, since: Antichain<Time>) -> Self {
81        assert!(!lower.elements().is_empty());    // this should always be true.
82        // assert!(upper.len() > 0);            // this may not always be true.
83        Description {
84            lower,
85            upper,
86            since,
87        }
88    }
89}
90
91impl<Time> Description<Time> {
92    /// The lower envelope for times in the interval.
93    pub fn lower(&self) -> &Antichain<Time> { &self.lower }
94    /// The upper envelope for times in the interval.
95    pub fn upper(&self) -> &Antichain<Time> { &self.upper }
96    /// Times from whose future the interval may be observed.
97    pub fn since(&self) -> &Antichain<Time> { &self.since }
98}
99
100impl<Time: PartialEq> PartialEq for Description<Time> {
101    fn eq(&self, other: &Self) -> bool {
102        self.lower.eq(other.lower())
103            && self.upper.eq(other.upper())
104            && self.since.eq(other.since())
105    }
106}
107
108impl<Time: Eq> Eq for Description<Time> {}