mz_repr/
timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
/// System-wide timestamp type.
///
/// A thin wrapper around a single `u64`. The `Duration` conversions and the
/// `FromStr` implementation in this file interpret that value as a number of
/// milliseconds.
#[derive(
    Clone,
    // TODO: De-implement Copy, which is widely used.
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Default,
    Arbitrary,
    bytemuck::AnyBitPattern,
    bytemuck::NoUninit,
)]
#[repr(transparent)]
pub struct Timestamp {
    /// The raw timestamp value. Intentionally not `pub`: code outside this
    /// module must go through the constructors and conversions defined here.
    internal: u64,
}
46
47impl PartialEq<&Timestamp> for Timestamp {
48    fn eq(&self, other: &&Timestamp) -> bool {
49        self.eq(*other)
50    }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54    fn eq(&self, other: &Timestamp) -> bool {
55        self.internal.eq(&other.internal)
56    }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60    fn into_proto(&self) -> ProtoTimestamp {
61        ProtoTimestamp {
62            internal: self.into(),
63        }
64    }
65
66    fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67        Ok(Timestamp::new(proto.internal))
68    }
69}
70
// `columnar` integration for `Timestamp`: a wrapper container type plus the
// trait implementations that let timestamps be stored in, borrowed from, and
// (de)serialized as a columnar container.
mod columnar_timestamp {
    use crate::Timestamp;
    use columnar::Columnar;
    use mz_ore::cast::CastFrom;

    /// A newtype wrapper for a vector of `Timestamp` values.
    ///
    /// `T` is the backing storage; the impls below use `Vec<Timestamp>` for the
    /// owned form and `&[Timestamp]` for the borrowed form.
    #[derive(Clone, Copy, Default, Debug)]
    pub struct Timestamps<T>(T);
    // Pushing forwards to the wrapped storage.
    impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
        #[inline(always)]
        fn push(&mut self, item: D) {
            self.0.push(item)
        }
    }
    // Clearing forwards to the wrapped storage.
    impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
        #[inline(always)]
        fn clear(&mut self) {
            self.0.clear()
        }
    }
    // The length is that of the wrapped storage.
    impl<T: columnar::Len> columnar::Len for Timestamps<T> {
        #[inline(always)]
        fn len(&self) -> usize {
            self.0.len()
        }
    }
    // Indexing a borrowed container yields the `Timestamp` by value.
    impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
        type Ref = Timestamp;

        /// Returns the element at `index`; slice indexing panics if `index` is
        /// out of bounds.
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            self.0[index]
        }
    }

    // The "reference" form of a `Timestamp` is the `Timestamp` itself, so the
    // conversions below simply hand their argument back.
    impl Columnar for Timestamp {
        type Ref<'a> = Timestamp;
        #[inline(always)]
        fn into_owned<'a>(other: Self::Ref<'a>) -> Self {
            other
        }
        type Container = Timestamps<Vec<Timestamp>>;
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            thing
        }
    }

    // The owned container borrows as the same wrapper around a slice.
    impl columnar::Container<Timestamp> for Timestamps<Vec<Timestamp>> {
        type Borrowed<'a>
            = Timestamps<&'a [Timestamp]>
        where
            Self: 'a;
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            Timestamps(self.0.as_slice())
        }
        // Re-wraps the same slice under the shorter lifetime `'b`.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
        where
            Self: 'a,
        {
            Timestamps(item.0)
        }
    }

    // Empty impl: relies entirely on the trait's default methods.
    // NOTE(review): presumably the default reports no heap usage — confirm
    // against the `columnar` documentation.
    impl columnar::HeapSize for Timestamp {}

    // The wrapper's heap size is whatever the wrapped storage reports.
    impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
        #[inline(always)]
        fn heap_size(&self) -> (usize, usize) {
            self.0.heap_size()
        }
    }

    impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
        /// Yields exactly one `(alignment, bytes)` pair: the alignment of
        /// `Timestamp` and the slice reinterpreted as raw bytes.
        #[inline(always)]
        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
            std::iter::once((
                u64::cast_from(align_of::<Timestamp>()),
                bytemuck::cast_slice(self.0),
            ))
        }
    }
    impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
        /// Consumes one byte slice from `bytes` and reinterprets it as a slice
        /// of `Timestamp`s.
        ///
        /// Panics with "Iterator exhausted prematurely" if `bytes` yields
        /// nothing.
        // NOTE(review): `bytemuck::cast_slice` is also expected to panic on
        // input it cannot reinterpret (bad length or alignment) — confirm
        // against the bytemuck documentation.
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            Timestamps(bytemuck::cast_slice(
                bytes.next().expect("Iterator exhausted prematurely"),
            ))
        }
    }
}
167
168impl BucketTimestamp for Timestamp {
169    fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
170        let rhs = 1_u64.checked_shl(exponent)?;
171        Some(self.internal.checked_add(rhs)?.into())
172    }
173}
174
/// Stepping and rounding operations on timestamps.
///
/// Implementors must also satisfy the timely/differential timestamp traits
/// listed as supertraits, so generic code can both order timestamps and move
/// them forwards or backwards.
pub trait TimestampManipulation:
    timely::progress::Timestamp
    + timely::order::TotalOrder
    + differential_dataflow::lattice::Lattice
    + std::fmt::Debug
    + mz_persist_types::StepForward
    + Sync
{
    /// Advance a timestamp by the least amount possible such that
    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
    fn step_forward(&self) -> Self;

    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
    fn step_forward_by(&self, amount: &Self) -> Self;

    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;

    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
    /// is true. Return `None` if unable to do so.
    fn try_step_forward(&self) -> Option<Self>;

    /// Retreat a timestamp by the least amount possible such that
    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
    /// which must only happen if the timestamp is `Timestamp::minimum()`.
    fn step_back(&self) -> Option<Self>;

    /// Return the maximum value for this timestamp.
    fn maximum() -> Self;

    /// Rounds up the timestamp to the time of the next refresh according to the given schedule.
    /// Returns None if there is no next refresh.
    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;

    /// Rounds down `timestamp - 1` to the time of the previous refresh according to the given
    /// schedule.
    /// Returns None if there is no previous refresh.
    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
}
214
215impl TimestampManipulation for Timestamp {
216    fn step_forward(&self) -> Self {
217        self.step_forward()
218    }
219
220    fn step_forward_by(&self, amount: &Self) -> Self {
221        self.step_forward_by(amount)
222    }
223
224    fn try_step_forward(&self) -> Option<Self> {
225        self.try_step_forward()
226    }
227
228    fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
229        self.try_step_forward_by(amount)
230    }
231
232    fn step_back(&self) -> Option<Self> {
233        self.step_back()
234    }
235
236    fn maximum() -> Self {
237        Self::MAX
238    }
239
240    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
241        schedule.round_up_timestamp(*self)
242    }
243
244    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
245        schedule.round_down_timestamp_m1(*self)
246    }
247}
248
249impl mz_persist_types::StepForward for Timestamp {
250    fn step_forward(&self) -> Self {
251        self.step_forward()
252    }
253}
254
255impl Timestamp {
256    pub const MAX: Self = Self { internal: u64::MAX };
257    pub const MIN: Self = Self { internal: u64::MIN };
258
259    pub const fn new(timestamp: u64) -> Self {
260        Self {
261            internal: timestamp,
262        }
263    }
264
265    pub fn to_bytes(&self) -> [u8; 8] {
266        self.internal.to_le_bytes()
267    }
268
269    pub fn from_bytes(bytes: [u8; 8]) -> Self {
270        Self {
271            internal: u64::from_le_bytes(bytes),
272        }
273    }
274
275    pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
276        Self {
277            internal: self.internal.saturating_sub(rhs.into().internal),
278        }
279    }
280
281    pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
282        Self {
283            internal: self.internal.saturating_add(rhs.into().internal),
284        }
285    }
286
287    pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
288        Self {
289            internal: self.internal.saturating_mul(rhs.into().internal),
290        }
291    }
292
293    pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
294        self.internal
295            .checked_add(rhs.into().internal)
296            .map(|internal| Self { internal })
297    }
298
299    pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
300        self.internal
301            .checked_sub(rhs.into().internal)
302            .map(|internal| Self { internal })
303    }
304
305    /// Advance a timestamp by the least amount possible such that
306    /// `ts.less_than(ts.step_forward())` is true. Panic if unable to do so.
307    pub fn step_forward(&self) -> Self {
308        match self.checked_add(1) {
309            Some(ts) => ts,
310            None => panic!("could not step forward"),
311        }
312    }
313
314    /// Advance a timestamp forward by the given `amount`. Panic if unable to do so.
315    pub fn step_forward_by(&self, amount: &Self) -> Self {
316        match self.checked_add(*amount) {
317            Some(ts) => ts,
318            None => panic!("could not step {self} forward by {amount}"),
319        }
320    }
321
322    /// Advance a timestamp by the least amount possible such that `ts.less_than(ts.step_forward())`
323    /// is true. Return `None` if unable to do so.
324    pub fn try_step_forward(&self) -> Option<Self> {
325        self.checked_add(1)
326    }
327
328    /// Advance a timestamp forward by the given `amount`. Return `None` if unable to do so.
329    pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
330        self.checked_add(*amount)
331    }
332
333    /// Retreat a timestamp by the least amount possible such that
334    /// `ts.step_back().unwrap().less_than(ts)` is true. Return `None` if unable,
335    /// which must only happen if the timestamp is `Timestamp::minimum()`.
336    pub fn step_back(&self) -> Option<Self> {
337        self.checked_sub(1)
338    }
339}
340
341impl From<u64> for Timestamp {
342    fn from(internal: u64) -> Self {
343        Self { internal }
344    }
345}
346
347impl From<Timestamp> for u64 {
348    fn from(ts: Timestamp) -> Self {
349        ts.internal
350    }
351}
352
353impl From<Timestamp> for u128 {
354    fn from(ts: Timestamp) -> Self {
355        u128::from(ts.internal)
356    }
357}
358
359impl TryFrom<Timestamp> for i64 {
360    type Error = TryFromIntError;
361
362    fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
363        value.internal.try_into()
364    }
365}
366
367impl From<&Timestamp> for u64 {
368    fn from(ts: &Timestamp) -> Self {
369        ts.internal
370    }
371}
372
373impl From<Timestamp> for Numeric {
374    fn from(ts: Timestamp) -> Self {
375        ts.internal.into()
376    }
377}
378
379impl From<Timestamp> for Duration {
380    fn from(ts: Timestamp) -> Self {
381        Duration::from_millis(ts.internal)
382    }
383}
384
385impl std::ops::Rem<Timestamp> for Timestamp {
386    type Output = Timestamp;
387
388    fn rem(self, rhs: Timestamp) -> Self::Output {
389        Self {
390            internal: self.internal % rhs.internal,
391        }
392    }
393}
394
395impl Serialize for Timestamp {
396    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
397    where
398        S: Serializer,
399    {
400        self.internal.serialize(serializer)
401    }
402}
403
404impl<'de> Deserialize<'de> for Timestamp {
405    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
406    where
407        D: serde::Deserializer<'de>,
408    {
409        Ok(Self {
410            internal: u64::deserialize(deserializer)?,
411        })
412    }
413}
414
415impl timely::order::PartialOrder for Timestamp {
416    fn less_equal(&self, other: &Self) -> bool {
417        self.internal.less_equal(&other.internal)
418    }
419}
420
421impl timely::order::PartialOrder<&Timestamp> for Timestamp {
422    fn less_equal(&self, other: &&Self) -> bool {
423        self.internal.less_equal(&other.internal)
424    }
425}
426
427impl timely::order::PartialOrder<Timestamp> for &Timestamp {
428    fn less_equal(&self, other: &Timestamp) -> bool {
429        self.internal.less_equal(&other.internal)
430    }
431}
432
// Marker impl: `less_equal` delegates to the wrapped `u64`, so any two
// timestamps are comparable.
impl timely::order::TotalOrder for Timestamp {}
434
435impl timely::progress::Timestamp for Timestamp {
436    type Summary = Timestamp;
437
438    fn minimum() -> Self {
439        Self::MIN
440    }
441}
442
443impl timely::progress::PathSummary<Timestamp> for Timestamp {
444    #[inline]
445    fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
446        self.internal
447            .checked_add(src.internal)
448            .map(|internal| Self { internal })
449    }
450    #[inline]
451    fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
452        self.internal
453            .checked_add(other.internal)
454            .map(|internal| Self { internal })
455    }
456}
457
458impl timely::progress::timestamp::Refines<()> for Timestamp {
459    fn to_inner(_: ()) -> Timestamp {
460        Default::default()
461    }
462    fn to_outer(self) -> () {
463        ()
464    }
465    fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
466        ()
467    }
468}
469
470impl differential_dataflow::lattice::Lattice for Timestamp {
471    #[inline]
472    fn join(&self, other: &Self) -> Self {
473        ::std::cmp::max(*self, *other)
474    }
475    #[inline]
476    fn meet(&self, other: &Self) -> Self {
477        ::std::cmp::min(*self, *other)
478    }
479}
480
481impl mz_persist_types::Codec64 for Timestamp {
482    fn codec_name() -> String {
483        u64::codec_name()
484    }
485
486    fn encode(&self) -> [u8; 8] {
487        self.internal.encode()
488    }
489
490    fn decode(buf: [u8; 8]) -> Self {
491        Self {
492            internal: u64::decode(buf),
493        }
494    }
495}
496
497impl std::fmt::Display for Timestamp {
498    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499        std::fmt::Display::fmt(&self.internal, f)
500    }
501}
502
503impl std::fmt::Debug for Timestamp {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        std::fmt::Debug::fmt(&self.internal, f)
506    }
507}
508
509impl std::str::FromStr for Timestamp {
510    type Err = String;
511
512    fn from_str(s: &str) -> Result<Self, Self::Err> {
513        Ok(Self {
514            internal: s
515                .parse::<u64>()
516                .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
517                .or_else(|err_num_of_millis| {
518                    parse_timestamptz(s)
519                        .map_err(|parse_error| {
520                            format!(
521                                "{}; could not parse as date and time: {}",
522                                err_num_of_millis, parse_error
523                            )
524                        })?
525                        .timestamp_millis()
526                        .try_into()
527                        .map_err(|_| "out of range for mz_timestamp".to_string())
528                })
529                .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
530        })
531    }
532}
533
534impl TryFrom<Duration> for Timestamp {
535    type Error = TryFromIntError;
536
537    fn try_from(value: Duration) -> Result<Self, Self::Error> {
538        Ok(Self {
539            internal: value.as_millis().try_into()?,
540        })
541    }
542}
543
544impl TryFrom<u128> for Timestamp {
545    type Error = TryFromIntError;
546
547    fn try_from(value: u128) -> Result<Self, Self::Error> {
548        Ok(Self {
549            internal: value.try_into()?,
550        })
551    }
552}
553
554impl TryFrom<i64> for Timestamp {
555    type Error = TryFromIntError;
556
557    fn try_from(value: i64) -> Result<Self, Self::Error> {
558        Ok(Self {
559            internal: value.try_into()?,
560        })
561    }
562}
563
564impl TryFrom<Numeric> for Timestamp {
565    type Error = TryFromDecimalError;
566
567    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
568        Ok(Self {
569            internal: value.try_into()?,
570        })
571    }
572}
573
// Columnation support: selects `CopyRegion` as the region type. `Timestamp`
// derives `Copy`.
impl columnation::Columnation for Timestamp {
    type InnerRegion = columnation::CopyRegion<Timestamp>;
}
577
// Differential dataflow's `IntoOwned` for `Timestamp`. The type is `Copy`, so
// the owned form and the borrowed form are both plain `Timestamp` values.
mod differential {
    use differential_dataflow::IntoOwned;

    use crate::Timestamp;

    impl<'a> IntoOwned<'a> for Timestamp {
        type Owned = Self;

        /// Returns `self` unchanged.
        fn into_owned(self) -> Self::Owned {
            self
        }

        /// Overwrites `other` with `self`.
        fn clone_onto(self, other: &mut Self::Owned) {
            *other = self;
        }

        /// Copies the value out of the reference.
        fn borrow_as(owned: &'a Self::Owned) -> Self {
            *owned
        }
    }
}