1#![warn(missing_docs)]
13
14use std::fmt;
15use std::ops::Deref;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::bail;
21use mz_ore::now::NowFn;
22use serde::{Deserialize, Serialize};
23
24pub mod client;
25pub mod metrics;
26
27#[derive(Clone)]
33pub struct WallclockLagFn<T>(Arc<dyn Fn(T) -> Duration + Send + Sync>);
34
35impl<T: Into<mz_repr::Timestamp>> WallclockLagFn<T> {
36 pub fn new(now: NowFn) -> Self {
38 let inner = Arc::new(move |time: T| {
39 let time_ts: mz_repr::Timestamp = time.into();
40 let time_ms: u64 = time_ts.into();
41 let lag_ms = now().saturating_sub(time_ms);
42 let lag_s = lag_ms.div_ceil(1000);
43 Duration::from_secs(lag_s)
44 });
45 Self(inner)
46 }
47}
48
49impl<T> Deref for WallclockLagFn<T> {
50 type Target = dyn Fn(T) -> Duration + Send + Sync;
51
52 fn deref(&self) -> &Self::Target {
53 &(*self.0)
54 }
55}
56
57#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
59pub enum ReplicaId {
60 User(u64),
62 System(u64),
64}
65
66impl ReplicaId {
67 pub fn inner_id(&self) -> u64 {
69 match self {
70 ReplicaId::User(id) => *id,
71 ReplicaId::System(id) => *id,
72 }
73 }
74
75 pub fn is_user(&self) -> bool {
77 matches!(self, Self::User(_))
78 }
79
80 pub fn is_system(&self) -> bool {
82 matches!(self, Self::System(_))
83 }
84}
85
86impl fmt::Display for ReplicaId {
87 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
88 match self {
89 Self::User(id) => write!(f, "u{}", id),
90 Self::System(id) => write!(f, "s{}", id),
91 }
92 }
93}
94
95impl FromStr for ReplicaId {
96 type Err = anyhow::Error;
97
98 fn from_str(s: &str) -> Result<Self, Self::Err> {
99 let first = s.chars().next();
100 let rest = s.get(1..);
101 if let (Some(prefix), Some(num)) = (first, rest) {
102 let id = num.parse()?;
103 match prefix {
104 'u' => return Ok(Self::User(id)),
105 's' => return Ok(Self::System(id)),
106 _ => (),
107 }
108 }
109
110 bail!("invalid replica ID: {}", s);
111 }
112}