timely/progress/
timestamp.rs

1//! A partially ordered measure of progress at each timely dataflow location.
2
3use std::fmt::Debug;
4use std::any::Any;
5use std::default::Default;
6use std::hash::Hash;
7
8use crate::ExchangeData;
9use crate::order::PartialOrder;
10
11/// A composite trait for types that serve as timestamps in timely dataflow.
12pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord {
13    /// A type summarizing action on a timestamp along a dataflow path.
14    type Summary : PathSummary<Self> + 'static;
15    /// A minimum value suitable as a default.
16    fn minimum() -> Self;
17}
18
19/// A summary of how a timestamp advances along a timely dataflow path.
20pub trait PathSummary<T> : Clone+'static+Eq+PartialOrder+Debug+Default {
21    /// Advances a timestamp according to the timestamp actions on the path.
22    ///
23    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
24    /// incrementing fields would result in integer overflow. In this case, `results_in` should
25    /// return `None`.
26    ///
27    /// The `feedback` operator, apparently the only point where timestamps are actually incremented
28    /// in computation, uses this method and will drop messages with timestamps that when advanced
29    /// result in `None`. Ideally, all other timestamp manipulation should behave similarly.
30    ///
31    /// # Examples
32    /// ```
33    /// use timely::progress::timestamp::PathSummary;
34    ///
35    /// let timestamp = 3;
36    ///
37    /// let summary1 = 5;
38    /// let summary2 = usize::max_value() - 2;
39    ///
40    /// assert_eq!(summary1.results_in(&timestamp), Some(8));
41    /// assert_eq!(summary2.results_in(&timestamp), None);
42    /// ```
43    fn results_in(&self, src: &T) -> Option<T>;
44    /// Composes this path summary with another path summary.
45    ///
46    /// It is possible that the two composed paths result in an invalid summary, for example when
47    /// integer additions overflow. If it is correct that all timestamps moved along these paths
48    /// would also result in overflow and be discarded, `followed_by` can return `None`. It is very
49    /// important that this not be used casually, as this does not prevent the actual movement of
50    /// data.
51    ///
52    /// # Examples
53    /// ```
54    /// use timely::progress::timestamp::PathSummary;
55    ///
56    /// let summary1 = 5;
57    /// let summary2 = usize::max_value() - 3;
58    ///
59    /// assert_eq!(summary1.followed_by(&summary2), None);
60    /// ```
61    fn followed_by(&self, other: &Self) -> Option<Self>;
62}
63
64impl Timestamp for () { type Summary = (); fn minimum() -> Self { }}
65impl PathSummary<()> for () {
66    #[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) }
67    #[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) }
68}
69
70/// Implements [`Timestamp`] and [`PathSummary`] for types with a `checked_add` method.
71macro_rules! implement_timestamp_add {
72    ($($index_type:ty,)*) => (
73        $(
74            impl Timestamp for $index_type {
75                type Summary = $index_type;
76                fn minimum() -> Self { Self::MIN }
77            }
78            impl PathSummary<$index_type> for $index_type {
79                #[inline]
80                fn results_in(&self, src: &$index_type) -> Option<$index_type> { self.checked_add(*src) }
81                #[inline]
82                fn followed_by(&self, other: &$index_type) -> Option<$index_type> { self.checked_add(*other) }
83            }
84        )*
85    )
86}
87
88implement_timestamp_add!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8,);
89
90impl Timestamp for ::std::time::Duration {
91    type Summary = ::std::time::Duration;
92    fn minimum() -> Self { ::std::time::Duration::new(0, 0) }
93}
94impl PathSummary<::std::time::Duration> for ::std::time::Duration {
95    #[inline]
96    fn results_in(&self, src: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*src) }
97    #[inline]
98    fn followed_by(&self, other: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*other) }
99}
100
101pub use self::refines::Refines;
102mod refines {
103
104    use crate::progress::Timestamp;
105
106    /// Conversion between pointstamp types.
107    ///
108    /// This trait is central to nested scopes, for which the inner timestamp must be
109    /// related to the outer timestamp. These methods define those relationships.
110    ///
111    /// It would be ideal to use Rust's From and Into traits, but they seem to be messed
112    /// up due to coherence: we can't implement `Into` because it induces a from implementation
113    /// we can't control.
114    pub trait Refines<T: Timestamp> : Timestamp {
115        /// Converts the outer timestamp to an inner timestamp.
116        fn to_inner(other: T) -> Self;
117        /// Converts the inner timestamp to an outer timestamp.
118        fn to_outer(self) -> T;
119        /// Summarizes an inner path summary as an outer path summary.
120        ///
121        /// It is crucial for correctness that the result of this summarization's `results_in`
122        /// method is equivalent to `|time| path.results_in(time.to_inner()).to_outer()`, or
123        /// at least produces times less or equal to that result.
124        fn summarize(path: <Self as Timestamp>::Summary) -> <T as Timestamp>::Summary;
125    }
126
127    /// All types "refine" themselves,
128    impl<T: Timestamp> Refines<T> for T {
129        fn to_inner(other: T) -> T { other }
130        fn to_outer(self) -> T { self }
131        fn summarize(path: <T as Timestamp>::Summary) -> <T as Timestamp>::Summary { path }
132    }
133
134    /// Implements `Refines<()>` for most types.
135    ///
136    /// We have a macro here because a blanket implement would conflict with the "refines self"
137    /// blanket implementation just above. Waiting on specialization to fix that, I guess.
138    macro_rules! implement_refines_empty {
139        ($($index_type:ty,)*) => (
140            $(
141                impl Refines<()> for $index_type {
142                    fn to_inner(_: ()) -> $index_type { Default::default() }
143                    fn to_outer(self) -> () { }
144                    fn summarize(_: <$index_type as Timestamp>::Summary) -> () { }
145                }
146            )*
147        )
148    }
149
150    implement_refines_empty!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, ::std::time::Duration,);
151}