1use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26#[derive(
28 Clone,
29 Copy,
31 PartialEq,
32 Eq,
33 PartialOrd,
34 Ord,
35 Hash,
36 Default,
37 Arbitrary,
38 bytemuck::AnyBitPattern,
39 bytemuck::NoUninit,
40)]
41#[repr(transparent)]
42pub struct Timestamp {
43 internal: u64,
45}
46
47impl PartialEq<&Timestamp> for Timestamp {
48 fn eq(&self, other: &&Timestamp) -> bool {
49 self.eq(*other)
50 }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54 fn eq(&self, other: &Timestamp) -> bool {
55 self.internal.eq(&other.internal)
56 }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60 fn into_proto(&self) -> ProtoTimestamp {
61 ProtoTimestamp {
62 internal: self.into(),
63 }
64 }
65
66 fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67 Ok(Timestamp::new(proto.internal))
68 }
69}
70
71mod columnar_timestamp {
72 use crate::Timestamp;
73 use columnar::Columnar;
74 use mz_ore::cast::CastFrom;
75
76 #[derive(Clone, Copy, Default, Debug)]
78 pub struct Timestamps<T>(T);
79 impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
80 #[inline(always)]
81 fn push(&mut self, item: D) {
82 self.0.push(item)
83 }
84 }
85 impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
86 #[inline(always)]
87 fn clear(&mut self) {
88 self.0.clear()
89 }
90 }
91 impl<T: columnar::Len> columnar::Len for Timestamps<T> {
92 #[inline(always)]
93 fn len(&self) -> usize {
94 self.0.len()
95 }
96 }
97 impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
98 type Ref = Timestamp;
99
100 #[inline(always)]
101 fn get(&self, index: usize) -> Self::Ref {
102 self.0[index]
103 }
104 }
105
106 impl Columnar for Timestamp {
107 type Ref<'a> = Timestamp;
108 #[inline(always)]
109 fn into_owned<'a>(other: Self::Ref<'a>) -> Self {
110 other
111 }
112 type Container = Timestamps<Vec<Timestamp>>;
113 #[inline(always)]
114 fn reborrow<'b, 'a: 'b>(thing: Self::Ref<'a>) -> Self::Ref<'b>
115 where
116 Self: 'a,
117 {
118 thing
119 }
120 }
121
122 impl columnar::Container<Timestamp> for Timestamps<Vec<Timestamp>> {
123 type Borrowed<'a>
124 = Timestamps<&'a [Timestamp]>
125 where
126 Self: 'a;
127 #[inline(always)]
128 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
129 Timestamps(self.0.as_slice())
130 }
131 #[inline(always)]
132 fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
133 where
134 Self: 'a,
135 {
136 Timestamps(item.0)
137 }
138 }
139
140 impl columnar::HeapSize for Timestamp {}
141
142 impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
143 #[inline(always)]
144 fn heap_size(&self) -> (usize, usize) {
145 self.0.heap_size()
146 }
147 }
148
149 impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
150 #[inline(always)]
151 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
152 std::iter::once((
153 u64::cast_from(align_of::<Timestamp>()),
154 bytemuck::cast_slice(self.0),
155 ))
156 }
157 }
158 impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
159 #[inline(always)]
160 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
161 Timestamps(bytemuck::cast_slice(
162 bytes.next().expect("Iterator exhausted prematurely"),
163 ))
164 }
165 }
166}
167
168impl BucketTimestamp for Timestamp {
169 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
170 let rhs = 1_u64.checked_shl(exponent)?;
171 Some(self.internal.checked_add(rhs)?.into())
172 }
173}
174
175pub trait TimestampManipulation:
176 timely::progress::Timestamp
177 + timely::order::TotalOrder
178 + differential_dataflow::lattice::Lattice
179 + std::fmt::Debug
180 + mz_persist_types::StepForward
181 + Sync
182{
183 fn step_forward(&self) -> Self;
186
187 fn step_forward_by(&self, amount: &Self) -> Self;
189
190 fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
192
193 fn try_step_forward(&self) -> Option<Self>;
196
197 fn step_back(&self) -> Option<Self>;
201
202 fn maximum() -> Self;
204
205 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
208
209 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
213}
214
215impl TimestampManipulation for Timestamp {
216 fn step_forward(&self) -> Self {
217 self.step_forward()
218 }
219
220 fn step_forward_by(&self, amount: &Self) -> Self {
221 self.step_forward_by(amount)
222 }
223
224 fn try_step_forward(&self) -> Option<Self> {
225 self.try_step_forward()
226 }
227
228 fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
229 self.try_step_forward_by(amount)
230 }
231
232 fn step_back(&self) -> Option<Self> {
233 self.step_back()
234 }
235
236 fn maximum() -> Self {
237 Self::MAX
238 }
239
240 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
241 schedule.round_up_timestamp(*self)
242 }
243
244 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
245 schedule.round_down_timestamp_m1(*self)
246 }
247}
248
249impl mz_persist_types::StepForward for Timestamp {
250 fn step_forward(&self) -> Self {
251 self.step_forward()
252 }
253}
254
255impl Timestamp {
256 pub const MAX: Self = Self { internal: u64::MAX };
257 pub const MIN: Self = Self { internal: u64::MIN };
258
259 pub const fn new(timestamp: u64) -> Self {
260 Self {
261 internal: timestamp,
262 }
263 }
264
265 pub fn to_bytes(&self) -> [u8; 8] {
266 self.internal.to_le_bytes()
267 }
268
269 pub fn from_bytes(bytes: [u8; 8]) -> Self {
270 Self {
271 internal: u64::from_le_bytes(bytes),
272 }
273 }
274
275 pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
276 Self {
277 internal: self.internal.saturating_sub(rhs.into().internal),
278 }
279 }
280
281 pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
282 Self {
283 internal: self.internal.saturating_add(rhs.into().internal),
284 }
285 }
286
287 pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
288 Self {
289 internal: self.internal.saturating_mul(rhs.into().internal),
290 }
291 }
292
293 pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
294 self.internal
295 .checked_add(rhs.into().internal)
296 .map(|internal| Self { internal })
297 }
298
299 pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
300 self.internal
301 .checked_sub(rhs.into().internal)
302 .map(|internal| Self { internal })
303 }
304
305 pub fn step_forward(&self) -> Self {
308 match self.checked_add(1) {
309 Some(ts) => ts,
310 None => panic!("could not step forward"),
311 }
312 }
313
314 pub fn step_forward_by(&self, amount: &Self) -> Self {
316 match self.checked_add(*amount) {
317 Some(ts) => ts,
318 None => panic!("could not step {self} forward by {amount}"),
319 }
320 }
321
322 pub fn try_step_forward(&self) -> Option<Self> {
325 self.checked_add(1)
326 }
327
328 pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
330 self.checked_add(*amount)
331 }
332
333 pub fn step_back(&self) -> Option<Self> {
337 self.checked_sub(1)
338 }
339}
340
341impl From<u64> for Timestamp {
342 fn from(internal: u64) -> Self {
343 Self { internal }
344 }
345}
346
347impl From<Timestamp> for u64 {
348 fn from(ts: Timestamp) -> Self {
349 ts.internal
350 }
351}
352
353impl From<Timestamp> for u128 {
354 fn from(ts: Timestamp) -> Self {
355 u128::from(ts.internal)
356 }
357}
358
359impl TryFrom<Timestamp> for i64 {
360 type Error = TryFromIntError;
361
362 fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
363 value.internal.try_into()
364 }
365}
366
367impl From<&Timestamp> for u64 {
368 fn from(ts: &Timestamp) -> Self {
369 ts.internal
370 }
371}
372
373impl From<Timestamp> for Numeric {
374 fn from(ts: Timestamp) -> Self {
375 ts.internal.into()
376 }
377}
378
379impl From<Timestamp> for Duration {
380 fn from(ts: Timestamp) -> Self {
381 Duration::from_millis(ts.internal)
382 }
383}
384
385impl std::ops::Rem<Timestamp> for Timestamp {
386 type Output = Timestamp;
387
388 fn rem(self, rhs: Timestamp) -> Self::Output {
389 Self {
390 internal: self.internal % rhs.internal,
391 }
392 }
393}
394
395impl Serialize for Timestamp {
396 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
397 where
398 S: Serializer,
399 {
400 self.internal.serialize(serializer)
401 }
402}
403
404impl<'de> Deserialize<'de> for Timestamp {
405 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
406 where
407 D: serde::Deserializer<'de>,
408 {
409 Ok(Self {
410 internal: u64::deserialize(deserializer)?,
411 })
412 }
413}
414
415impl timely::order::PartialOrder for Timestamp {
416 fn less_equal(&self, other: &Self) -> bool {
417 self.internal.less_equal(&other.internal)
418 }
419}
420
421impl timely::order::PartialOrder<&Timestamp> for Timestamp {
422 fn less_equal(&self, other: &&Self) -> bool {
423 self.internal.less_equal(&other.internal)
424 }
425}
426
427impl timely::order::PartialOrder<Timestamp> for &Timestamp {
428 fn less_equal(&self, other: &Timestamp) -> bool {
429 self.internal.less_equal(&other.internal)
430 }
431}
432
433impl timely::order::TotalOrder for Timestamp {}
434
435impl timely::progress::Timestamp for Timestamp {
436 type Summary = Timestamp;
437
438 fn minimum() -> Self {
439 Self::MIN
440 }
441}
442
443impl timely::progress::PathSummary<Timestamp> for Timestamp {
444 #[inline]
445 fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
446 self.internal
447 .checked_add(src.internal)
448 .map(|internal| Self { internal })
449 }
450 #[inline]
451 fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
452 self.internal
453 .checked_add(other.internal)
454 .map(|internal| Self { internal })
455 }
456}
457
458impl timely::progress::timestamp::Refines<()> for Timestamp {
459 fn to_inner(_: ()) -> Timestamp {
460 Default::default()
461 }
462 fn to_outer(self) -> () {
463 ()
464 }
465 fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
466 ()
467 }
468}
469
470impl differential_dataflow::lattice::Lattice for Timestamp {
471 #[inline]
472 fn join(&self, other: &Self) -> Self {
473 ::std::cmp::max(*self, *other)
474 }
475 #[inline]
476 fn meet(&self, other: &Self) -> Self {
477 ::std::cmp::min(*self, *other)
478 }
479}
480
481impl mz_persist_types::Codec64 for Timestamp {
482 fn codec_name() -> String {
483 u64::codec_name()
484 }
485
486 fn encode(&self) -> [u8; 8] {
487 self.internal.encode()
488 }
489
490 fn decode(buf: [u8; 8]) -> Self {
491 Self {
492 internal: u64::decode(buf),
493 }
494 }
495}
496
497impl std::fmt::Display for Timestamp {
498 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499 std::fmt::Display::fmt(&self.internal, f)
500 }
501}
502
503impl std::fmt::Debug for Timestamp {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 std::fmt::Debug::fmt(&self.internal, f)
506 }
507}
508
509impl std::str::FromStr for Timestamp {
510 type Err = String;
511
512 fn from_str(s: &str) -> Result<Self, Self::Err> {
513 Ok(Self {
514 internal: s
515 .parse::<u64>()
516 .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
517 .or_else(|err_num_of_millis| {
518 parse_timestamptz(s)
519 .map_err(|parse_error| {
520 format!(
521 "{}; could not parse as date and time: {}",
522 err_num_of_millis, parse_error
523 )
524 })?
525 .timestamp_millis()
526 .try_into()
527 .map_err(|_| "out of range for mz_timestamp".to_string())
528 })
529 .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
530 })
531 }
532}
533
534impl TryFrom<Duration> for Timestamp {
535 type Error = TryFromIntError;
536
537 fn try_from(value: Duration) -> Result<Self, Self::Error> {
538 Ok(Self {
539 internal: value.as_millis().try_into()?,
540 })
541 }
542}
543
544impl TryFrom<u128> for Timestamp {
545 type Error = TryFromIntError;
546
547 fn try_from(value: u128) -> Result<Self, Self::Error> {
548 Ok(Self {
549 internal: value.try_into()?,
550 })
551 }
552}
553
554impl TryFrom<i64> for Timestamp {
555 type Error = TryFromIntError;
556
557 fn try_from(value: i64) -> Result<Self, Self::Error> {
558 Ok(Self {
559 internal: value.try_into()?,
560 })
561 }
562}
563
564impl TryFrom<Numeric> for Timestamp {
565 type Error = TryFromDecimalError;
566
567 fn try_from(value: Numeric) -> Result<Self, Self::Error> {
568 Ok(Self {
569 internal: value.try_into()?,
570 })
571 }
572}
573
574impl columnation::Columnation for Timestamp {
575 type InnerRegion = columnation::CopyRegion<Timestamp>;
576}
577
578mod differential {
579 use differential_dataflow::IntoOwned;
580
581 use crate::Timestamp;
582
583 impl<'a> IntoOwned<'a> for Timestamp {
584 type Owned = Self;
585
586 fn into_owned(self) -> Self::Owned {
587 self
588 }
589
590 fn clone_onto(self, other: &mut Self::Owned) {
591 *other = self;
592 }
593
594 fn borrow_as(owned: &'a Self::Owned) -> Self {
595 *owned
596 }
597 }
598}