mz_repr/
timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use columnar::Columnar;
15use dec::TryFromDecimalError;
16use mz_proto::{RustType, TryFromProtoError};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26/// System-wide timestamp type.
27#[derive(
28    Clone,
29    // TODO: De-implement Copy, which is widely used.
30    Copy,
31    PartialEq,
32    Eq,
33    PartialOrd,
34    Ord,
35    Hash,
36    Default,
37    Arbitrary,
38    Columnar,
39)]
40#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
41pub struct Timestamp {
42    /// note no `pub`.
43    internal: u64,
44}
45
46impl PartialEq<&Timestamp> for Timestamp {
47    fn eq(&self, other: &&Timestamp) -> bool {
48        self.eq(*other)
49    }
50}
51
52impl PartialEq<Timestamp> for &Timestamp {
53    fn eq(&self, other: &Timestamp) -> bool {
54        self.internal.eq(&other.internal)
55    }
56}
57
58impl RustType<ProtoTimestamp> for Timestamp {
59    fn into_proto(&self) -> ProtoTimestamp {
60        ProtoTimestamp {
61            internal: self.into(),
62        }
63    }
64
65    fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
66        Ok(Timestamp::new(proto.internal))
67    }
68}
69
70pub trait TimestampManipulation:
71    timely::progress::Timestamp
72    + timely::order::TotalOrder
73    + differential_dataflow::lattice::Lattice
74    + std::fmt::Debug
75    + mz_persist_types::StepForward
76    + Sync
77{
78    /// Advance a timestamp by the least amount possible such that
79    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
80    fn step_forward(&self) -> Self;
81
82    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
83    fn step_forward_by(&self, amount: &Self) -> Self;
84
85    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
86    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
87
88    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
89    /// is true. Return `None` if unable to do so.
90    fn try_step_forward(&self) -> Option<Self>;
91
92    /// Retreat a timestamp by the least amount possible such that
93    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
94    /// which must only happen if the timestamp is `Timestamp::minimum()`.
95    fn step_back(&self) -> Option<Self>;
96
97    /// Return the maximum value for this timestamp.
98    fn maximum() -> Self;
99
100    /// Rounds up the timestamp to the time of the next refresh according to the given schedule.
101    /// Returns None if there is no next refresh.
102    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
103
104    /// Rounds down `timestamp - 1` to the time of the previous refresh according to the given
105    /// schedule.
106    /// Returns None if there is no previous refresh.
107    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
108}
109
110impl TimestampManipulation for Timestamp {
111    fn step_forward(&self) -> Self {
112        self.step_forward()
113    }
114
115    fn step_forward_by(&self, amount: &Self) -> Self {
116        self.step_forward_by(amount)
117    }
118
119    fn try_step_forward(&self) -> Option<Self> {
120        self.try_step_forward()
121    }
122
123    fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
124        self.try_step_forward_by(amount)
125    }
126
127    fn step_back(&self) -> Option<Self> {
128        self.step_back()
129    }
130
131    fn maximum() -> Self {
132        Self::MAX
133    }
134
135    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
136        schedule.round_up_timestamp(*self)
137    }
138
139    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
140        schedule.round_down_timestamp_m1(*self)
141    }
142}
143
144impl mz_persist_types::StepForward for Timestamp {
145    fn step_forward(&self) -> Self {
146        self.step_forward()
147    }
148}
149
150impl Timestamp {
151    pub const MAX: Self = Self { internal: u64::MAX };
152    pub const MIN: Self = Self { internal: u64::MIN };
153
154    pub const fn new(timestamp: u64) -> Self {
155        Self {
156            internal: timestamp,
157        }
158    }
159
160    pub fn to_bytes(&self) -> [u8; 8] {
161        self.internal.to_le_bytes()
162    }
163
164    pub fn from_bytes(bytes: [u8; 8]) -> Self {
165        Self {
166            internal: u64::from_le_bytes(bytes),
167        }
168    }
169
170    pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
171        Self {
172            internal: self.internal.saturating_sub(rhs.into().internal),
173        }
174    }
175
176    pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
177        Self {
178            internal: self.internal.saturating_add(rhs.into().internal),
179        }
180    }
181
182    pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
183        Self {
184            internal: self.internal.saturating_mul(rhs.into().internal),
185        }
186    }
187
188    pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
189        self.internal
190            .checked_add(rhs.into().internal)
191            .map(|internal| Self { internal })
192    }
193
194    pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
195        self.internal
196            .checked_sub(rhs.into().internal)
197            .map(|internal| Self { internal })
198    }
199
200    /// Advance a timestamp by the least amount possible such that
201    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
202    pub fn step_forward(&self) -> Self {
203        match self.checked_add(1) {
204            Some(ts) => ts,
205            None => panic!("could not step forward"),
206        }
207    }
208
209    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
210    pub fn step_forward_by(&self, amount: &Self) -> Self {
211        match self.checked_add(*amount) {
212            Some(ts) => ts,
213            None => panic!("could not step {self} forward by {amount}"),
214        }
215    }
216
217    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
218    /// is true. Return `None` if unable to do so.
219    pub fn try_step_forward(&self) -> Option<Self> {
220        self.checked_add(1)
221    }
222
223    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
224    pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
225        self.checked_add(*amount)
226    }
227
228    /// Retreat a timestamp by the least amount possible such that
229    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
230    /// which must only happen if the timestamp is `Timestamp::minimum()`.
231    pub fn step_back(&self) -> Option<Self> {
232        self.checked_sub(1)
233    }
234}
235
236impl From<u64> for Timestamp {
237    fn from(internal: u64) -> Self {
238        Self { internal }
239    }
240}
241
242impl From<Timestamp> for u64 {
243    fn from(ts: Timestamp) -> Self {
244        ts.internal
245    }
246}
247
248impl From<Timestamp> for u128 {
249    fn from(ts: Timestamp) -> Self {
250        u128::from(ts.internal)
251    }
252}
253
254impl TryFrom<Timestamp> for i64 {
255    type Error = TryFromIntError;
256
257    fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
258        value.internal.try_into()
259    }
260}
261
262impl From<&Timestamp> for u64 {
263    fn from(ts: &Timestamp) -> Self {
264        ts.internal
265    }
266}
267
268impl From<Timestamp> for Numeric {
269    fn from(ts: Timestamp) -> Self {
270        ts.internal.into()
271    }
272}
273
274impl From<Timestamp> for Duration {
275    fn from(ts: Timestamp) -> Self {
276        Duration::from_millis(ts.internal)
277    }
278}
279
280impl std::ops::Rem<Timestamp> for Timestamp {
281    type Output = Timestamp;
282
283    fn rem(self, rhs: Timestamp) -> Self::Output {
284        Self {
285            internal: self.internal % rhs.internal,
286        }
287    }
288}
289
290impl Serialize for Timestamp {
291    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
292    where
293        S: Serializer,
294    {
295        self.internal.serialize(serializer)
296    }
297}
298
299impl<'de> Deserialize<'de> for Timestamp {
300    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
301    where
302        D: serde::Deserializer<'de>,
303    {
304        Ok(Self {
305            internal: u64::deserialize(deserializer)?,
306        })
307    }
308}
309
310impl timely::order::PartialOrder for Timestamp {
311    fn less_equal(&self, other: &Self) -> bool {
312        self.internal.less_equal(&other.internal)
313    }
314}
315
316impl timely::order::PartialOrder<&Timestamp> for Timestamp {
317    fn less_equal(&self, other: &&Self) -> bool {
318        self.internal.less_equal(&other.internal)
319    }
320}
321
322impl timely::order::PartialOrder<Timestamp> for &Timestamp {
323    fn less_equal(&self, other: &Timestamp) -> bool {
324        self.internal.less_equal(&other.internal)
325    }
326}
327
328impl timely::order::TotalOrder for Timestamp {}
329
330impl timely::progress::Timestamp for Timestamp {
331    type Summary = Timestamp;
332
333    fn minimum() -> Self {
334        Self::MIN
335    }
336}
337
338impl timely::progress::PathSummary<Timestamp> for Timestamp {
339    #[inline]
340    fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
341        self.internal
342            .checked_add(src.internal)
343            .map(|internal| Self { internal })
344    }
345    #[inline]
346    fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
347        self.internal
348            .checked_add(other.internal)
349            .map(|internal| Self { internal })
350    }
351}
352
353impl timely::progress::timestamp::Refines<()> for Timestamp {
354    fn to_inner(_: ()) -> Timestamp {
355        Default::default()
356    }
357    fn to_outer(self) -> () {
358        ()
359    }
360    fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
361        ()
362    }
363}
364
365impl differential_dataflow::lattice::Lattice for Timestamp {
366    #[inline]
367    fn join(&self, other: &Self) -> Self {
368        ::std::cmp::max(*self, *other)
369    }
370    #[inline]
371    fn meet(&self, other: &Self) -> Self {
372        ::std::cmp::min(*self, *other)
373    }
374}
375
376impl mz_persist_types::Codec64 for Timestamp {
377    fn codec_name() -> String {
378        u64::codec_name()
379    }
380
381    fn encode(&self) -> [u8; 8] {
382        self.internal.encode()
383    }
384
385    fn decode(buf: [u8; 8]) -> Self {
386        Self {
387            internal: u64::decode(buf),
388        }
389    }
390}
391
392impl std::fmt::Display for Timestamp {
393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394        std::fmt::Display::fmt(&self.internal, f)
395    }
396}
397
398impl std::fmt::Debug for Timestamp {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        std::fmt::Debug::fmt(&self.internal, f)
401    }
402}
403
404impl std::str::FromStr for Timestamp {
405    type Err = String;
406
407    fn from_str(s: &str) -> Result<Self, Self::Err> {
408        Ok(Self {
409            internal: s
410                .parse::<u64>()
411                .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
412                .or_else(|err_num_of_millis| {
413                    parse_timestamptz(s)
414                        .map_err(|parse_error| {
415                            format!(
416                                "{}; could not parse as date and time: {}",
417                                err_num_of_millis, parse_error
418                            )
419                        })?
420                        .timestamp_millis()
421                        .try_into()
422                        .map_err(|_| "out of range for mz_timestamp".to_string())
423                })
424                .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
425        })
426    }
427}
428
429impl TryFrom<Duration> for Timestamp {
430    type Error = TryFromIntError;
431
432    fn try_from(value: Duration) -> Result<Self, Self::Error> {
433        Ok(Self {
434            internal: value.as_millis().try_into()?,
435        })
436    }
437}
438
439impl TryFrom<u128> for Timestamp {
440    type Error = TryFromIntError;
441
442    fn try_from(value: u128) -> Result<Self, Self::Error> {
443        Ok(Self {
444            internal: value.try_into()?,
445        })
446    }
447}
448
449impl TryFrom<i64> for Timestamp {
450    type Error = TryFromIntError;
451
452    fn try_from(value: i64) -> Result<Self, Self::Error> {
453        Ok(Self {
454            internal: value.try_into()?,
455        })
456    }
457}
458
459impl TryFrom<Numeric> for Timestamp {
460    type Error = TryFromDecimalError;
461
462    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
463        Ok(Self {
464            internal: value.try_into()?,
465        })
466    }
467}
468
469impl columnation::Columnation for Timestamp {
470    type InnerRegion = columnation::CopyRegion<Timestamp>;
471}
472
473mod differential {
474    use differential_dataflow::IntoOwned;
475
476    use crate::Timestamp;
477
478    impl<'a> IntoOwned<'a> for Timestamp {
479        type Owned = Self;
480
481        fn into_owned(self) -> Self::Owned {
482            self
483        }
484
485        fn clone_onto(self, other: &mut Self::Owned) {
486            *other = self;
487        }
488
489        fn borrow_as(owned: &'a Self::Owned) -> Self {
490            *owned
491        }
492    }
493}