1use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26#[derive(
28 Clone,
29 Copy,
31 PartialEq,
32 Eq,
33 PartialOrd,
34 Ord,
35 Hash,
36 Default,
37 Arbitrary,
38 bytemuck::AnyBitPattern,
39 bytemuck::NoUninit,
40)]
41#[repr(transparent)]
42pub struct Timestamp {
43 internal: u64,
45}
46
47impl PartialEq<&Timestamp> for Timestamp {
48 fn eq(&self, other: &&Timestamp) -> bool {
49 self.eq(*other)
50 }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54 fn eq(&self, other: &Timestamp) -> bool {
55 self.internal.eq(&other.internal)
56 }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60 fn into_proto(&self) -> ProtoTimestamp {
61 ProtoTimestamp {
62 internal: self.into(),
63 }
64 }
65
66 fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67 Ok(Timestamp::new(proto.internal))
68 }
69}
70
71mod columnar_timestamp {
72 use crate::Timestamp;
73 use columnar::Columnar;
74 use mz_ore::cast::CastFrom;
75 use std::ops::Range;
76
77 #[derive(Clone, Copy, Default, Debug)]
79 pub struct Timestamps<T>(T);
80 impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
81 #[inline(always)]
82 fn push(&mut self, item: D) {
83 self.0.push(item)
84 }
85 }
86 impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
87 #[inline(always)]
88 fn clear(&mut self) {
89 self.0.clear()
90 }
91 }
92 impl<T: columnar::Len> columnar::Len for Timestamps<T> {
93 #[inline(always)]
94 fn len(&self) -> usize {
95 self.0.len()
96 }
97 }
98 impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
99 type Ref = Timestamp;
100
101 #[inline(always)]
102 fn get(&self, index: usize) -> Self::Ref {
103 self.0[index]
104 }
105 }
106
107 impl Columnar for Timestamp {
108 #[inline(always)]
109 fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
110 other
111 }
112 type Container = Timestamps<Vec<Timestamp>>;
113 #[inline(always)]
114 fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
115 where
116 Self: 'a,
117 {
118 thing
119 }
120 }
121
122 impl columnar::Borrow for Timestamps<Vec<Timestamp>> {
123 type Ref<'a> = Timestamp;
124 type Borrowed<'a>
125 = Timestamps<&'a [Timestamp]>
126 where
127 Self: 'a;
128 #[inline(always)]
129 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
130 Timestamps(self.0.as_slice())
131 }
132 #[inline(always)]
133 fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
134 where
135 Self: 'a,
136 {
137 Timestamps(item.0)
138 }
139
140 #[inline(always)]
141 fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
142 where
143 Self: 'a,
144 {
145 item
146 }
147 }
148
149 impl columnar::Container for Timestamps<Vec<Timestamp>> {
150 #[inline(always)]
151 fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
152 self.0.extend_from_self(other.0, range)
153 }
154 #[inline(always)]
155 fn reserve_for<'a, I>(&mut self, selves: I)
156 where
157 Self: 'a,
158 I: Iterator<Item = Self::Borrowed<'a>> + Clone,
159 {
160 self.0.reserve_for(selves.map(|s| s.0));
161 }
162 }
163
164 impl columnar::HeapSize for Timestamp {}
165
166 impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
167 #[inline(always)]
168 fn heap_size(&self) -> (usize, usize) {
169 self.0.heap_size()
170 }
171 }
172
173 impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
174 #[inline(always)]
175 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
176 std::iter::once((
177 u64::cast_from(align_of::<Timestamp>()),
178 bytemuck::cast_slice(self.0),
179 ))
180 }
181 }
182 impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
183 #[inline(always)]
184 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
185 Timestamps(bytemuck::cast_slice(
186 bytes.next().expect("Iterator exhausted prematurely"),
187 ))
188 }
189 }
190}
191
192impl BucketTimestamp for Timestamp {
193 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
194 let rhs = 1_u64.checked_shl(exponent)?;
195 Some(self.internal.checked_add(rhs)?.into())
196 }
197}
198
199pub trait TimestampManipulation:
200 timely::progress::Timestamp
201 + timely::order::TotalOrder
202 + differential_dataflow::lattice::Lattice
203 + std::fmt::Debug
204 + mz_persist_types::StepForward
205 + Sync
206{
207 fn step_forward(&self) -> Self;
210
211 fn step_forward_by(&self, amount: &Self) -> Self;
213
214 fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
216
217 fn try_step_forward(&self) -> Option<Self>;
220
221 fn step_back(&self) -> Option<Self>;
225
226 fn maximum() -> Self;
228
229 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
232
233 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
237}
238
239impl TimestampManipulation for Timestamp {
240 fn step_forward(&self) -> Self {
241 self.step_forward()
242 }
243
244 fn step_forward_by(&self, amount: &Self) -> Self {
245 self.step_forward_by(amount)
246 }
247
248 fn try_step_forward(&self) -> Option<Self> {
249 self.try_step_forward()
250 }
251
252 fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
253 self.try_step_forward_by(amount)
254 }
255
256 fn step_back(&self) -> Option<Self> {
257 self.step_back()
258 }
259
260 fn maximum() -> Self {
261 Self::MAX
262 }
263
264 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
265 schedule.round_up_timestamp(*self)
266 }
267
268 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
269 schedule.round_down_timestamp_m1(*self)
270 }
271}
272
273impl mz_persist_types::StepForward for Timestamp {
274 fn step_forward(&self) -> Self {
275 self.step_forward()
276 }
277}
278
279impl Timestamp {
280 pub const MAX: Self = Self { internal: u64::MAX };
281 pub const MIN: Self = Self { internal: u64::MIN };
282
283 pub const fn new(timestamp: u64) -> Self {
284 Self {
285 internal: timestamp,
286 }
287 }
288
289 pub fn to_bytes(&self) -> [u8; 8] {
290 self.internal.to_le_bytes()
291 }
292
293 pub fn from_bytes(bytes: [u8; 8]) -> Self {
294 Self {
295 internal: u64::from_le_bytes(bytes),
296 }
297 }
298
299 pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
300 Self {
301 internal: self.internal.saturating_sub(rhs.into().internal),
302 }
303 }
304
305 pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
306 Self {
307 internal: self.internal.saturating_add(rhs.into().internal),
308 }
309 }
310
311 pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
312 Self {
313 internal: self.internal.saturating_mul(rhs.into().internal),
314 }
315 }
316
317 pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
318 self.internal
319 .checked_add(rhs.into().internal)
320 .map(|internal| Self { internal })
321 }
322
323 pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
324 self.internal
325 .checked_sub(rhs.into().internal)
326 .map(|internal| Self { internal })
327 }
328
329 pub fn step_forward(&self) -> Self {
332 match self.checked_add(1) {
333 Some(ts) => ts,
334 None => panic!("could not step forward"),
335 }
336 }
337
338 pub fn step_forward_by(&self, amount: &Self) -> Self {
340 match self.checked_add(*amount) {
341 Some(ts) => ts,
342 None => panic!("could not step {self} forward by {amount}"),
343 }
344 }
345
346 pub fn try_step_forward(&self) -> Option<Self> {
349 self.checked_add(1)
350 }
351
352 pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
354 self.checked_add(*amount)
355 }
356
357 pub fn step_back(&self) -> Option<Self> {
361 self.checked_sub(1)
362 }
363}
364
365impl From<u64> for Timestamp {
366 fn from(internal: u64) -> Self {
367 Self { internal }
368 }
369}
370
371impl From<Timestamp> for u64 {
372 fn from(ts: Timestamp) -> Self {
373 ts.internal
374 }
375}
376
377impl From<Timestamp> for u128 {
378 fn from(ts: Timestamp) -> Self {
379 u128::from(ts.internal)
380 }
381}
382
383impl TryFrom<Timestamp> for i64 {
384 type Error = TryFromIntError;
385
386 fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
387 value.internal.try_into()
388 }
389}
390
391impl From<&Timestamp> for u64 {
392 fn from(ts: &Timestamp) -> Self {
393 ts.internal
394 }
395}
396
397impl From<Timestamp> for Numeric {
398 fn from(ts: Timestamp) -> Self {
399 ts.internal.into()
400 }
401}
402
403impl From<Timestamp> for Duration {
404 fn from(ts: Timestamp) -> Self {
405 Duration::from_millis(ts.internal)
406 }
407}
408
409impl std::ops::Rem<Timestamp> for Timestamp {
410 type Output = Timestamp;
411
412 fn rem(self, rhs: Timestamp) -> Self::Output {
413 Self {
414 internal: self.internal % rhs.internal,
415 }
416 }
417}
418
419impl Serialize for Timestamp {
420 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
421 where
422 S: Serializer,
423 {
424 self.internal.serialize(serializer)
425 }
426}
427
428impl<'de> Deserialize<'de> for Timestamp {
429 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
430 where
431 D: serde::Deserializer<'de>,
432 {
433 Ok(Self {
434 internal: u64::deserialize(deserializer)?,
435 })
436 }
437}
438
439impl timely::order::PartialOrder for Timestamp {
440 fn less_equal(&self, other: &Self) -> bool {
441 self.internal.less_equal(&other.internal)
442 }
443}
444
445impl timely::order::PartialOrder<&Timestamp> for Timestamp {
446 fn less_equal(&self, other: &&Self) -> bool {
447 self.internal.less_equal(&other.internal)
448 }
449}
450
451impl timely::order::PartialOrder<Timestamp> for &Timestamp {
452 fn less_equal(&self, other: &Timestamp) -> bool {
453 self.internal.less_equal(&other.internal)
454 }
455}
456
457impl timely::order::TotalOrder for Timestamp {}
458
459impl timely::progress::Timestamp for Timestamp {
460 type Summary = Timestamp;
461
462 fn minimum() -> Self {
463 Self::MIN
464 }
465}
466
467impl timely::progress::PathSummary<Timestamp> for Timestamp {
468 #[inline]
469 fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
470 self.internal
471 .checked_add(src.internal)
472 .map(|internal| Self { internal })
473 }
474 #[inline]
475 fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
476 self.internal
477 .checked_add(other.internal)
478 .map(|internal| Self { internal })
479 }
480}
481
482impl timely::progress::timestamp::Refines<()> for Timestamp {
483 fn to_inner(_: ()) -> Timestamp {
484 Default::default()
485 }
486 fn to_outer(self) -> () {
487 ()
488 }
489 fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
490 ()
491 }
492}
493
494impl differential_dataflow::lattice::Lattice for Timestamp {
495 #[inline]
496 fn join(&self, other: &Self) -> Self {
497 ::std::cmp::max(*self, *other)
498 }
499 #[inline]
500 fn meet(&self, other: &Self) -> Self {
501 ::std::cmp::min(*self, *other)
502 }
503}
504
505impl mz_persist_types::Codec64 for Timestamp {
506 fn codec_name() -> String {
507 u64::codec_name()
508 }
509
510 fn encode(&self) -> [u8; 8] {
511 self.internal.encode()
512 }
513
514 fn decode(buf: [u8; 8]) -> Self {
515 Self {
516 internal: u64::decode(buf),
517 }
518 }
519}
520
521impl std::fmt::Display for Timestamp {
522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523 std::fmt::Display::fmt(&self.internal, f)
524 }
525}
526
527impl std::fmt::Debug for Timestamp {
528 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529 std::fmt::Debug::fmt(&self.internal, f)
530 }
531}
532
533impl std::str::FromStr for Timestamp {
534 type Err = String;
535
536 fn from_str(s: &str) -> Result<Self, Self::Err> {
537 Ok(Self {
538 internal: s
539 .parse::<u64>()
540 .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
541 .or_else(|err_num_of_millis| {
542 parse_timestamptz(s)
543 .map_err(|parse_error| {
544 format!(
545 "{}; could not parse as date and time: {}",
546 err_num_of_millis, parse_error
547 )
548 })?
549 .timestamp_millis()
550 .try_into()
551 .map_err(|_| "out of range for mz_timestamp".to_string())
552 })
553 .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
554 })
555 }
556}
557
558impl TryFrom<Duration> for Timestamp {
559 type Error = TryFromIntError;
560
561 fn try_from(value: Duration) -> Result<Self, Self::Error> {
562 Ok(Self {
563 internal: value.as_millis().try_into()?,
564 })
565 }
566}
567
568impl TryFrom<u128> for Timestamp {
569 type Error = TryFromIntError;
570
571 fn try_from(value: u128) -> Result<Self, Self::Error> {
572 Ok(Self {
573 internal: value.try_into()?,
574 })
575 }
576}
577
578impl TryFrom<i64> for Timestamp {
579 type Error = TryFromIntError;
580
581 fn try_from(value: i64) -> Result<Self, Self::Error> {
582 Ok(Self {
583 internal: value.try_into()?,
584 })
585 }
586}
587
588impl TryFrom<Numeric> for Timestamp {
589 type Error = TryFromDecimalError;
590
591 fn try_from(value: Numeric) -> Result<Self, Self::Error> {
592 Ok(Self {
593 internal: value.try_into()?,
594 })
595 }
596}
597
598impl columnation::Columnation for Timestamp {
599 type InnerRegion = columnation::CopyRegion<Timestamp>;
600}