mz_repr/
timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26/// System-wide timestamp type.
27#[derive(
28    Clone,
29    // TODO: De-implement Copy, which is widely used.
30    Copy,
31    PartialEq,
32    Eq,
33    PartialOrd,
34    Ord,
35    Hash,
36    Default,
37    Arbitrary,
38    bytemuck::AnyBitPattern,
39    bytemuck::NoUninit,
40)]
41#[repr(transparent)]
42pub struct Timestamp {
43    /// note no `pub`.
44    internal: u64,
45}
46
47impl PartialEq<&Timestamp> for Timestamp {
48    fn eq(&self, other: &&Timestamp) -> bool {
49        self.eq(*other)
50    }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54    fn eq(&self, other: &Timestamp) -> bool {
55        self.internal.eq(&other.internal)
56    }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60    fn into_proto(&self) -> ProtoTimestamp {
61        ProtoTimestamp {
62            internal: self.into(),
63        }
64    }
65
66    fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67        Ok(Timestamp::new(proto.internal))
68    }
69}
70
71mod columnar_timestamp {
72    use crate::Timestamp;
73    use columnar::Columnar;
74    use mz_ore::cast::CastFrom;
75    use std::ops::Range;
76
77    /// A newtype wrapper for a vector of `Timestamp` values.
78    #[derive(Clone, Copy, Default, Debug)]
79    pub struct Timestamps<T>(T);
80    impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
81        #[inline(always)]
82        fn push(&mut self, item: D) {
83            self.0.push(item)
84        }
85    }
86    impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
87        #[inline(always)]
88        fn clear(&mut self) {
89            self.0.clear()
90        }
91    }
92    impl<T: columnar::Len> columnar::Len for Timestamps<T> {
93        #[inline(always)]
94        fn len(&self) -> usize {
95            self.0.len()
96        }
97    }
98    impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
99        type Ref = Timestamp;
100
101        #[inline(always)]
102        fn get(&self, index: usize) -> Self::Ref {
103            self.0[index]
104        }
105    }
106
107    impl Columnar for Timestamp {
108        #[inline(always)]
109        fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
110            other
111        }
112        type Container = Timestamps<Vec<Timestamp>>;
113        #[inline(always)]
114        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
115        where
116            Self: 'a,
117        {
118            thing
119        }
120    }
121
122    impl columnar::Borrow for Timestamps<Vec<Timestamp>> {
123        type Ref<'a> = Timestamp;
124        type Borrowed<'a>
125            = Timestamps<&'a [Timestamp]>
126        where
127            Self: 'a;
128        #[inline(always)]
129        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
130            Timestamps(self.0.as_slice())
131        }
132        #[inline(always)]
133        fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
134        where
135            Self: 'a,
136        {
137            Timestamps(item.0)
138        }
139
140        #[inline(always)]
141        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
142        where
143            Self: 'a,
144        {
145            item
146        }
147    }
148
149    impl columnar::Container for Timestamps<Vec<Timestamp>> {
150        #[inline(always)]
151        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
152            self.0.extend_from_self(other.0, range)
153        }
154        #[inline(always)]
155        fn reserve_for<'a, I>(&mut self, selves: I)
156        where
157            Self: 'a,
158            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
159        {
160            self.0.reserve_for(selves.map(|s| s.0));
161        }
162    }
163
164    impl columnar::HeapSize for Timestamp {}
165
166    impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
167        #[inline(always)]
168        fn heap_size(&self) -> (usize, usize) {
169            self.0.heap_size()
170        }
171    }
172
173    impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
174        #[inline(always)]
175        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
176            std::iter::once((
177                u64::cast_from(align_of::<Timestamp>()),
178                bytemuck::cast_slice(self.0),
179            ))
180        }
181    }
182    impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
183        #[inline(always)]
184        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
185            Timestamps(bytemuck::cast_slice(
186                bytes.next().expect("Iterator exhausted prematurely"),
187            ))
188        }
189    }
190}
191
192impl BucketTimestamp for Timestamp {
193    fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
194        let rhs = 1_u64.checked_shl(exponent)?;
195        Some(self.internal.checked_add(rhs)?.into())
196    }
197}
198
199pub trait TimestampManipulation:
200    timely::progress::Timestamp
201    + timely::order::TotalOrder
202    + differential_dataflow::lattice::Lattice
203    + std::fmt::Debug
204    + mz_persist_types::StepForward
205    + Sync
206{
207    /// Advance a timestamp by the least amount possible such that
208    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
209    fn step_forward(&self) -> Self;
210
211    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
212    fn step_forward_by(&self, amount: &Self) -> Self;
213
214    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
215    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
216
217    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
218    /// is true. Return `None` if unable to do so.
219    fn try_step_forward(&self) -> Option<Self>;
220
221    /// Retreat a timestamp by the least amount possible such that
222    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
223    /// which must only happen if the timestamp is `Timestamp::minimum()`.
224    fn step_back(&self) -> Option<Self>;
225
226    /// Return the maximum value for this timestamp.
227    fn maximum() -> Self;
228
229    /// Rounds up the timestamp to the time of the next refresh according to the given schedule.
230    /// Returns None if there is no next refresh.
231    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
232
233    /// Rounds down `timestamp - 1` to the time of the previous refresh according to the given
234    /// schedule.
235    /// Returns None if there is no previous refresh.
236    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
237}
238
239impl TimestampManipulation for Timestamp {
240    fn step_forward(&self) -> Self {
241        self.step_forward()
242    }
243
244    fn step_forward_by(&self, amount: &Self) -> Self {
245        self.step_forward_by(amount)
246    }
247
248    fn try_step_forward(&self) -> Option<Self> {
249        self.try_step_forward()
250    }
251
252    fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
253        self.try_step_forward_by(amount)
254    }
255
256    fn step_back(&self) -> Option<Self> {
257        self.step_back()
258    }
259
260    fn maximum() -> Self {
261        Self::MAX
262    }
263
264    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
265        schedule.round_up_timestamp(*self)
266    }
267
268    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
269        schedule.round_down_timestamp_m1(*self)
270    }
271}
272
273impl mz_persist_types::StepForward for Timestamp {
274    fn step_forward(&self) -> Self {
275        self.step_forward()
276    }
277}
278
279impl Timestamp {
280    pub const MAX: Self = Self { internal: u64::MAX };
281    pub const MIN: Self = Self { internal: u64::MIN };
282
283    pub const fn new(timestamp: u64) -> Self {
284        Self {
285            internal: timestamp,
286        }
287    }
288
289    pub fn to_bytes(&self) -> [u8; 8] {
290        self.internal.to_le_bytes()
291    }
292
293    pub fn from_bytes(bytes: [u8; 8]) -> Self {
294        Self {
295            internal: u64::from_le_bytes(bytes),
296        }
297    }
298
299    pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
300        Self {
301            internal: self.internal.saturating_sub(rhs.into().internal),
302        }
303    }
304
305    pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
306        Self {
307            internal: self.internal.saturating_add(rhs.into().internal),
308        }
309    }
310
311    pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
312        Self {
313            internal: self.internal.saturating_mul(rhs.into().internal),
314        }
315    }
316
317    pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
318        self.internal
319            .checked_add(rhs.into().internal)
320            .map(|internal| Self { internal })
321    }
322
323    pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
324        self.internal
325            .checked_sub(rhs.into().internal)
326            .map(|internal| Self { internal })
327    }
328
329    /// Advance a timestamp by the least amount possible such that
330    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
331    pub fn step_forward(&self) -> Self {
332        match self.checked_add(1) {
333            Some(ts) => ts,
334            None => panic!("could not step forward"),
335        }
336    }
337
338    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
339    pub fn step_forward_by(&self, amount: &Self) -> Self {
340        match self.checked_add(*amount) {
341            Some(ts) => ts,
342            None => panic!("could not step {self} forward by {amount}"),
343        }
344    }
345
346    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
347    /// is true. Return `None` if unable to do so.
348    pub fn try_step_forward(&self) -> Option<Self> {
349        self.checked_add(1)
350    }
351
352    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
353    pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
354        self.checked_add(*amount)
355    }
356
357    /// Retreat a timestamp by the least amount possible such that
358    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
359    /// which must only happen if the timestamp is `Timestamp::minimum()`.
360    pub fn step_back(&self) -> Option<Self> {
361        self.checked_sub(1)
362    }
363}
364
365impl From<u64> for Timestamp {
366    fn from(internal: u64) -> Self {
367        Self { internal }
368    }
369}
370
371impl From<Timestamp> for u64 {
372    fn from(ts: Timestamp) -> Self {
373        ts.internal
374    }
375}
376
377impl From<Timestamp> for u128 {
378    fn from(ts: Timestamp) -> Self {
379        u128::from(ts.internal)
380    }
381}
382
383impl TryFrom<Timestamp> for i64 {
384    type Error = TryFromIntError;
385
386    fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
387        value.internal.try_into()
388    }
389}
390
391impl From<&Timestamp> for u64 {
392    fn from(ts: &Timestamp) -> Self {
393        ts.internal
394    }
395}
396
397impl From<Timestamp> for Numeric {
398    fn from(ts: Timestamp) -> Self {
399        ts.internal.into()
400    }
401}
402
403impl From<Timestamp> for Duration {
404    fn from(ts: Timestamp) -> Self {
405        Duration::from_millis(ts.internal)
406    }
407}
408
409impl std::ops::Rem<Timestamp> for Timestamp {
410    type Output = Timestamp;
411
412    fn rem(self, rhs: Timestamp) -> Self::Output {
413        Self {
414            internal: self.internal % rhs.internal,
415        }
416    }
417}
418
419impl Serialize for Timestamp {
420    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
421    where
422        S: Serializer,
423    {
424        self.internal.serialize(serializer)
425    }
426}
427
428impl<'de> Deserialize<'de> for Timestamp {
429    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
430    where
431        D: serde::Deserializer<'de>,
432    {
433        Ok(Self {
434            internal: u64::deserialize(deserializer)?,
435        })
436    }
437}
438
439impl timely::order::PartialOrder for Timestamp {
440    fn less_equal(&self, other: &Self) -> bool {
441        self.internal.less_equal(&other.internal)
442    }
443}
444
445impl timely::order::PartialOrder<&Timestamp> for Timestamp {
446    fn less_equal(&self, other: &&Self) -> bool {
447        self.internal.less_equal(&other.internal)
448    }
449}
450
451impl timely::order::PartialOrder<Timestamp> for &Timestamp {
452    fn less_equal(&self, other: &Timestamp) -> bool {
453        self.internal.less_equal(&other.internal)
454    }
455}
456
457impl timely::order::TotalOrder for Timestamp {}
458
459impl timely::progress::Timestamp for Timestamp {
460    type Summary = Timestamp;
461
462    fn minimum() -> Self {
463        Self::MIN
464    }
465}
466
467impl timely::progress::PathSummary<Timestamp> for Timestamp {
468    #[inline]
469    fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
470        self.internal
471            .checked_add(src.internal)
472            .map(|internal| Self { internal })
473    }
474    #[inline]
475    fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
476        self.internal
477            .checked_add(other.internal)
478            .map(|internal| Self { internal })
479    }
480}
481
482impl timely::progress::timestamp::Refines<()> for Timestamp {
483    fn to_inner(_: ()) -> Timestamp {
484        Default::default()
485    }
486    fn to_outer(self) -> () {
487        ()
488    }
489    fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
490        ()
491    }
492}
493
494impl differential_dataflow::lattice::Lattice for Timestamp {
495    #[inline]
496    fn join(&self, other: &Self) -> Self {
497        ::std::cmp::max(*self, *other)
498    }
499    #[inline]
500    fn meet(&self, other: &Self) -> Self {
501        ::std::cmp::min(*self, *other)
502    }
503}
504
505impl mz_persist_types::Codec64 for Timestamp {
506    fn codec_name() -> String {
507        u64::codec_name()
508    }
509
510    fn encode(&self) -> [u8; 8] {
511        self.internal.encode()
512    }
513
514    fn decode(buf: [u8; 8]) -> Self {
515        Self {
516            internal: u64::decode(buf),
517        }
518    }
519}
520
521impl std::fmt::Display for Timestamp {
522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523        std::fmt::Display::fmt(&self.internal, f)
524    }
525}
526
527impl std::fmt::Debug for Timestamp {
528    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529        std::fmt::Debug::fmt(&self.internal, f)
530    }
531}
532
533impl std::str::FromStr for Timestamp {
534    type Err = String;
535
536    fn from_str(s: &str) -> Result<Self, Self::Err> {
537        Ok(Self {
538            internal: s
539                .parse::<u64>()
540                .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
541                .or_else(|err_num_of_millis| {
542                    parse_timestamptz(s)
543                        .map_err(|parse_error| {
544                            format!(
545                                "{}; could not parse as date and time: {}",
546                                err_num_of_millis, parse_error
547                            )
548                        })?
549                        .timestamp_millis()
550                        .try_into()
551                        .map_err(|_| "out of range for mz_timestamp".to_string())
552                })
553                .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
554        })
555    }
556}
557
558impl TryFrom<Duration> for Timestamp {
559    type Error = TryFromIntError;
560
561    fn try_from(value: Duration) -> Result<Self, Self::Error> {
562        Ok(Self {
563            internal: value.as_millis().try_into()?,
564        })
565    }
566}
567
568impl TryFrom<u128> for Timestamp {
569    type Error = TryFromIntError;
570
571    fn try_from(value: u128) -> Result<Self, Self::Error> {
572        Ok(Self {
573            internal: value.try_into()?,
574        })
575    }
576}
577
578impl TryFrom<i64> for Timestamp {
579    type Error = TryFromIntError;
580
581    fn try_from(value: i64) -> Result<Self, Self::Error> {
582        Ok(Self {
583            internal: value.try_into()?,
584        })
585    }
586}
587
588impl TryFrom<Numeric> for Timestamp {
589    type Error = TryFromDecimalError;
590
591    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
592        Ok(Self {
593            internal: value.try_into()?,
594        })
595    }
596}
597
598impl columnation::Columnation for Timestamp {
599    type InnerRegion = columnation::CopyRegion<Timestamp>;
600}