1#![warn(missing_docs)]
13
14use std::fmt;
15use std::ops::Deref;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::bail;
21use mz_ore::now::NowFn;
22use serde::{Deserialize, Serialize};
23
24pub mod client;
25pub mod metrics;
26
27#[derive(Clone)]
33pub struct WallclockLagFn<T>(Arc<dyn Fn(T) -> Duration + Send + Sync>);
34
35impl<T: Into<mz_repr::Timestamp>> WallclockLagFn<T> {
36 pub fn new(now: NowFn) -> Self {
38 let inner = Arc::new(move |time: T| {
39 let time_ts: mz_repr::Timestamp = time.into();
40 let time_ms: u64 = time_ts.into();
41 let lag_ms = now().saturating_sub(time_ms);
42 let lag_s = lag_ms.div_ceil(1000);
43 Duration::from_secs(lag_s)
44 });
45 Self(inner)
46 }
47}
48
49impl<T> Deref for WallclockLagFn<T> {
50 type Target = dyn Fn(T) -> Duration + Send + Sync;
51
52 fn deref(&self) -> &Self::Target {
53 &(*self.0)
54 }
55}
56
57#[derive(
59 Clone,
60 Copy,
61 Debug,
62 Eq,
63 PartialEq,
64 Ord,
65 PartialOrd,
66 Hash,
67 Serialize,
68 Deserialize
69)]
70pub enum ReplicaId {
71 User(u64),
73 System(u64),
75}
76
77impl ReplicaId {
78 pub fn inner_id(&self) -> u64 {
80 match self {
81 ReplicaId::User(id) => *id,
82 ReplicaId::System(id) => *id,
83 }
84 }
85
86 pub fn is_user(&self) -> bool {
88 matches!(self, Self::User(_))
89 }
90
91 pub fn is_system(&self) -> bool {
93 matches!(self, Self::System(_))
94 }
95}
96
97impl fmt::Display for ReplicaId {
98 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99 match self {
100 Self::User(id) => write!(f, "u{}", id),
101 Self::System(id) => write!(f, "s{}", id),
102 }
103 }
104}
105
106impl FromStr for ReplicaId {
107 type Err = anyhow::Error;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 let first = s.chars().next();
111 let rest = s.get(1..);
112 if let (Some(prefix), Some(num)) = (first, rest) {
113 let id = num.parse()?;
114 match prefix {
115 'u' => return Ok(Self::User(id)),
116 's' => return Ok(Self::System(id)),
117 _ => (),
118 }
119 }
120
121 bail!("invalid replica ID: {}", s);
122 }
123}