timely/progress/timestamp.rs
1//! A partially ordered measure of progress at each timely dataflow location.
2
3use std::fmt::Debug;
4use std::any::Any;
5use std::default::Default;
6
7use crate::ExchangeData;
8use crate::order::PartialOrder;
9
10/// A composite trait for types that serve as timestamps in timely dataflow.
11///
12/// By implementing this trait, you promise that the type's [PartialOrder] implementation
13/// is compatible with [Ord], such that if `a.less_equal(b)` then `a <= b`.
14pub trait Timestamp: Clone+Eq+PartialOrder+Ord+Debug+Any+ExchangeData {
15 /// A type summarizing action on a timestamp along a dataflow path.
16 type Summary : PathSummary<Self> + 'static;
17 /// A unique minimum value in our partial order.
18 ///
19 /// This value will often be used as an initial value, and should be cheap to construct.
20 fn minimum() -> Self;
21}
22
23/// A summary of how a timestamp advances along a timely dataflow path.
24pub trait PathSummary<T> : Clone+Eq+PartialOrder+Debug+Default {
25 /// Advances a timestamp according to the timestamp actions on the path.
26 ///
27 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
28 /// incrementing fields would result in integer overflow. In this case, `results_in` should
29 /// return `None`.
30 ///
31 /// The `feedback` operator, apparently the only point where timestamps are actually incremented
32 /// in computation, uses this method and will drop messages with timestamps that when advanced
33 /// result in `None`. Ideally, all other timestamp manipulation should behave similarly.
34 ///
35 /// This function must be monotonic increasing in both inputs.
36 /// If `s1.less_equal(&s2)` then for all `t` we have `s1.results_in(&t).less_equal(&s2.results_in(&t))`.
37 /// If `t1.less_equal(&t2)` then for all `s` we have `s.results_in(&t1).less_equal(&s.results_in(&t2))`.
38 ///
39 /// Note that `Self::default()` is expected to behave as an "empty" or "noop" summary, such that
40 /// `Self::default().results_in(&t) == Some(t)`. The default summary does not need to be a minimal
41 /// summary, in that summaries are technically permitted to walk timestamps backwards. Care should
42 /// be used when doing this to avoid potentially cyclic dataflows without strict timestamp advancement.
43 ///
44 /// # Examples
45 /// ```
46 /// use timely::progress::timestamp::PathSummary;
47 ///
48 /// let timestamp = 3;
49 ///
50 /// let summary1 = 5;
51 /// let summary2 = usize::MAX - 2;
52 ///
53 /// assert_eq!(summary1.results_in(×tamp), Some(8));
54 /// assert_eq!(summary2.results_in(×tamp), None);
55 /// ```
56 fn results_in(&self, src: &T) -> Option<T>;
57 /// Composes this path summary with another path summary.
58 ///
59 /// It is possible that the two composed paths result in an invalid summary, for example when
60 /// integer additions overflow. If it is correct that all timestamps moved along these paths
61 /// would also result in overflow and be discarded, `followed_by` can return `None`. It is very
62 /// important that this not be used casually, as this does not prevent the actual movement of
63 /// data.
64 ///
65 /// Calling `results_in` on the composed summary should behave the same as though the two
66 /// summaries were applied to the argument in order.
67 ///
68 /// # Examples
69 /// ```
70 /// use timely::progress::timestamp::PathSummary;
71 ///
72 /// let summary1 = 5;
73 /// let summary2 = usize::MAX - 3;
74 ///
75 /// assert_eq!(summary1.followed_by(&summary2), None);
76 ///
77 /// let time = 10;
78 /// let summary2 = 15;
79 /// assert_eq!(
80 /// // Applying the composed summary...
81 /// summary1.followed_by(&summary2).and_then(|s| s.results_in(&time)),
82 /// // ...has the same result as applying the two summaries in sequence.
83 /// summary1.results_in(&time).and_then(|t| summary2.results_in(&t)),
84 /// );
85 ///
86 /// ```
87 fn followed_by(&self, other: &Self) -> Option<Self>;
88}
89
90impl Timestamp for () { type Summary = (); fn minimum() -> Self { }}
91impl PathSummary<()> for () {
92 #[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) }
93 #[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) }
94}
95
96/// Implements [`Timestamp`] and [`PathSummary`] for types with a `checked_add` method.
97macro_rules! implement_timestamp_add {
98 ($($index_type:ty,)*) => (
99 $(
100 impl Timestamp for $index_type {
101 type Summary = $index_type;
102 fn minimum() -> Self { Self::MIN }
103 }
104 impl PathSummary<$index_type> for $index_type {
105 #[inline]
106 fn results_in(&self, src: &$index_type) -> Option<$index_type> { self.checked_add(*src) }
107 #[inline]
108 fn followed_by(&self, other: &$index_type) -> Option<$index_type> { self.checked_add(*other) }
109 }
110 )*
111 )
112}
113
114implement_timestamp_add!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8,);
115
116impl Timestamp for ::std::time::Duration {
117 type Summary = ::std::time::Duration;
118 fn minimum() -> Self { ::std::time::Duration::new(0, 0) }
119}
120impl PathSummary<::std::time::Duration> for ::std::time::Duration {
121 #[inline]
122 fn results_in(&self, src: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*src) }
123 #[inline]
124 fn followed_by(&self, other: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*other) }
125}
126
127pub use self::refines::Refines;
128mod refines {
129
130 use crate::progress::Timestamp;
131
132 /// Conversion between pointstamp types.
133 ///
134 /// This trait is central to nested scopes, for which the inner timestamp must be
135 /// related to the outer timestamp. These methods define those relationships.
136 ///
137 /// It would be ideal to use Rust's From and Into traits, but they seem to be messed
138 /// up due to coherence: we can't implement `Into` because it induces a from implementation
139 /// we can't control.
140 pub trait Refines<T: Timestamp> : Timestamp {
141 /// Converts the outer timestamp to an inner timestamp.
142 fn to_inner(other: T) -> Self;
143 /// Converts the inner timestamp to an outer timestamp.
144 fn to_outer(self) -> T;
145 /// Summarizes an inner path summary as an outer path summary.
146 ///
147 /// It is crucial for correctness that the result of this summarization's `results_in`
148 /// method is equivalent to `|time| path.results_in(time.to_inner()).to_outer()`, or
149 /// at least produces times less or equal to that result.
150 fn summarize(path: <Self as Timestamp>::Summary) -> <T as Timestamp>::Summary;
151 }
152
153 /// All types "refine" themselves,
154 impl<T: Timestamp> Refines<T> for T {
155 fn to_inner(other: T) -> T { other }
156 fn to_outer(self) -> T { self }
157 fn summarize(path: <T as Timestamp>::Summary) -> <T as Timestamp>::Summary { path }
158 }
159
160 /// Implements `Refines<()>` for most types.
161 ///
162 /// We have a macro here because a blanket implement would conflict with the "refines self"
163 /// blanket implementation just above. Waiting on specialization to fix that, I guess.
164 macro_rules! implement_refines_empty {
165 ($($index_type:ty,)*) => (
166 $(
167 impl Refines<()> for $index_type {
168 fn to_inner(_: ()) -> $index_type { Default::default() }
169 fn to_outer(self) -> () { }
170 fn summarize(_: <$index_type as Timestamp>::Summary) -> () { }
171 }
172 )*
173 )
174 }
175
176 implement_refines_empty!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, ::std::time::Duration,);
177}