1use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17#[cfg(any(test, feature = "proptest"))]
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize, Serializer};
20
21use crate::adt::numeric::Numeric;
22use crate::refresh_schedule::RefreshSchedule;
23use crate::strconv::parse_timestamptz;
24
25include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
26
27#[derive(
29 Clone,
30 Copy,
32 PartialEq,
33 Eq,
34 PartialOrd,
35 Ord,
36 Hash,
37 Default,
38 bytemuck::AnyBitPattern,
39 bytemuck::NoUninit,
40)]
41#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
42#[repr(transparent)]
43pub struct Timestamp {
44 internal: u64,
46}
47
48impl PartialEq<&Timestamp> for Timestamp {
49 fn eq(&self, other: &&Timestamp) -> bool {
50 self.eq(*other)
51 }
52}
53
54impl PartialEq<Timestamp> for &Timestamp {
55 fn eq(&self, other: &Timestamp) -> bool {
56 self.internal.eq(&other.internal)
57 }
58}
59
60impl RustType<ProtoTimestamp> for Timestamp {
61 fn into_proto(&self) -> ProtoTimestamp {
62 ProtoTimestamp {
63 internal: self.into(),
64 }
65 }
66
67 fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
68 Ok(Timestamp::new(proto.internal))
69 }
70}
71
72mod columnar_timestamp {
73 use crate::Timestamp;
74 use columnar::Columnar;
75 use mz_ore::cast::CastFrom;
76 use std::ops::Range;
77
78 #[derive(Clone, Copy, Default, Debug)]
80 pub struct Timestamps<T>(T);
81 impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
82 #[inline(always)]
83 fn push(&mut self, item: D) {
84 self.0.push(item)
85 }
86 }
87 impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
88 #[inline(always)]
89 fn clear(&mut self) {
90 self.0.clear()
91 }
92 }
93 impl<T: columnar::Len> columnar::Len for Timestamps<T> {
94 #[inline(always)]
95 fn len(&self) -> usize {
96 self.0.len()
97 }
98 }
99 impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
100 type Ref = Timestamp;
101
102 #[inline(always)]
103 fn get(&self, index: usize) -> Self::Ref {
104 self.0[index]
105 }
106 }
107
108 impl Columnar for Timestamp {
109 #[inline(always)]
110 fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
111 other
112 }
113 type Container = Timestamps<Vec<Timestamp>>;
114 #[inline(always)]
115 fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
116 where
117 Self: 'a,
118 {
119 thing
120 }
121 }
122
123 impl columnar::Borrow for Timestamps<Vec<Timestamp>> {
124 type Ref<'a> = Timestamp;
125 type Borrowed<'a>
126 = Timestamps<&'a [Timestamp]>
127 where
128 Self: 'a;
129 #[inline(always)]
130 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
131 Timestamps(self.0.as_slice())
132 }
133 #[inline(always)]
134 fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
135 where
136 Self: 'a,
137 {
138 Timestamps(item.0)
139 }
140
141 #[inline(always)]
142 fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
143 where
144 Self: 'a,
145 {
146 item
147 }
148 }
149
150 impl columnar::Container for Timestamps<Vec<Timestamp>> {
151 #[inline(always)]
152 fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
153 self.0.extend_from_self(other.0, range)
154 }
155 #[inline(always)]
156 fn reserve_for<'a, I>(&mut self, selves: I)
157 where
158 Self: 'a,
159 I: Iterator<Item = Self::Borrowed<'a>> + Clone,
160 {
161 self.0.reserve_for(selves.map(|s| s.0));
162 }
163 }
164
165 impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
166 const SLICE_COUNT: usize = 1;
167 #[inline(always)]
168 fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
169 debug_assert!(index < Self::SLICE_COUNT);
170 (
171 u64::cast_from(align_of::<Timestamp>()),
172 bytemuck::cast_slice(self.0),
173 )
174 }
175 #[inline(always)]
176 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
177 std::iter::once((
178 u64::cast_from(align_of::<Timestamp>()),
179 bytemuck::cast_slice(self.0),
180 ))
181 }
182 }
183 impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
184 const SLICE_COUNT: usize = 1;
185 #[inline(always)]
186 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
187 Timestamps(bytemuck::cast_slice(
188 bytes.next().expect("Iterator exhausted prematurely"),
189 ))
190 }
191 }
192}
193
194impl BucketTimestamp for Timestamp {
195 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
196 let rhs = 1_u64.checked_shl(exponent)?;
197 Some(self.internal.checked_add(rhs)?.into())
198 }
199}
200
/// Stepping and rounding operations on timestamp types.
///
/// The per-method notes below describe the `Timestamp` implementation found in
/// this file; other implementors may differ.
pub trait TimestampManipulation:
    timely::progress::Timestamp
    + timely::order::TotalOrder
    + differential_dataflow::lattice::Lattice
    + std::fmt::Debug
    + mz_persist_types::StepForward
    + Sync
{
    /// Advances the timestamp by one step. For `Timestamp` this adds 1 and
    /// panics on overflow.
    fn step_forward(&self) -> Self;

    /// Advances the timestamp by `amount`. For `Timestamp` this panics on
    /// overflow.
    fn step_forward_by(&self, amount: &Self) -> Self;

    /// Non-panicking form of `step_forward_by`. For `Timestamp` this returns
    /// `None` on overflow.
    fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;

    /// Non-panicking form of `step_forward`. For `Timestamp` this returns
    /// `None` on overflow.
    fn try_step_forward(&self) -> Option<Self>;

    /// Moves the timestamp back by one step. For `Timestamp` this returns
    /// `None` when the value is already 0.
    fn step_back(&self) -> Option<Self>;

    /// Returns the maximum value of the type (`Timestamp::MAX` for `Timestamp`).
    fn maximum() -> Self;

    /// Rounds the timestamp up according to `schedule`. For `Timestamp` this
    /// delegates to `RefreshSchedule::round_up_timestamp`.
    fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;

    /// Rounds the timestamp down according to `schedule`. For `Timestamp` this
    /// delegates to `RefreshSchedule::round_down_timestamp_m1`.
    fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
}
240
241impl TimestampManipulation for Timestamp {
242 fn step_forward(&self) -> Self {
243 self.step_forward()
244 }
245
246 fn step_forward_by(&self, amount: &Self) -> Self {
247 self.step_forward_by(amount)
248 }
249
250 fn try_step_forward(&self) -> Option<Self> {
251 self.try_step_forward()
252 }
253
254 fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
255 self.try_step_forward_by(amount)
256 }
257
258 fn step_back(&self) -> Option<Self> {
259 self.step_back()
260 }
261
262 fn maximum() -> Self {
263 Self::MAX
264 }
265
266 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
267 schedule.round_up_timestamp(*self)
268 }
269
270 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
271 schedule.round_down_timestamp_m1(*self)
272 }
273}
274
275impl mz_persist_types::StepForward for Timestamp {
276 fn step_forward(&self) -> Self {
277 self.step_forward()
278 }
279}
280
281impl Timestamp {
282 pub const MAX: Self = Self { internal: u64::MAX };
283 pub const MIN: Self = Self { internal: u64::MIN };
284
285 pub const fn new(timestamp: u64) -> Self {
286 Self {
287 internal: timestamp,
288 }
289 }
290
291 pub fn to_bytes(&self) -> [u8; 8] {
292 self.internal.to_le_bytes()
293 }
294
295 pub fn from_bytes(bytes: [u8; 8]) -> Self {
296 Self {
297 internal: u64::from_le_bytes(bytes),
298 }
299 }
300
301 pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
302 Self {
303 internal: self.internal.saturating_sub(rhs.into().internal),
304 }
305 }
306
307 pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
308 Self {
309 internal: self.internal.saturating_add(rhs.into().internal),
310 }
311 }
312
313 pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
314 Self {
315 internal: self.internal.saturating_mul(rhs.into().internal),
316 }
317 }
318
319 pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
320 self.internal
321 .checked_add(rhs.into().internal)
322 .map(|internal| Self { internal })
323 }
324
325 pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
326 self.internal
327 .checked_sub(rhs.into().internal)
328 .map(|internal| Self { internal })
329 }
330
331 pub fn step_forward(&self) -> Self {
334 match self.checked_add(1) {
335 Some(ts) => ts,
336 None => panic!("could not step forward"),
337 }
338 }
339
340 pub fn step_forward_by(&self, amount: &Self) -> Self {
342 match self.checked_add(*amount) {
343 Some(ts) => ts,
344 None => panic!("could not step {self} forward by {amount}"),
345 }
346 }
347
348 pub fn try_step_forward(&self) -> Option<Self> {
351 self.checked_add(1)
352 }
353
354 pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
356 self.checked_add(*amount)
357 }
358
359 pub fn step_back(&self) -> Option<Self> {
363 self.checked_sub(1)
364 }
365}
366
367impl From<u64> for Timestamp {
368 fn from(internal: u64) -> Self {
369 Self { internal }
370 }
371}
372
373impl From<Timestamp> for u64 {
374 fn from(ts: Timestamp) -> Self {
375 ts.internal
376 }
377}
378
379impl From<Timestamp> for u128 {
380 fn from(ts: Timestamp) -> Self {
381 u128::from(ts.internal)
382 }
383}
384
385impl TryFrom<Timestamp> for i64 {
386 type Error = TryFromIntError;
387
388 fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
389 value.internal.try_into()
390 }
391}
392
393impl From<&Timestamp> for u64 {
394 fn from(ts: &Timestamp) -> Self {
395 ts.internal
396 }
397}
398
399impl From<Timestamp> for Numeric {
400 fn from(ts: Timestamp) -> Self {
401 ts.internal.into()
402 }
403}
404
405impl From<Timestamp> for Duration {
406 fn from(ts: Timestamp) -> Self {
407 Duration::from_millis(ts.internal)
408 }
409}
410
411impl std::ops::Rem<Timestamp> for Timestamp {
412 type Output = Timestamp;
413
414 fn rem(self, rhs: Timestamp) -> Self::Output {
415 Self {
416 internal: self.internal % rhs.internal,
417 }
418 }
419}
420
421impl Serialize for Timestamp {
422 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
423 where
424 S: Serializer,
425 {
426 self.internal.serialize(serializer)
427 }
428}
429
430impl<'de> Deserialize<'de> for Timestamp {
431 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
432 where
433 D: serde::Deserializer<'de>,
434 {
435 Ok(Self {
436 internal: u64::deserialize(deserializer)?,
437 })
438 }
439}
440
441impl timely::order::PartialOrder for Timestamp {
442 fn less_equal(&self, other: &Self) -> bool {
443 self.internal.less_equal(&other.internal)
444 }
445}
446
447impl timely::order::PartialOrder<&Timestamp> for Timestamp {
448 fn less_equal(&self, other: &&Self) -> bool {
449 self.internal.less_equal(&other.internal)
450 }
451}
452
453impl timely::order::PartialOrder<Timestamp> for &Timestamp {
454 fn less_equal(&self, other: &Timestamp) -> bool {
455 self.internal.less_equal(&other.internal)
456 }
457}
458
// Marker impl with no methods: `Timestamp` is ordered by its single `u64` field.
impl timely::order::TotalOrder for Timestamp {}
460
461impl timely::progress::Timestamp for Timestamp {
462 type Summary = Timestamp;
463
464 fn minimum() -> Self {
465 Self::MIN
466 }
467}
468
469impl timely::progress::PathSummary<Timestamp> for Timestamp {
470 #[inline]
471 fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
472 self.internal
473 .checked_add(src.internal)
474 .map(|internal| Self { internal })
475 }
476 #[inline]
477 fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
478 self.internal
479 .checked_add(other.internal)
480 .map(|internal| Self { internal })
481 }
482}
483
484impl timely::progress::timestamp::Refines<()> for Timestamp {
485 fn to_inner(_: ()) -> Timestamp {
486 Default::default()
487 }
488 fn to_outer(self) -> () {
489 ()
490 }
491 fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
492 ()
493 }
494}
495
496impl differential_dataflow::lattice::Lattice for Timestamp {
497 #[inline]
498 fn join(&self, other: &Self) -> Self {
499 ::std::cmp::max(*self, *other)
500 }
501 #[inline]
502 fn meet(&self, other: &Self) -> Self {
503 ::std::cmp::min(*self, *other)
504 }
505}
506
507impl mz_persist_types::Codec64 for Timestamp {
508 fn codec_name() -> String {
509 u64::codec_name()
510 }
511
512 fn encode(&self) -> [u8; 8] {
513 self.internal.encode()
514 }
515
516 fn decode(buf: [u8; 8]) -> Self {
517 Self {
518 internal: u64::decode(buf),
519 }
520 }
521}
522
523impl std::fmt::Display for Timestamp {
524 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525 std::fmt::Display::fmt(&self.internal, f)
526 }
527}
528
529impl std::fmt::Debug for Timestamp {
530 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
531 std::fmt::Debug::fmt(&self.internal, f)
532 }
533}
534
535impl std::str::FromStr for Timestamp {
536 type Err = String;
537
538 fn from_str(s: &str) -> Result<Self, Self::Err> {
539 Ok(Self {
540 internal: s
541 .parse::<u64>()
542 .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
543 .or_else(|err_num_of_millis| {
544 parse_timestamptz(s)
545 .map_err(|parse_error| {
546 format!(
547 "{}; could not parse as date and time: {}",
548 err_num_of_millis, parse_error
549 )
550 })?
551 .timestamp_millis()
552 .try_into()
553 .map_err(|_| "out of range for mz_timestamp".to_string())
554 })
555 .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
556 })
557 }
558}
559
560impl TryFrom<Duration> for Timestamp {
561 type Error = TryFromIntError;
562
563 fn try_from(value: Duration) -> Result<Self, Self::Error> {
564 Ok(Self {
565 internal: value.as_millis().try_into()?,
566 })
567 }
568}
569
570impl TryFrom<u128> for Timestamp {
571 type Error = TryFromIntError;
572
573 fn try_from(value: u128) -> Result<Self, Self::Error> {
574 Ok(Self {
575 internal: value.try_into()?,
576 })
577 }
578}
579
580impl TryFrom<i64> for Timestamp {
581 type Error = TryFromIntError;
582
583 fn try_from(value: i64) -> Result<Self, Self::Error> {
584 Ok(Self {
585 internal: value.try_into()?,
586 })
587 }
588}
589
590impl TryFrom<Numeric> for Timestamp {
591 type Error = TryFromDecimalError;
592
593 fn try_from(value: Numeric) -> Result<Self, Self::Error> {
594 Ok(Self {
595 internal: value.try_into()?,
596 })
597 }
598}
599
600impl columnation::Columnation for Timestamp {
601 type InnerRegion = columnation::CopyRegion<Timestamp>;
602}