1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
//! A partially ordered measure of progress at each timely dataflow location.
use std::fmt::Debug;
use std::any::Any;
use std::default::Default;
use std::hash::Hash;
use crate::ExchangeData;
use crate::order::PartialOrder;
/// A composite trait for types that serve as timestamps in timely dataflow.
pub trait Timestamp: Clone+Eq+PartialOrder+Debug+Send+Any+ExchangeData+Hash+Ord {
/// A type summarizing action on a timestamp along a dataflow path.
type Summary : PathSummary<Self> + 'static;
/// A minimum value suitable as a default.
fn minimum() -> Self;
}
/// A summary of how a timestamp advances along a timely dataflow path.
pub trait PathSummary<T> : Clone+'static+Eq+PartialOrder+Debug+Default {
/// Advances a timestamp according to the timestamp actions on the path.
///
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
/// incrementing fields would result in integer overflow. In this case, `results_in` should
/// return `None`.
///
/// The `feedback` operator, apparently the only point where timestamps are actually incremented
/// in computation, uses this method and will drop messages with timestamps that when advanced
/// result in `None`. Ideally, all other timestamp manipulation should behave similarly.
///
/// # Examples
/// ```
/// use timely::progress::timestamp::PathSummary;
///
/// let timestamp = 3;
///
/// let summary1 = 5;
/// let summary2 = usize::max_value() - 2;
///
/// assert_eq!(summary1.results_in(×tamp), Some(8));
/// assert_eq!(summary2.results_in(×tamp), None);
/// ```
fn results_in(&self, src: &T) -> Option<T>;
/// Composes this path summary with another path summary.
///
/// It is possible that the two composed paths result in an invalid summary, for example when
/// integer additions overflow. If it is correct that all timestamps moved along these paths
/// would also result in overflow and be discarded, `followed_by` can return `None`. It is very
/// important that this not be used casually, as this does not prevent the actual movement of
/// data.
///
/// # Examples
/// ```
/// use timely::progress::timestamp::PathSummary;
///
/// let summary1 = 5;
/// let summary2 = usize::max_value() - 3;
///
/// assert_eq!(summary1.followed_by(&summary2), None);
/// ```
fn followed_by(&self, other: &Self) -> Option<Self>;
}
impl Timestamp for () { type Summary = (); fn minimum() -> Self { }}
impl PathSummary<()> for () {
#[inline] fn results_in(&self, _src: &()) -> Option<()> { Some(()) }
#[inline] fn followed_by(&self, _other: &()) -> Option<()> { Some(()) }
}
/// Implements [`Timestamp`] and [`PathSummary`] for types with a `checked_add` method.
macro_rules! implement_timestamp_add {
($($index_type:ty,)*) => (
$(
impl Timestamp for $index_type {
type Summary = $index_type;
fn minimum() -> Self { Self::MIN }
}
impl PathSummary<$index_type> for $index_type {
#[inline]
fn results_in(&self, src: &$index_type) -> Option<$index_type> { self.checked_add(*src) }
#[inline]
fn followed_by(&self, other: &$index_type) -> Option<$index_type> { self.checked_add(*other) }
}
)*
)
}
implement_timestamp_add!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8,);
impl Timestamp for ::std::time::Duration {
type Summary = ::std::time::Duration;
fn minimum() -> Self { ::std::time::Duration::new(0, 0) }
}
impl PathSummary<::std::time::Duration> for ::std::time::Duration {
#[inline]
fn results_in(&self, src: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*src) }
#[inline]
fn followed_by(&self, other: &::std::time::Duration) -> Option<::std::time::Duration> { self.checked_add(*other) }
}
pub use self::refines::Refines;
mod refines {
use crate::progress::Timestamp;
/// Conversion between pointstamp types.
///
/// This trait is central to nested scopes, for which the inner timestamp must be
/// related to the outer timestamp. These methods define those relationships.
///
/// It would be ideal to use Rust's From and Into traits, but they seem to be messed
/// up due to coherence: we can't implement `Into` because it induces a from implementation
/// we can't control.
pub trait Refines<T: Timestamp> : Timestamp {
/// Converts the outer timestamp to an inner timestamp.
fn to_inner(other: T) -> Self;
/// Converts the inner timestamp to an outer timestamp.
fn to_outer(self) -> T;
/// Summarizes an inner path summary as an outer path summary.
///
/// It is crucial for correctness that the result of this summarization's `results_in`
/// method is equivalent to `|time| path.results_in(time.to_inner()).to_outer()`, or
/// at least produces times less or equal to that result.
fn summarize(path: <Self as Timestamp>::Summary) -> <T as Timestamp>::Summary;
}
/// All types "refine" themselves,
impl<T: Timestamp> Refines<T> for T {
fn to_inner(other: T) -> T { other }
fn to_outer(self) -> T { self }
fn summarize(path: <T as Timestamp>::Summary) -> <T as Timestamp>::Summary { path }
}
/// Implements `Refines<()>` for most types.
///
/// We have a macro here because a blanket implement would conflict with the "refines self"
/// blanket implementation just above. Waiting on specialization to fix that, I guess.
macro_rules! implement_refines_empty {
($($index_type:ty,)*) => (
$(
impl Refines<()> for $index_type {
fn to_inner(_: ()) -> $index_type { Default::default() }
fn to_outer(self) -> () { }
fn summarize(_: <$index_type as Timestamp>::Summary) -> () { }
}
)*
)
}
implement_refines_empty!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, ::std::time::Duration,);
}