differential_dataflow/trace/description.rs
1//! Descriptions of intervals of partially ordered times.
2//!
3//! A description provides what intends to be an unambiguous characterization of a batch of
4//! updates. We do assume that these updates are in the context of a known computation and
5//! known input, so there is a well-defined "correct answer" for the full set of updates.
6//!
7//! ```ignore
8//! full = { (data, time, diff) }
9//! ```
10//!
11//! Our aim with a description is to specify a subset of these updates unambiguously.
12//!
13//! Each description contains three frontiers, sets of mutually incomparable partially ordered
14//! times. The first two frontiers are `lower` and `upper`, and they indicate the subset of
15//! `full` represented in the batch: those updates whose times are greater or equal to some
16//! element of `lower` but not greater or equal to any element of `upper`.
17//!
18//! ```ignore
19//! subset = { (data, time, diff) in full | lower.any(|t| t.le(time)) &&
20//! !upper.any(|t| t.le(time)) }
21//! ```
22//!
23//! The third frontier `since` is used to indicate that the times presented by the batch may
24//! no longer reflect the values in `subset` above. Although the updates are precisely those
25//! bound by `lower` and `upper`, we may have *advanced* some of the times.
26//!
27//! The guarantee provided by a batch is that for any time greater or equal to some element of
28//! `since`, the accumulated weight of batch updates before that time is identical to the accumulated
29//! weights of updates from `full` at times greater or equal to an element of `lower`, greater
30//! or equal to no element of `upper`, and less or equal to the query time.
31//!
32//! ```ignore
33//! for all times t1:
34//!
35//! if since.any(|t2| t2.less_equal(t1)) then:
36//!
37//! for all data:
38//!
39//! sum x where (data, t2, x) in batch and t2.less_equal(t1)
40//! ==
41//! sum x where (data, t2, x) in full and t2.less_equal(t1)
42//! and lower.any(|t3| t3.less_equal(t2))
43//! and !upper.any(|t3| t3.less_equal(t2))
44//! ```
45//!
46//! Very importantly, this equality does not make any other guarantees about the contents of
47//! the batch when one iterates through it. There are some consequences of the math that can
48//! be relied upon, though.
49//!
50//! The most important consequence is that when `since <= lower` the contents of the batch
51//! must be identical to the updates in `subset`. If it is ever the case that `since` is
52//! in advance of `lower`, consumers of the batch must take care that they not use the times
53//! observed in the batch without ensuring that they are appropriately advanced (typically by
54//! `since`). Failing to do so may produce updates that are not in advance of `since`, which
55//! will often be a logic bug, as `since` does not advance without a corresponding advance in
56//! times at which data may possibly be sent.
57
58use timely::{PartialOrder, progress::Antichain};
59use serde::{Serialize, Deserialize};
60
61/// Describes an interval of partially ordered times.
62///
63/// A `Description` indicates a set of partially ordered times, and a moment at which they are
64/// observed. The `lower` and `upper` frontiers bound the times contained within, and the `since`
65/// frontier indicates a moment at which the times were observed. If `since` is strictly in
66/// advance of `lower`, the contained times may be "advanced" to times which appear equivalent to
67/// any time after `since`.
68#[derive(Clone, Debug, Serialize, Deserialize)]
69pub struct Description<Time> {
70 /// lower frontier of contained updates.
71 lower: Antichain<Time>,
72 /// upper frontier of contained updates.
73 upper: Antichain<Time>,
74 /// frontier used for update compaction.
75 since: Antichain<Time>,
76}
77
78impl<Time: PartialOrder+Clone> Description<Time> {
79 /// Returns a new description from its component parts.
80 pub fn new(lower: Antichain<Time>, upper: Antichain<Time>, since: Antichain<Time>) -> Self {
81 assert!(!lower.elements().is_empty()); // this should always be true.
82 // assert!(upper.len() > 0); // this may not always be true.
83 Description {
84 lower,
85 upper,
86 since,
87 }
88 }
89}
90
91impl<Time> Description<Time> {
92 /// The lower envelope for times in the interval.
93 pub fn lower(&self) -> &Antichain<Time> { &self.lower }
94 /// The upper envelope for times in the interval.
95 pub fn upper(&self) -> &Antichain<Time> { &self.upper }
96 /// Times from whose future the interval may be observed.
97 pub fn since(&self) -> &Antichain<Time> { &self.since }
98}
99
100impl<Time: PartialEq> PartialEq for Description<Time> {
101 fn eq(&self, other: &Self) -> bool {
102 self.lower.eq(other.lower())
103 && self.upper.eq(other.upper())
104 && self.since.eq(other.since())
105 }
106}
107
108impl<Time: Eq> Eq for Description<Time> {}