1use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use dec::TryFromDecimalError;
15use mz_proto::{RustType, TryFromProtoError};
16use mz_timely_util::temporal::BucketTimestamp;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26#[derive(
28 Clone,
29 Copy,
31 PartialEq,
32 Eq,
33 PartialOrd,
34 Ord,
35 Hash,
36 Default,
37 Arbitrary,
38 bytemuck::AnyBitPattern,
39 bytemuck::NoUninit,
40)]
41#[repr(transparent)]
42pub struct Timestamp {
43 internal: u64,
45}
46
47impl PartialEq<&Timestamp> for Timestamp {
48 fn eq(&self, other: &&Timestamp) -> bool {
49 self.eq(*other)
50 }
51}
52
53impl PartialEq<Timestamp> for &Timestamp {
54 fn eq(&self, other: &Timestamp) -> bool {
55 self.internal.eq(&other.internal)
56 }
57}
58
59impl RustType<ProtoTimestamp> for Timestamp {
60 fn into_proto(&self) -> ProtoTimestamp {
61 ProtoTimestamp {
62 internal: self.into(),
63 }
64 }
65
66 fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
67 Ok(Timestamp::new(proto.internal))
68 }
69}
70
71mod columnar_timestamp {
72 use crate::Timestamp;
73 use columnar::Columnar;
74 use mz_ore::cast::CastFrom;
75
76 #[derive(Clone, Copy, Default, Debug)]
78 pub struct Timestamps<T>(T);
79 impl<D, T: columnar::Push<D>> columnar::Push<D> for Timestamps<T> {
80 #[inline(always)]
81 fn push(&mut self, item: D) {
82 self.0.push(item)
83 }
84 }
85 impl<T: columnar::Clear> columnar::Clear for Timestamps<T> {
86 #[inline(always)]
87 fn clear(&mut self) {
88 self.0.clear()
89 }
90 }
91 impl<T: columnar::Len> columnar::Len for Timestamps<T> {
92 #[inline(always)]
93 fn len(&self) -> usize {
94 self.0.len()
95 }
96 }
97 impl<'a> columnar::Index for Timestamps<&'a [Timestamp]> {
98 type Ref = Timestamp;
99
100 #[inline(always)]
101 fn get(&self, index: usize) -> Self::Ref {
102 self.0[index]
103 }
104 }
105
106 impl Columnar for Timestamp {
107 #[inline(always)]
108 fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
109 other
110 }
111 type Container = Timestamps<Vec<Timestamp>>;
112 #[inline(always)]
113 fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
114 where
115 Self: 'a,
116 {
117 thing
118 }
119 }
120
121 impl columnar::Container for Timestamps<Vec<Timestamp>> {
122 type Ref<'a> = Timestamp;
123 type Borrowed<'a>
124 = Timestamps<&'a [Timestamp]>
125 where
126 Self: 'a;
127 #[inline(always)]
128 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
129 Timestamps(self.0.as_slice())
130 }
131 #[inline(always)]
132 fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
133 where
134 Self: 'a,
135 {
136 Timestamps(item.0)
137 }
138
139 #[inline(always)]
140 fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
141 where
142 Self: 'a,
143 {
144 item
145 }
146
147 #[inline(always)]
148 fn reserve_for<'a, I>(&mut self, selves: I)
149 where
150 Self: 'a,
151 I: Iterator<Item = Self::Borrowed<'a>> + Clone,
152 {
153 self.0.reserve_for(selves.map(|s| s.0));
154 }
155 }
156
157 impl columnar::HeapSize for Timestamp {}
158
159 impl<T: columnar::HeapSize> columnar::HeapSize for Timestamps<T> {
160 #[inline(always)]
161 fn heap_size(&self) -> (usize, usize) {
162 self.0.heap_size()
163 }
164 }
165
166 impl<'a> columnar::AsBytes<'a> for Timestamps<&'a [Timestamp]> {
167 #[inline(always)]
168 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
169 std::iter::once((
170 u64::cast_from(align_of::<Timestamp>()),
171 bytemuck::cast_slice(self.0),
172 ))
173 }
174 }
175 impl<'a> columnar::FromBytes<'a> for Timestamps<&'a [Timestamp]> {
176 #[inline(always)]
177 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
178 Timestamps(bytemuck::cast_slice(
179 bytes.next().expect("Iterator exhausted prematurely"),
180 ))
181 }
182 }
183}
184
185impl BucketTimestamp for Timestamp {
186 fn advance_by_power_of_two(&self, exponent: u32) -> Option<Self> {
187 let rhs = 1_u64.checked_shl(exponent)?;
188 Some(self.internal.checked_add(rhs)?.into())
189 }
190}
191
192pub trait TimestampManipulation:
193 timely::progress::Timestamp
194 + timely::order::TotalOrder
195 + differential_dataflow::lattice::Lattice
196 + std::fmt::Debug
197 + mz_persist_types::StepForward
198 + Sync
199{
200 fn step_forward(&self) -> Self;
203
204 fn step_forward_by(&self, amount: &Self) -> Self;
206
207 fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
209
210 fn try_step_forward(&self) -> Option<Self>;
213
214 fn step_back(&self) -> Option<Self>;
218
219 fn maximum() -> Self;
221
222 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
225
226 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
230}
231
232impl TimestampManipulation for Timestamp {
233 fn step_forward(&self) -> Self {
234 self.step_forward()
235 }
236
237 fn step_forward_by(&self, amount: &Self) -> Self {
238 self.step_forward_by(amount)
239 }
240
241 fn try_step_forward(&self) -> Option<Self> {
242 self.try_step_forward()
243 }
244
245 fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
246 self.try_step_forward_by(amount)
247 }
248
249 fn step_back(&self) -> Option<Self> {
250 self.step_back()
251 }
252
253 fn maximum() -> Self {
254 Self::MAX
255 }
256
257 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
258 schedule.round_up_timestamp(*self)
259 }
260
261 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
262 schedule.round_down_timestamp_m1(*self)
263 }
264}
265
266impl mz_persist_types::StepForward for Timestamp {
267 fn step_forward(&self) -> Self {
268 self.step_forward()
269 }
270}
271
272impl Timestamp {
273 pub const MAX: Self = Self { internal: u64::MAX };
274 pub const MIN: Self = Self { internal: u64::MIN };
275
276 pub const fn new(timestamp: u64) -> Self {
277 Self {
278 internal: timestamp,
279 }
280 }
281
282 pub fn to_bytes(&self) -> [u8; 8] {
283 self.internal.to_le_bytes()
284 }
285
286 pub fn from_bytes(bytes: [u8; 8]) -> Self {
287 Self {
288 internal: u64::from_le_bytes(bytes),
289 }
290 }
291
292 pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
293 Self {
294 internal: self.internal.saturating_sub(rhs.into().internal),
295 }
296 }
297
298 pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
299 Self {
300 internal: self.internal.saturating_add(rhs.into().internal),
301 }
302 }
303
304 pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
305 Self {
306 internal: self.internal.saturating_mul(rhs.into().internal),
307 }
308 }
309
310 pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
311 self.internal
312 .checked_add(rhs.into().internal)
313 .map(|internal| Self { internal })
314 }
315
316 pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
317 self.internal
318 .checked_sub(rhs.into().internal)
319 .map(|internal| Self { internal })
320 }
321
322 pub fn step_forward(&self) -> Self {
325 match self.checked_add(1) {
326 Some(ts) => ts,
327 None => panic!("could not step forward"),
328 }
329 }
330
331 pub fn step_forward_by(&self, amount: &Self) -> Self {
333 match self.checked_add(*amount) {
334 Some(ts) => ts,
335 None => panic!("could not step {self} forward by {amount}"),
336 }
337 }
338
339 pub fn try_step_forward(&self) -> Option<Self> {
342 self.checked_add(1)
343 }
344
345 pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
347 self.checked_add(*amount)
348 }
349
350 pub fn step_back(&self) -> Option<Self> {
354 self.checked_sub(1)
355 }
356}
357
358impl From<u64> for Timestamp {
359 fn from(internal: u64) -> Self {
360 Self { internal }
361 }
362}
363
364impl From<Timestamp> for u64 {
365 fn from(ts: Timestamp) -> Self {
366 ts.internal
367 }
368}
369
370impl From<Timestamp> for u128 {
371 fn from(ts: Timestamp) -> Self {
372 u128::from(ts.internal)
373 }
374}
375
376impl TryFrom<Timestamp> for i64 {
377 type Error = TryFromIntError;
378
379 fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
380 value.internal.try_into()
381 }
382}
383
384impl From<&Timestamp> for u64 {
385 fn from(ts: &Timestamp) -> Self {
386 ts.internal
387 }
388}
389
390impl From<Timestamp> for Numeric {
391 fn from(ts: Timestamp) -> Self {
392 ts.internal.into()
393 }
394}
395
396impl From<Timestamp> for Duration {
397 fn from(ts: Timestamp) -> Self {
398 Duration::from_millis(ts.internal)
399 }
400}
401
402impl std::ops::Rem<Timestamp> for Timestamp {
403 type Output = Timestamp;
404
405 fn rem(self, rhs: Timestamp) -> Self::Output {
406 Self {
407 internal: self.internal % rhs.internal,
408 }
409 }
410}
411
412impl Serialize for Timestamp {
413 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
414 where
415 S: Serializer,
416 {
417 self.internal.serialize(serializer)
418 }
419}
420
421impl<'de> Deserialize<'de> for Timestamp {
422 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423 where
424 D: serde::Deserializer<'de>,
425 {
426 Ok(Self {
427 internal: u64::deserialize(deserializer)?,
428 })
429 }
430}
431
432impl timely::order::PartialOrder for Timestamp {
433 fn less_equal(&self, other: &Self) -> bool {
434 self.internal.less_equal(&other.internal)
435 }
436}
437
438impl timely::order::PartialOrder<&Timestamp> for Timestamp {
439 fn less_equal(&self, other: &&Self) -> bool {
440 self.internal.less_equal(&other.internal)
441 }
442}
443
444impl timely::order::PartialOrder<Timestamp> for &Timestamp {
445 fn less_equal(&self, other: &Timestamp) -> bool {
446 self.internal.less_equal(&other.internal)
447 }
448}
449
450impl timely::order::TotalOrder for Timestamp {}
451
452impl timely::progress::Timestamp for Timestamp {
453 type Summary = Timestamp;
454
455 fn minimum() -> Self {
456 Self::MIN
457 }
458}
459
460impl timely::progress::PathSummary<Timestamp> for Timestamp {
461 #[inline]
462 fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
463 self.internal
464 .checked_add(src.internal)
465 .map(|internal| Self { internal })
466 }
467 #[inline]
468 fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
469 self.internal
470 .checked_add(other.internal)
471 .map(|internal| Self { internal })
472 }
473}
474
475impl timely::progress::timestamp::Refines<()> for Timestamp {
476 fn to_inner(_: ()) -> Timestamp {
477 Default::default()
478 }
479 fn to_outer(self) -> () {
480 ()
481 }
482 fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
483 ()
484 }
485}
486
487impl differential_dataflow::lattice::Lattice for Timestamp {
488 #[inline]
489 fn join(&self, other: &Self) -> Self {
490 ::std::cmp::max(*self, *other)
491 }
492 #[inline]
493 fn meet(&self, other: &Self) -> Self {
494 ::std::cmp::min(*self, *other)
495 }
496}
497
498impl mz_persist_types::Codec64 for Timestamp {
499 fn codec_name() -> String {
500 u64::codec_name()
501 }
502
503 fn encode(&self) -> [u8; 8] {
504 self.internal.encode()
505 }
506
507 fn decode(buf: [u8; 8]) -> Self {
508 Self {
509 internal: u64::decode(buf),
510 }
511 }
512}
513
514impl std::fmt::Display for Timestamp {
515 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
516 std::fmt::Display::fmt(&self.internal, f)
517 }
518}
519
520impl std::fmt::Debug for Timestamp {
521 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522 std::fmt::Debug::fmt(&self.internal, f)
523 }
524}
525
526impl std::str::FromStr for Timestamp {
527 type Err = String;
528
529 fn from_str(s: &str) -> Result<Self, Self::Err> {
530 Ok(Self {
531 internal: s
532 .parse::<u64>()
533 .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
534 .or_else(|err_num_of_millis| {
535 parse_timestamptz(s)
536 .map_err(|parse_error| {
537 format!(
538 "{}; could not parse as date and time: {}",
539 err_num_of_millis, parse_error
540 )
541 })?
542 .timestamp_millis()
543 .try_into()
544 .map_err(|_| "out of range for mz_timestamp".to_string())
545 })
546 .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
547 })
548 }
549}
550
551impl TryFrom<Duration> for Timestamp {
552 type Error = TryFromIntError;
553
554 fn try_from(value: Duration) -> Result<Self, Self::Error> {
555 Ok(Self {
556 internal: value.as_millis().try_into()?,
557 })
558 }
559}
560
561impl TryFrom<u128> for Timestamp {
562 type Error = TryFromIntError;
563
564 fn try_from(value: u128) -> Result<Self, Self::Error> {
565 Ok(Self {
566 internal: value.try_into()?,
567 })
568 }
569}
570
571impl TryFrom<i64> for Timestamp {
572 type Error = TryFromIntError;
573
574 fn try_from(value: i64) -> Result<Self, Self::Error> {
575 Ok(Self {
576 internal: value.try_into()?,
577 })
578 }
579}
580
581impl TryFrom<Numeric> for Timestamp {
582 type Error = TryFromDecimalError;
583
584 fn try_from(value: Numeric) -> Result<Self, Self::Error> {
585 Ok(Self {
586 internal: value.try_into()?,
587 })
588 }
589}
590
591impl columnation::Columnation for Timestamp {
592 type InnerRegion = columnation::CopyRegion<Timestamp>;
593}