timely/progress/
timestamp.rs

1//! A partially ordered measure of progress at each timely dataflow location.
2
3use std::fmt::Debug;
4use std::any::Any;
5use std::default::Default;
6use std::hash::Hash;
7
8use crate::ExchangeData;
9use crate::order::PartialOrder;
10
11/// A composite trait for types that serve as timestamps in timely dataflow.
12///
13/// By implementing this trait, you promise that the type's [PartialOrder] implementation
14/// is compatible with [Ord], such that if `a.less_equal(b)` then `a <= b`.
15pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord {
16    /// A type summarizing action on a timestamp along a dataflow path.
17    type Summary : PathSummary<Self> + 'static;
18    /// A unique minimum value in our partial order.
19    ///
20    /// This value will often be used as an initial value, and should be cheap to construct.
21    fn minimum() -> Self;
22}
23
24/// A summary of how a timestamp advances along a timely dataflow path.
25pub trait PathSummary<T> : Clone+'static+Eq+PartialOrder+Debug+Default {
26    /// Advances a timestamp according to the timestamp actions on the path.
27    ///
28    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
29    /// incrementing fields would result in integer overflow. In this case, `results_in` should
30    /// return `None`.
31    ///
32    /// The `feedback` operator, apparently the only point where timestamps are actually incremented
33    /// in computation, uses this method and will drop messages with timestamps that when advanced
34    /// result in `None`. Ideally, all other timestamp manipulation should behave similarly.
35    ///
36    /// This function must be monotonic increasing in both inputs.
37    /// If `s1.less_equal(&s2)` then for all `t` we have `s1.results_in(&t).less_equal(&s2.results_in(&t))`.
38    /// If `t1.less_equal(&t2)` then for all `s` we have `s.results_in(&t1).less_equal(&s.results_in(&t2))`.
39    ///
40    /// Note that `Self::default()` is expected to behave as an "empty" or "noop" summary, such that
41    /// `Self::default().results_in(&t) == Some(t)`. The default summary does not need to be a minimal
42    /// summary, in that summaries are technically permitted to walk timestamps backwards. Care should
43    /// be used when doing this to avoid potentially cyclic dataflows without strict timestamp advancement.
44    ///
45    /// # Examples
46    /// ```
47    /// use timely::progress::timestamp::PathSummary;
48    ///
49    /// let timestamp = 3;
50    ///
51    /// let summary1 = 5;
52    /// let summary2 = usize::MAX - 2;
53    ///
54    /// assert_eq!(summary1.results_in(&timestamp), Some(8));
55    /// assert_eq!(summary2.results_in(&timestamp), None);
56    /// ```
57    fn results_in(&self, src: &T) -> Option<T>;
58    /// Composes this path summary with another path summary.
59    ///
60    /// It is possible that the two composed paths result in an invalid summary, for example when
61    /// integer additions overflow. If it is correct that all timestamps moved along these paths
62    /// would also result in overflow and be discarded, `followed_by` can return `None`. It is very
63    /// important that this not be used casually, as this does not prevent the actual movement of
64    /// data.
65    ///
66    /// Calling `results_in` on the composed summary should behave the same as though the two
67    /// summaries were applied to the argument in order.
68    ///
69    /// # Examples
70    /// ```
71    /// use timely::progress::timestamp::PathSummary;
72    ///
73    /// let summary1 = 5;
74    /// let summary2 = usize::MAX - 3;
75    ///
76    /// assert_eq!(summary1.followed_by(&summary2), None);
77    ///
78    /// let time = 10;
79    /// let summary2 = 15;
80    /// assert_eq!(
81    ///     // Applying the composed summary...
82    ///     summary1.followed_by(&summary2).and_then(|s| s.results_in(&time)),
83    ///     // ...has the same result as applying the two summaries in sequence.
84    ///     summary1.results_in(&time).and_then(|t| summary2.results_in(&t)),
85    /// );
86    ///
87    /// ```
88    fn followed_by(&self, other: &Self) -> Option<Self>;
89}
90
91impl Timestamp for () { type Summary = (); fn minimum() -> Self { }}
92impl PathSummary<()> for () {
93    #[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) }
94    #[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) }
95}
96
97/// Implements [`Timestamp`] and [`PathSummary`] for types with a `checked_add` method.
98macro_rules! implement_timestamp_add {
99    ($($index_type:ty,)*) => (
100        $(
101            impl Timestamp for $index_type {
102                type Summary = $index_type;
103                fn minimum() -> Self { Self::MIN }
104            }
105            impl PathSummary<$index_type> for $index_type {
106                #[inline]
107                fn results_in(&self, src: &$index_type) -> Option<$index_type> { self.checked_add(*src) }
108                #[inline]
109                fn followed_by(&self, other: &$index_type) -> Option<$index_type> { self.checked_add(*other) }
110            }
111        )*
112    )
113}
114
115implement_timestamp_add!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8,);
116
117impl Timestamp for ::std::time::Duration {
118    type Summary = ::std::time::Duration;
119    fn minimum() -> Self { ::std::time::Duration::new(0, 0) }
120}
121impl PathSummary<::std::time::Duration> for ::std::time::Duration {
122    #[inline]
123    fn results_in(&self, src: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*src) }
124    #[inline]
125    fn followed_by(&self, other: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*other) }
126}
127
128pub use self::refines::Refines;
129mod refines {
130
131    use crate::progress::Timestamp;
132
133    /// Conversion between pointstamp types.
134    ///
135    /// This trait is central to nested scopes, for which the inner timestamp must be
136    /// related to the outer timestamp. These methods define those relationships.
137    ///
138    /// It would be ideal to use Rust's From and Into traits, but they seem to be messed
139    /// up due to coherence: we can't implement `Into` because it induces a from implementation
140    /// we can't control.
141    pub trait Refines<T: Timestamp> : Timestamp {
142        /// Converts the outer timestamp to an inner timestamp.
143        fn to_inner(other: T) -> Self;
144        /// Converts the inner timestamp to an outer timestamp.
145        fn to_outer(self) -> T;
146        /// Summarizes an inner path summary as an outer path summary.
147        ///
148        /// It is crucial for correctness that the result of this summarization's `results_in`
149        /// method is equivalent to `|time| path.results_in(time.to_inner()).to_outer()`, or
150        /// at least produces times less or equal to that result.
151        fn summarize(path: <Self as Timestamp>::Summary) -> <T as Timestamp>::Summary;
152    }
153
154    /// All types "refine" themselves,
155    impl<T: Timestamp> Refines<T> for T {
156        fn to_inner(other: T) -> T { other }
157        fn to_outer(self) -> T { self }
158        fn summarize(path: <T as Timestamp>::Summary) -> <T as Timestamp>::Summary { path }
159    }
160
161    /// Implements `Refines<()>` for most types.
162    ///
163    /// We have a macro here because a blanket implement would conflict with the "refines self"
164    /// blanket implementation just above. Waiting on specialization to fix that, I guess.
165    macro_rules! implement_refines_empty {
166        ($($index_type:ty,)*) => (
167            $(
168                impl Refines<()> for $index_type {
169                    fn to_inner(_: ()) -> $index_type { Default::default() }
170                    fn to_outer(self) -> () { }
171                    fn summarize(_: <$index_type as Timestamp>::Summary) -> () { }
172                }
173            )*
174        )
175    }
176
177    implement_refines_empty!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, ::std::time::Duration,);
178}