mz_repr/
timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26/// System-wide timestamp type.
27#[derive(
28    Clone,
29    // TODO: De-implement Copy, which is widely used.
30    Copy,
31    PartialEq,
32    Eq,
33    PartialOrd,
34    Ord,
35    Hash,
36    Default,
37    Arbitrary,
38    bytemuck::AnyBitPattern,
39    bytemuck::NoUninit,
40)]
41#[repr(transparent)]
42pub struct Timestamp {
43    /// note no `pub`.
44    internal: u64,
45}
46
47impl PartialEq<&Timestamp> for Timestamp {
48    fn eq(&self, other: &&Timestamp) -> bool {
49        self.eq(*other)
50    }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54    fn eq(&self, other: &Timestamp) -> bool {
55        self.internal.eq(&other.internal)
56    }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60    fn into_proto(&self) -> ProtoTimestamp {
61        ProtoTimestamp {
62            internal: self.into(),
63        }
64    }
65
66    fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67        Ok(Timestamp::new(proto.internal))
68    }
69}
70
71mod columnar_timestamp {
72    use crate::Timestamp;
73    use columnar::Columnar;
74    use mz_ore::cast::CastFrom;
75
76    /// A newtype wrapper for a vector of `Timestamp` values.
77    #[derive(Clone, Copy, Default, Debug)]
78    pub struct Timestamps<T>(T);
79    impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
80        #[inline(always)]
81        fn push(&mut self, item: D) {
82            self.0.push(item)
83        }
84    }
85    impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
86        #[inline(always)]
87        fn clear(&mut self) {
88            self.0.clear()
89        }
90    }
91    impl<T: columnar::Len> columnar::Len for Timestamps<T> {
92        #[inline(always)]
93        fn len(&self) -> usize {
94            self.0.len()
95        }
96    }
97    impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
98        type Ref = Timestamp;
99
100        #[inline(always)]
101        fn get(&self, index: usize) -> Self::Ref {
102            self.0[index]
103        }
104    }
105
106    impl Columnar for Timestamp {
107        #[inline(always)]
108        fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
109            other
110        }
111        type Container = Timestamps<Vec<Timestamp>>;
112        #[inline(always)]
113        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
114        where
115            Self: 'a,
116        {
117            thing
118        }
119    }
120
121    impl columnar::Container for Timestamps<Vec<Timestamp>> {
122        type Ref<'a> = Timestamp;
123        type Borrowed<'a>
124            = Timestamps<&'a [Timestamp]>
125        where
126            Self: 'a;
127        #[inline(always)]
128        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
129            Timestamps(self.0.as_slice())
130        }
131        #[inline(always)]
132        fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
133        where
134            Self: 'a,
135        {
136            Timestamps(item.0)
137        }
138
139        #[inline(always)]
140        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
141        where
142            Self: 'a,
143        {
144            item
145        }
146
147        #[inline(always)]
148        fn reserve_for<'a, I>(&mut self, selves: I)
149        where
150            Self: 'a,
151            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
152        {
153            self.0.reserve_for(selves.map(|s| s.0));
154        }
155    }
156
157    impl columnar::HeapSize for Timestamp {}
158
159    impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
160        #[inline(always)]
161        fn heap_size(&self) -> (usize, usize) {
162            self.0.heap_size()
163        }
164    }
165
166    impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
167        #[inline(always)]
168        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
169            std::iter::once((
170                u64::cast_from(align_of::<Timestamp>()),
171                bytemuck::cast_slice(self.0),
172            ))
173        }
174    }
175    impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
176        #[inline(always)]
177        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
178            Timestamps(bytemuck::cast_slice(
179                bytes.next().expect("Iterator exhausted prematurely"),
180            ))
181        }
182    }
183}
184
185impl BucketTimestamp for Timestamp {
186    fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
187        let rhs = 1_u64.checked_shl(exponent)?;
188        Some(self.internal.checked_add(rhs)?.into())
189    }
190}
191
192pub trait TimestampManipulation:
193    timely::progress::Timestamp
194    + timely::order::TotalOrder
195    + differential_dataflow::lattice::Lattice
196    + std::fmt::Debug
197    + mz_persist_types::StepForward
198    + Sync
199{
200    /// Advance a timestamp by the least amount possible such that
201    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
202    fn step_forward(&self) -> Self;
203
204    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
205    fn step_forward_by(&self, amount: &Self) -> Self;
206
207    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
208    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
209
210    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
211    /// is true. Return `None` if unable to do so.
212    fn try_step_forward(&self) -> Option<Self>;
213
214    /// Retreat a timestamp by the least amount possible such that
215    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
216    /// which must only happen if the timestamp is `Timestamp::minimum()`.
217    fn step_back(&self) -> Option<Self>;
218
219    /// Return the maximum value for this timestamp.
220    fn maximum() -> Self;
221
222    /// Rounds up the timestamp to the time of the next refresh according to the given schedule.
223    /// Returns None if there is no next refresh.
224    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
225
226    /// Rounds down `timestamp - 1` to the time of the previous refresh according to the given
227    /// schedule.
228    /// Returns None if there is no previous refresh.
229    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
230}
231
232impl TimestampManipulation for Timestamp {
233    fn step_forward(&self) -> Self {
234        self.step_forward()
235    }
236
237    fn step_forward_by(&self, amount: &Self) -> Self {
238        self.step_forward_by(amount)
239    }
240
241    fn try_step_forward(&self) -> Option<Self> {
242        self.try_step_forward()
243    }
244
245    fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
246        self.try_step_forward_by(amount)
247    }
248
249    fn step_back(&self) -> Option<Self> {
250        self.step_back()
251    }
252
253    fn maximum() -> Self {
254        Self::MAX
255    }
256
257    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
258        schedule.round_up_timestamp(*self)
259    }
260
261    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
262        schedule.round_down_timestamp_m1(*self)
263    }
264}
265
266impl mz_persist_types::StepForward for Timestamp {
267    fn step_forward(&self) -> Self {
268        self.step_forward()
269    }
270}
271
272impl Timestamp {
273    pub const MAX: Self = Self { internal: u64::MAX };
274    pub const MIN: Self = Self { internal: u64::MIN };
275
276    pub const fn new(timestamp: u64) -> Self {
277        Self {
278            internal: timestamp,
279        }
280    }
281
282    pub fn to_bytes(&self) -> [u8; 8] {
283        self.internal.to_le_bytes()
284    }
285
286    pub fn from_bytes(bytes: [u8; 8]) -> Self {
287        Self {
288            internal: u64::from_le_bytes(bytes),
289        }
290    }
291
292    pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
293        Self {
294            internal: self.internal.saturating_sub(rhs.into().internal),
295        }
296    }
297
298    pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
299        Self {
300            internal: self.internal.saturating_add(rhs.into().internal),
301        }
302    }
303
304    pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
305        Self {
306            internal: self.internal.saturating_mul(rhs.into().internal),
307        }
308    }
309
310    pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
311        self.internal
312            .checked_add(rhs.into().internal)
313            .map(|internal| Self { internal })
314    }
315
316    pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
317        self.internal
318            .checked_sub(rhs.into().internal)
319            .map(|internal| Self { internal })
320    }
321
322    /// Advance a timestamp by the least amount possible such that
323    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
324    pub fn step_forward(&self) -> Self {
325        match self.checked_add(1) {
326            Some(ts) => ts,
327            None => panic!("could not step forward"),
328        }
329    }
330
331    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
332    pub fn step_forward_by(&self, amount: &Self) -> Self {
333        match self.checked_add(*amount) {
334            Some(ts) => ts,
335            None => panic!("could not step {self} forward by {amount}"),
336        }
337    }
338
339    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
340    /// is true. Return `None` if unable to do so.
341    pub fn try_step_forward(&self) -> Option<Self> {
342        self.checked_add(1)
343    }
344
345    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
346    pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
347        self.checked_add(*amount)
348    }
349
350    /// Retreat a timestamp by the least amount possible such that
351    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
352    /// which must only happen if the timestamp is `Timestamp::minimum()`.
353    pub fn step_back(&self) -> Option<Self> {
354        self.checked_sub(1)
355    }
356}
357
358impl From<u64> for Timestamp {
359    fn from(internal: u64) -> Self {
360        Self { internal }
361    }
362}
363
364impl From<Timestamp> for u64 {
365    fn from(ts: Timestamp) -> Self {
366        ts.internal
367    }
368}
369
370impl From<Timestamp> for u128 {
371    fn from(ts: Timestamp) -> Self {
372        u128::from(ts.internal)
373    }
374}
375
376impl TryFrom<Timestamp> for i64 {
377    type Error = TryFromIntError;
378
379    fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
380        value.internal.try_into()
381    }
382}
383
384impl From<&Timestamp> for u64 {
385    fn from(ts: &Timestamp) -> Self {
386        ts.internal
387    }
388}
389
390impl From<Timestamp> for Numeric {
391    fn from(ts: Timestamp) -> Self {
392        ts.internal.into()
393    }
394}
395
396impl From<Timestamp> for Duration {
397    fn from(ts: Timestamp) -> Self {
398        Duration::from_millis(ts.internal)
399    }
400}
401
402impl std::ops::Rem<Timestamp> for Timestamp {
403    type Output = Timestamp;
404
405    fn rem(self, rhs: Timestamp) -> Self::Output {
406        Self {
407            internal: self.internal % rhs.internal,
408        }
409    }
410}
411
412impl Serialize for Timestamp {
413    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
414    where
415        S: Serializer,
416    {
417        self.internal.serialize(serializer)
418    }
419}
420
421impl<'de> Deserialize<'de> for Timestamp {
422    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423    where
424        D: serde::Deserializer<'de>,
425    {
426        Ok(Self {
427            internal: u64::deserialize(deserializer)?,
428        })
429    }
430}
431
432impl timely::order::PartialOrder for Timestamp {
433    fn less_equal(&self, other: &Self) -> bool {
434        self.internal.less_equal(&other.internal)
435    }
436}
437
438impl timely::order::PartialOrder<&Timestamp> for Timestamp {
439    fn less_equal(&self, other: &&Self) -> bool {
440        self.internal.less_equal(&other.internal)
441    }
442}
443
444impl timely::order::PartialOrder<Timestamp> for &Timestamp {
445    fn less_equal(&self, other: &Timestamp) -> bool {
446        self.internal.less_equal(&other.internal)
447    }
448}
449
450impl timely::order::TotalOrder for Timestamp {}
451
452impl timely::progress::Timestamp for Timestamp {
453    type Summary = Timestamp;
454
455    fn minimum() -> Self {
456        Self::MIN
457    }
458}
459
460impl timely::progress::PathSummary<Timestamp> for Timestamp {
461    #[inline]
462    fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
463        self.internal
464            .checked_add(src.internal)
465            .map(|internal| Self { internal })
466    }
467    #[inline]
468    fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
469        self.internal
470            .checked_add(other.internal)
471            .map(|internal| Self { internal })
472    }
473}
474
475impl timely::progress::timestamp::Refines<()> for Timestamp {
476    fn to_inner(_: ()) -> Timestamp {
477        Default::default()
478    }
479    fn to_outer(self) -> () {
480        ()
481    }
482    fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
483        ()
484    }
485}
486
487impl differential_dataflow::lattice::Lattice for Timestamp {
488    #[inline]
489    fn join(&self, other: &Self) -> Self {
490        ::std::cmp::max(*self, *other)
491    }
492    #[inline]
493    fn meet(&self, other: &Self) -> Self {
494        ::std::cmp::min(*self, *other)
495    }
496}
497
498impl mz_persist_types::Codec64 for Timestamp {
499    fn codec_name() -> String {
500        u64::codec_name()
501    }
502
503    fn encode(&self) -> [u8; 8] {
504        self.internal.encode()
505    }
506
507    fn decode(buf: [u8; 8]) -> Self {
508        Self {
509            internal: u64::decode(buf),
510        }
511    }
512}
513
514impl std::fmt::Display for Timestamp {
515    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
516        std::fmt::Display::fmt(&self.internal, f)
517    }
518}
519
520impl std::fmt::Debug for Timestamp {
521    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522        std::fmt::Debug::fmt(&self.internal, f)
523    }
524}
525
526impl std::str::FromStr for Timestamp {
527    type Err = String;
528
529    fn from_str(s: &str) -> Result<Self, Self::Err> {
530        Ok(Self {
531            internal: s
532                .parse::<u64>()
533                .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
534                .or_else(|err_num_of_millis| {
535                    parse_timestamptz(s)
536                        .map_err(|parse_error| {
537                            format!(
538                                "{}; could not parse as date and time: {}",
539                                err_num_of_millis, parse_error
540                            )
541                        })?
542                        .timestamp_millis()
543                        .try_into()
544                        .map_err(|_| "out of range for mz_timestamp".to_string())
545                })
546                .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
547        })
548    }
549}
550
551impl TryFrom<Duration> for Timestamp {
552    type Error = TryFromIntError;
553
554    fn try_from(value: Duration) -> Result<Self, Self::Error> {
555        Ok(Self {
556            internal: value.as_millis().try_into()?,
557        })
558    }
559}
560
561impl TryFrom<u128> for Timestamp {
562    type Error = TryFromIntError;
563
564    fn try_from(value: u128) -> Result<Self, Self::Error> {
565        Ok(Self {
566            internal: value.try_into()?,
567        })
568    }
569}
570
571impl TryFrom<i64> for Timestamp {
572    type Error = TryFromIntError;
573
574    fn try_from(value: i64) -> Result<Self, Self::Error> {
575        Ok(Self {
576            internal: value.try_into()?,
577        })
578    }
579}
580
581impl TryFrom<Numeric> for Timestamp {
582    type Error = TryFromDecimalError;
583
584    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
585        Ok(Self {
586            internal: value.try_into()?,
587        })
588    }
589}
590
591impl columnation::Columnation for Timestamp {
592    type InnerRegion = columnation::CopyRegion<Timestamp>;
593}