Skip to main content

mz_repr/
timestamp.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17#[cfg(any(test, feature = "proptest"))]
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize, Serializer};
20
21use crate::adt::numeric::Numeric;
22use crate::refresh_schedule::RefreshSchedule;
23use crate::strconv::parse_timestamptz;
24
25include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
26
/// System-wide timestamp type.
///
/// A thin wrapper around a single `u64`; `#[repr(transparent)]` gives it the
/// same layout as that `u64`. The conversions in this file to and from
/// `Duration` and strings treat the value as a number of milliseconds.
#[derive(
    Clone,
    // TODO: De-implement Copy, which is widely used.
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Default,
    bytemuck::AnyBitPattern,
    bytemuck::NoUninit,
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
#[repr(transparent)]
pub struct Timestamp {
    /// The raw timestamp value. Deliberately not `pub`: code outside this
    /// module has to go through the constructors and conversions.
    internal: u64,
}
47
48impl PartialEq<&Timestamp> for Timestamp {
49    fn eq(&self, other: &&Timestamp) -> bool {
50        self.eq(*other)
51    }
52}
53
54impl PartialEq<Timestamp> for &Timestamp {
55    fn eq(&self, other: &Timestamp) -> bool {
56        self.internal.eq(&other.internal)
57    }
58}
59
60impl RustType<ProtoTimestamp> for Timestamp {
61    fn into_proto(&self) -> ProtoTimestamp {
62        ProtoTimestamp {
63            internal: self.into(),
64        }
65    }
66
67    fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
68        Ok(Timestamp::new(proto.internal))
69    }
70}
71
// Glue that lets `Timestamp` be stored in `columnar` containers. A
// `Timestamp` column is kept as a plain contiguous run of `Timestamp` values
// and is exposed to `columnar` as raw bytes via `bytemuck`.
mod columnar_timestamp {
    use crate::Timestamp;
    use columnar::Columnar;
    use mz_ore::cast::CastFrom;
    use std::ops::Range;

    /// A newtype wrapper around the storage for a column of `Timestamp`
    /// values. In this module `T` is either `Vec<Timestamp>` (owned) or
    /// `&[Timestamp]` (borrowed).
    #[derive(Clone, Copy, Default, Debug)]
    pub struct Timestamps<T>(T);
    // `Push`, `Clear` and `Len` simply forward to the wrapped storage.
    impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
        #[inline(always)]
        fn push(&mut self, item: D) {
            self.0.push(item)
        }
    }
    impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
        #[inline(always)]
        fn clear(&mut self) {
            self.0.clear()
        }
    }
    impl<T: columnar::Len> columnar::Len for Timestamps<T> {
        #[inline(always)]
        fn len(&self) -> usize {
            self.0.len()
        }
    }
    // Indexing a borrowed column yields the `Timestamp` by value (it is
    // `Copy`). Slice indexing panics if `index` is out of bounds.
    impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
        type Ref = Timestamp;

        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            self.0[index]
        }
    }

    impl Columnar for Timestamp {
        // The reference type for this container is `Timestamp` itself (see
        // `Borrow::Ref` below), so the owned value is the reference.
        #[inline(always)]
        fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
            other
        }
        type Container = Timestamps<Vec<Timestamp>>;
        // Nothing to shorten: the reference is a plain value.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
        where
            Self: 'a,
        {
            thing
        }
    }

    impl columnar::Borrow for Timestamps<Vec<Timestamp>> {
        type Ref<'a> = Timestamp;
        type Borrowed<'a>
            = Timestamps<&'a [Timestamp]>
        where
            Self: 'a;
        // Borrowing the owned column is just viewing the `Vec` as a slice.
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            Timestamps(self.0.as_slice())
        }
        // Re-wraps the same slice under the shorter lifetime `'b`.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
        where
            Self: 'a,
        {
            Timestamps(item.0)
        }

        // `Ref` is a plain `Timestamp`, so it is returned unchanged.
        #[inline(always)]
        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            item
        }
    }

    // Bulk operations forward to the wrapped `Vec<Timestamp>`.
    impl columnar::Container for Timestamps<Vec<Timestamp>> {
        #[inline(always)]
        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
            self.0.extend_from_self(other.0, range)
        }
        #[inline(always)]
        fn reserve_for<'a, I>(&mut self, selves: I)
        where
            Self: 'a,
            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
        {
            self.0.reserve_for(selves.map(|s| s.0));
        }
    }

    // A borrowed column serializes as exactly one byte slice: the
    // `Timestamp` slice reinterpreted as bytes, paired with the alignment of
    // `Timestamp`.
    impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
        const SLICE_COUNT: usize = 1;
        #[inline(always)]
        fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
            // There is only one slice; `index` is checked in debug builds only.
            debug_assert!(index < Self::SLICE_COUNT);
            (
                u64::cast_from(align_of::<Timestamp>()),
                bytemuck::cast_slice(self.0),
            )
        }
        // Yields the same single `(alignment, bytes)` pair as `get_byte_slice`.
        #[inline(always)]
        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
            std::iter::once((
                u64::cast_from(align_of::<Timestamp>()),
                bytemuck::cast_slice(self.0),
            ))
        }
    }
    // The inverse of `AsBytes`: consumes one byte slice from the iterator and
    // reinterprets it as a `Timestamp` slice.
    impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
        const SLICE_COUNT: usize = 1;
        // Panics if the iterator yields no slice. NOTE(review):
        // `bytemuck::cast_slice` is expected to panic on a misaligned or
        // wrongly sized slice; confirm against the bytemuck documentation.
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            Timestamps(bytemuck::cast_slice(
                bytes.next().expect("Iterator exhausted prematurely"),
            ))
        }
    }
}
193
194impl BucketTimestamp for Timestamp {
195    fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
196        let rhs = 1_u64.checked_shl(exponent)?;
197        Some(self.internal.checked_add(rhs)?.into())
198    }
199}
200
/// Stepping and rounding operations required of a timestamp type, on top of
/// the timely, differential and persist traits listed as supertraits.
pub trait TimestampManipulation:
    timely::progress::Timestamp
    + timely::order::TotalOrder
    + differential_dataflow::lattice::Lattice
    + std::fmt::Debug
    + mz_persist_types::StepForward
    + Sync
{
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
    fn step_forward(&self) -> Self;

    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
    fn step_forward_by(&self, amount: &Self) -> Self;

    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;

    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
    /// is true. Return `None` if unable to do so.
    fn try_step_forward(&self) -> Option<Self>;

    /// Retreat a timestamp by the least amount possible such that
    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
    /// which must only happen if the timestamp is `Timestamp::minimum()`.
    fn step_back(&self) -> Option<Self>;

    /// Return the maximum value for this timestamp.
    fn maximum() -> Self;

    /// Rounds up the timestamp to the time of the next refresh according to the given schedule.
    /// Returns `None` if there is no next refresh.
    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;

    /// Rounds down `timestamp - 1` to the time of the previous refresh according to the given
    /// schedule.
    /// Returns `None` if there is no previous refresh.
    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
}
240
241impl TimestampManipulation for Timestamp {
242    fn step_forward(&self) -> Self {
243        self.step_forward()
244    }
245
246    fn step_forward_by(&self, amount: &Self) -> Self {
247        self.step_forward_by(amount)
248    }
249
250    fn try_step_forward(&self) -> Option<Self> {
251        self.try_step_forward()
252    }
253
254    fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
255        self.try_step_forward_by(amount)
256    }
257
258    fn step_back(&self) -> Option<Self> {
259        self.step_back()
260    }
261
262    fn maximum() -> Self {
263        Self::MAX
264    }
265
266    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
267        schedule.round_up_timestamp(*self)
268    }
269
270    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
271        schedule.round_down_timestamp_m1(*self)
272    }
273}
274
275impl mz_persist_types::StepForward for Timestamp {
276    fn step_forward(&self) -> Self {
277        self.step_forward()
278    }
279}
280
281impl Timestamp {
282    pub const MAX: Self = Self { internal: u64::MAX };
283    pub const MIN: Self = Self { internal: u64::MIN };
284
285    pub const fn new(timestamp: u64) -> Self {
286        Self {
287            internal: timestamp,
288        }
289    }
290
291    pub fn to_bytes(&self) -> [u8; 8] {
292        self.internal.to_le_bytes()
293    }
294
295    pub fn from_bytes(bytes: [u8; 8]) -> Self {
296        Self {
297            internal: u64::from_le_bytes(bytes),
298        }
299    }
300
301    pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
302        Self {
303            internal: self.internal.saturating_sub(rhs.into().internal),
304        }
305    }
306
307    pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
308        Self {
309            internal: self.internal.saturating_add(rhs.into().internal),
310        }
311    }
312
313    pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
314        Self {
315            internal: self.internal.saturating_mul(rhs.into().internal),
316        }
317    }
318
319    pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
320        self.internal
321            .checked_add(rhs.into().internal)
322            .map(|internal| Self { internal })
323    }
324
325    pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
326        self.internal
327            .checked_sub(rhs.into().internal)
328            .map(|internal| Self { internal })
329    }
330
331    /// Advance a timestamp by the least amount possible such that
332    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
333    pub fn step_forward(&self) -> Self {
334        match self.checked_add(1) {
335            Some(ts) => ts,
336            None => panic!("could not step forward"),
337        }
338    }
339
340    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
341    pub fn step_forward_by(&self, amount: &Self) -> Self {
342        match self.checked_add(*amount) {
343            Some(ts) => ts,
344            None => panic!("could not step {self} forward by {amount}"),
345        }
346    }
347
348    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
349    /// is true. Return `None` if unable to do so.
350    pub fn try_step_forward(&self) -> Option<Self> {
351        self.checked_add(1)
352    }
353
354    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
355    pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
356        self.checked_add(*amount)
357    }
358
359    /// Retreat a timestamp by the least amount possible such that
360    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
361    /// which must only happen if the timestamp is `Timestamp::minimum()`.
362    pub fn step_back(&self) -> Option<Self> {
363        self.checked_sub(1)
364    }
365}
366
367impl From<u64> for Timestamp {
368    fn from(internal: u64) -> Self {
369        Self { internal }
370    }
371}
372
373impl From<Timestamp> for u64 {
374    fn from(ts: Timestamp) -> Self {
375        ts.internal
376    }
377}
378
379impl From<Timestamp> for u128 {
380    fn from(ts: Timestamp) -> Self {
381        u128::from(ts.internal)
382    }
383}
384
385impl TryFrom<Timestamp> for i64 {
386    type Error = TryFromIntError;
387
388    fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
389        value.internal.try_into()
390    }
391}
392
393impl From<&Timestamp> for u64 {
394    fn from(ts: &Timestamp) -> Self {
395        ts.internal
396    }
397}
398
399impl From<Timestamp> for Numeric {
400    fn from(ts: Timestamp) -> Self {
401        ts.internal.into()
402    }
403}
404
405impl From<Timestamp> for Duration {
406    fn from(ts: Timestamp) -> Self {
407        Duration::from_millis(ts.internal)
408    }
409}
410
411impl std::ops::Rem<Timestamp> for Timestamp {
412    type Output = Timestamp;
413
414    fn rem(self, rhs: Timestamp) -> Self::Output {
415        Self {
416            internal: self.internal % rhs.internal,
417        }
418    }
419}
420
421impl Serialize for Timestamp {
422    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
423    where
424        S: Serializer,
425    {
426        self.internal.serialize(serializer)
427    }
428}
429
430impl<'de> Deserialize<'de> for Timestamp {
431    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
432    where
433        D: serde::Deserializer<'de>,
434    {
435        Ok(Self {
436            internal: u64::deserialize(deserializer)?,
437        })
438    }
439}
440
441impl timely::order::PartialOrder for Timestamp {
442    fn less_equal(&self, other: &Self) -> bool {
443        self.internal.less_equal(&other.internal)
444    }
445}
446
447impl timely::order::PartialOrder<&Timestamp> for Timestamp {
448    fn less_equal(&self, other: &&Self) -> bool {
449        self.internal.less_equal(&other.internal)
450    }
451}
452
453impl timely::order::PartialOrder<Timestamp> for &Timestamp {
454    fn less_equal(&self, other: &Timestamp) -> bool {
455        self.internal.less_equal(&other.internal)
456    }
457}
458
// Marker impl with no methods. `Timestamp` orders by its single `u64` field
// (see the derived `Ord` and the `PartialOrder` impl in this file).
impl timely::order::TotalOrder for Timestamp {}
460
461impl timely::progress::Timestamp for Timestamp {
462    type Summary = Timestamp;
463
464    fn minimum() -> Self {
465        Self::MIN
466    }
467}
468
469impl timely::progress::PathSummary<Timestamp> for Timestamp {
470    #[inline]
471    fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
472        self.internal
473            .checked_add(src.internal)
474            .map(|internal| Self { internal })
475    }
476    #[inline]
477    fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
478        self.internal
479            .checked_add(other.internal)
480            .map(|internal| Self { internal })
481    }
482}
483
484impl timely::progress::timestamp::Refines<()> for Timestamp {
485    fn to_inner(_: ()) -> Timestamp {
486        Default::default()
487    }
488    fn to_outer(self) -> () {
489        ()
490    }
491    fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
492        ()
493    }
494}
495
496impl differential_dataflow::lattice::Lattice for Timestamp {
497    #[inline]
498    fn join(&self, other: &Self) -> Self {
499        ::std::cmp::max(*self, *other)
500    }
501    #[inline]
502    fn meet(&self, other: &Self) -> Self {
503        ::std::cmp::min(*self, *other)
504    }
505}
506
507impl mz_persist_types::Codec64 for Timestamp {
508    fn codec_name() -> String {
509        u64::codec_name()
510    }
511
512    fn encode(&self) -> [u8; 8] {
513        self.internal.encode()
514    }
515
516    fn decode(buf: [u8; 8]) -> Self {
517        Self {
518            internal: u64::decode(buf),
519        }
520    }
521}
522
523impl std::fmt::Display for Timestamp {
524    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525        std::fmt::Display::fmt(&self.internal, f)
526    }
527}
528
529impl std::fmt::Debug for Timestamp {
530    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
531        std::fmt::Debug::fmt(&self.internal, f)
532    }
533}
534
535impl std::str::FromStr for Timestamp {
536    type Err = String;
537
538    fn from_str(s: &str) -> Result<Self, Self::Err> {
539        Ok(Self {
540            internal: s
541                .parse::<u64>()
542                .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
543                .or_else(|err_num_of_millis| {
544                    parse_timestamptz(s)
545                        .map_err(|parse_error| {
546                            format!(
547                                "{}; could not parse as date and time: {}",
548                                err_num_of_millis, parse_error
549                            )
550                        })?
551                        .timestamp_millis()
552                        .try_into()
553                        .map_err(|_| "out of range for mz_timestamp".to_string())
554                })
555                .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
556        })
557    }
558}
559
560impl TryFrom<Duration> for Timestamp {
561    type Error = TryFromIntError;
562
563    fn try_from(value: Duration) -> Result<Self, Self::Error> {
564        Ok(Self {
565            internal: value.as_millis().try_into()?,
566        })
567    }
568}
569
570impl TryFrom<u128> for Timestamp {
571    type Error = TryFromIntError;
572
573    fn try_from(value: u128) -> Result<Self, Self::Error> {
574        Ok(Self {
575            internal: value.try_into()?,
576        })
577    }
578}
579
580impl TryFrom<i64> for Timestamp {
581    type Error = TryFromIntError;
582
583    fn try_from(value: i64) -> Result<Self, Self::Error> {
584        Ok(Self {
585            internal: value.try_into()?,
586        })
587    }
588}
589
590impl TryFrom<Numeric> for Timestamp {
591    type Error = TryFromDecimalError;
592
593    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
594        Ok(Self {
595            internal: value.try_into()?,
596        })
597    }
598}
599
// Columnation support. `Timestamp` derives `Copy`, and this impl selects
// `columnation::CopyRegion` as its region type.
impl columnation::Columnation for Timestamp {
    type InnerRegion = columnation::CopyRegion<Timestamp>;
}