mz_repr/
timestamp.rs
1use std::convert::TryFrom;
11use std::num::TryFromIntError;
12use std::time::Duration;
13
14use columnar::Columnar;
15use dec::TryFromDecimalError;
16use mz_proto::{RustType, TryFromProtoError};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize, Serializer};
19
20use crate::adt::numeric::Numeric;
21use crate::refresh_schedule::RefreshSchedule;
22use crate::strconv::parse_timestamptz;
23
24include!(concat!(env!("OUT_DIR"), "/mz_repr.timestamp.rs"));
25
26#[derive(
28 Clone,
29 Copy,
31 PartialEq,
32 Eq,
33 PartialOrd,
34 Ord,
35 Hash,
36 Default,
37 Arbitrary,
38 Columnar,
39)]
40#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
41pub struct Timestamp {
42 internal: u64,
44}
45
46impl PartialEq<&Timestamp> for Timestamp {
47 fn eq(&self, other: &&Timestamp) -> bool {
48 self.eq(*other)
49 }
50}
51
52impl PartialEq<Timestamp> for &Timestamp {
53 fn eq(&self, other: &Timestamp) -> bool {
54 self.internal.eq(&other.internal)
55 }
56}
57
58impl RustType<ProtoTimestamp> for Timestamp {
59 fn into_proto(&self) -> ProtoTimestamp {
60 ProtoTimestamp {
61 internal: self.into(),
62 }
63 }
64
65 fn from_proto(proto: ProtoTimestamp) -> Result<Self, TryFromProtoError> {
66 Ok(Timestamp::new(proto.internal))
67 }
68}
69
70pub trait TimestampManipulation:
71 timely::progress::Timestamp
72 + timely::order::TotalOrder
73 + differential_dataflow::lattice::Lattice
74 + std::fmt::Debug
75 + mz_persist_types::StepForward
76 + Sync
77{
78 fn step_forward(&self) -> Self;
81
82 fn step_forward_by(&self, amount: &Self) -> Self;
84
85 fn try_step_forward_by(&self, amount: &Self) -> Option<Self>;
87
88 fn try_step_forward(&self) -> Option<Self>;
91
92 fn step_back(&self) -> Option<Self>;
96
97 fn maximum() -> Self;
99
100 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self>;
103
104 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self>;
108}
109
110impl TimestampManipulation for Timestamp {
111 fn step_forward(&self) -> Self {
112 self.step_forward()
113 }
114
115 fn step_forward_by(&self, amount: &Self) -> Self {
116 self.step_forward_by(amount)
117 }
118
119 fn try_step_forward(&self) -> Option<Self> {
120 self.try_step_forward()
121 }
122
123 fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
124 self.try_step_forward_by(amount)
125 }
126
127 fn step_back(&self) -> Option<Self> {
128 self.step_back()
129 }
130
131 fn maximum() -> Self {
132 Self::MAX
133 }
134
135 fn round_up(&self, schedule: &RefreshSchedule) -> Option<Self> {
136 schedule.round_up_timestamp(*self)
137 }
138
139 fn round_down_minus_1(&self, schedule: &RefreshSchedule) -> Option<Self> {
140 schedule.round_down_timestamp_m1(*self)
141 }
142}
143
144impl mz_persist_types::StepForward for Timestamp {
145 fn step_forward(&self) -> Self {
146 self.step_forward()
147 }
148}
149
150impl Timestamp {
151 pub const MAX: Self = Self { internal: u64::MAX };
152 pub const MIN: Self = Self { internal: u64::MIN };
153
154 pub const fn new(timestamp: u64) -> Self {
155 Self {
156 internal: timestamp,
157 }
158 }
159
160 pub fn to_bytes(&self) -> [u8; 8] {
161 self.internal.to_le_bytes()
162 }
163
164 pub fn from_bytes(bytes: [u8; 8]) -> Self {
165 Self {
166 internal: u64::from_le_bytes(bytes),
167 }
168 }
169
170 pub fn saturating_sub<I: Into<Self>>(self, rhs: I) -> Self {
171 Self {
172 internal: self.internal.saturating_sub(rhs.into().internal),
173 }
174 }
175
176 pub fn saturating_add<I: Into<Self>>(self, rhs: I) -> Self {
177 Self {
178 internal: self.internal.saturating_add(rhs.into().internal),
179 }
180 }
181
182 pub fn saturating_mul<I: Into<Self>>(self, rhs: I) -> Self {
183 Self {
184 internal: self.internal.saturating_mul(rhs.into().internal),
185 }
186 }
187
188 pub fn checked_add<I: Into<Self>>(self, rhs: I) -> Option<Self> {
189 self.internal
190 .checked_add(rhs.into().internal)
191 .map(|internal| Self { internal })
192 }
193
194 pub fn checked_sub<I: Into<Self>>(self, rhs: I) -> Option<Self> {
195 self.internal
196 .checked_sub(rhs.into().internal)
197 .map(|internal| Self { internal })
198 }
199
200 pub fn step_forward(&self) -> Self {
203 match self.checked_add(1) {
204 Some(ts) => ts,
205 None => panic!("could not step forward"),
206 }
207 }
208
209 pub fn step_forward_by(&self, amount: &Self) -> Self {
211 match self.checked_add(*amount) {
212 Some(ts) => ts,
213 None => panic!("could not step {self} forward by {amount}"),
214 }
215 }
216
217 pub fn try_step_forward(&self) -> Option<Self> {
220 self.checked_add(1)
221 }
222
223 pub fn try_step_forward_by(&self, amount: &Self) -> Option<Self> {
225 self.checked_add(*amount)
226 }
227
228 pub fn step_back(&self) -> Option<Self> {
232 self.checked_sub(1)
233 }
234}
235
236impl From<u64> for Timestamp {
237 fn from(internal: u64) -> Self {
238 Self { internal }
239 }
240}
241
242impl From<Timestamp> for u64 {
243 fn from(ts: Timestamp) -> Self {
244 ts.internal
245 }
246}
247
248impl From<Timestamp> for u128 {
249 fn from(ts: Timestamp) -> Self {
250 u128::from(ts.internal)
251 }
252}
253
254impl TryFrom<Timestamp> for i64 {
255 type Error = TryFromIntError;
256
257 fn try_from(value: Timestamp) -> Result<Self, Self::Error> {
258 value.internal.try_into()
259 }
260}
261
262impl From<&Timestamp> for u64 {
263 fn from(ts: &Timestamp) -> Self {
264 ts.internal
265 }
266}
267
268impl From<Timestamp> for Numeric {
269 fn from(ts: Timestamp) -> Self {
270 ts.internal.into()
271 }
272}
273
274impl From<Timestamp> for Duration {
275 fn from(ts: Timestamp) -> Self {
276 Duration::from_millis(ts.internal)
277 }
278}
279
280impl std::ops::Rem<Timestamp> for Timestamp {
281 type Output = Timestamp;
282
283 fn rem(self, rhs: Timestamp) -> Self::Output {
284 Self {
285 internal: self.internal % rhs.internal,
286 }
287 }
288}
289
290impl Serialize for Timestamp {
291 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
292 where
293 S: Serializer,
294 {
295 self.internal.serialize(serializer)
296 }
297}
298
299impl<'de> Deserialize<'de> for Timestamp {
300 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
301 where
302 D: serde::Deserializer<'de>,
303 {
304 Ok(Self {
305 internal: u64::deserialize(deserializer)?,
306 })
307 }
308}
309
310impl timely::order::PartialOrder for Timestamp {
311 fn less_equal(&self, other: &Self) -> bool {
312 self.internal.less_equal(&other.internal)
313 }
314}
315
316impl timely::order::PartialOrder<&Timestamp> for Timestamp {
317 fn less_equal(&self, other: &&Self) -> bool {
318 self.internal.less_equal(&other.internal)
319 }
320}
321
322impl timely::order::PartialOrder<Timestamp> for &Timestamp {
323 fn less_equal(&self, other: &Timestamp) -> bool {
324 self.internal.less_equal(&other.internal)
325 }
326}
327
328impl timely::order::TotalOrder for Timestamp {}
329
330impl timely::progress::Timestamp for Timestamp {
331 type Summary = Timestamp;
332
333 fn minimum() -> Self {
334 Self::MIN
335 }
336}
337
338impl timely::progress::PathSummary<Timestamp> for Timestamp {
339 #[inline]
340 fn results_in(&self, src: &Timestamp) -> Option<Timestamp> {
341 self.internal
342 .checked_add(src.internal)
343 .map(|internal| Self { internal })
344 }
345 #[inline]
346 fn followed_by(&self, other: &Timestamp) -> Option<Timestamp> {
347 self.internal
348 .checked_add(other.internal)
349 .map(|internal| Self { internal })
350 }
351}
352
353impl timely::progress::timestamp::Refines<()> for Timestamp {
354 fn to_inner(_: ()) -> Timestamp {
355 Default::default()
356 }
357 fn to_outer(self) -> () {
358 ()
359 }
360 fn summarize(_: <Timestamp as timely::progress::timestamp::Timestamp>::Summary) -> () {
361 ()
362 }
363}
364
365impl differential_dataflow::lattice::Lattice for Timestamp {
366 #[inline]
367 fn join(&self, other: &Self) -> Self {
368 ::std::cmp::max(*self, *other)
369 }
370 #[inline]
371 fn meet(&self, other: &Self) -> Self {
372 ::std::cmp::min(*self, *other)
373 }
374}
375
376impl mz_persist_types::Codec64 for Timestamp {
377 fn codec_name() -> String {
378 u64::codec_name()
379 }
380
381 fn encode(&self) -> [u8; 8] {
382 self.internal.encode()
383 }
384
385 fn decode(buf: [u8; 8]) -> Self {
386 Self {
387 internal: u64::decode(buf),
388 }
389 }
390}
391
392impl std::fmt::Display for Timestamp {
393 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
394 std::fmt::Display::fmt(&self.internal, f)
395 }
396}
397
398impl std::fmt::Debug for Timestamp {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 std::fmt::Debug::fmt(&self.internal, f)
401 }
402}
403
404impl std::str::FromStr for Timestamp {
405 type Err = String;
406
407 fn from_str(s: &str) -> Result<Self, Self::Err> {
408 Ok(Self {
409 internal: s
410 .parse::<u64>()
411 .map_err(|_| "could not parse as number of milliseconds since epoch".to_string())
412 .or_else(|err_num_of_millis| {
413 parse_timestamptz(s)
414 .map_err(|parse_error| {
415 format!(
416 "{}; could not parse as date and time: {}",
417 err_num_of_millis, parse_error
418 )
419 })?
420 .timestamp_millis()
421 .try_into()
422 .map_err(|_| "out of range for mz_timestamp".to_string())
423 })
424 .map_err(|e: String| format!("could not parse mz_timestamp: {}", e))?,
425 })
426 }
427}
428
429impl TryFrom<Duration> for Timestamp {
430 type Error = TryFromIntError;
431
432 fn try_from(value: Duration) -> Result<Self, Self::Error> {
433 Ok(Self {
434 internal: value.as_millis().try_into()?,
435 })
436 }
437}
438
439impl TryFrom<u128> for Timestamp {
440 type Error = TryFromIntError;
441
442 fn try_from(value: u128) -> Result<Self, Self::Error> {
443 Ok(Self {
444 internal: value.try_into()?,
445 })
446 }
447}
448
449impl TryFrom<i64> for Timestamp {
450 type Error = TryFromIntError;
451
452 fn try_from(value: i64) -> Result<Self, Self::Error> {
453 Ok(Self {
454 internal: value.try_into()?,
455 })
456 }
457}
458
459impl TryFrom<Numeric> for Timestamp {
460 type Error = TryFromDecimalError;
461
462 fn try_from(value: Numeric) -> Result<Self, Self::Error> {
463 Ok(Self {
464 internal: value.try_into()?,
465 })
466 }
467}
468
469impl columnation::Columnation for Timestamp {
470 type InnerRegion = columnation::CopyRegion<Timestamp>;
471}
472
473mod differential {
474 use differential_dataflow::IntoOwned;
475
476 use crate::Timestamp;
477
478 impl<'a> IntoOwned<'a> for Timestamp {
479 type Owned = Self;
480
481 fn into_owned(self) -> Self::Owned {
482 self
483 }
484
485 fn clone_onto(self, other: &mut Self::Owned) {
486 *other = self;
487 }
488
489 fn borrow_as(owned: &'a Self::Owned) -> Self {
490 *owned
491 }
492 }
493}