Skip to main content

mz_cluster_client/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The public API for both compute and storage.
11
12#![warn(missing_docs)]
13
14use std::fmt;
15use std::ops::Deref;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::bail;
21use mz_ore::now::NowFn;
22use serde::{Deserialize, Serialize};
23
24pub mod client;
25pub mod metrics;
26
27/// A function that computes the lag between the given time and wallclock time.
28///
29/// Because sources usually tick once per second and we collect wallclock lag measurements once per
30/// second, the measured lag can be off by up to 1s. To reflect this uncertainty, we report lag
31/// values rounded to seconds. We always round up to avoid underreporting.
32#[derive(Clone)]
33pub struct WallclockLagFn<T>(Arc<dyn Fn(T) -> Duration + Send + Sync>);
34
35impl<T: Into<mz_repr::Timestamp>> WallclockLagFn<T> {
36    /// Create a new [`WallclockLagFn`].
37    pub fn new(now: NowFn) -> Self {
38        let inner = Arc::new(move |time: T| {
39            let time_ts: mz_repr::Timestamp = time.into();
40            let time_ms: u64 = time_ts.into();
41            let lag_ms = now().saturating_sub(time_ms);
42            let lag_s = lag_ms.div_ceil(1000);
43            Duration::from_secs(lag_s)
44        });
45        Self(inner)
46    }
47}
48
49impl<T> Deref for WallclockLagFn<T> {
50    type Target = dyn Fn(T) -> Duration + Send + Sync;
51
52    fn deref(&self) -> &Self::Target {
53        &(*self.0)
54    }
55}
56
57/// Identifier of a replica.
58#[derive(
59    Clone,
60    Copy,
61    Debug,
62    Eq,
63    PartialEq,
64    Ord,
65    PartialOrd,
66    Hash,
67    Serialize,
68    Deserialize
69)]
70pub enum ReplicaId {
71    /// A user replica.
72    User(u64),
73    /// A system replica.
74    System(u64),
75}
76
77impl ReplicaId {
78    /// Return the inner numeric ID value.
79    pub fn inner_id(&self) -> u64 {
80        match self {
81            ReplicaId::User(id) => *id,
82            ReplicaId::System(id) => *id,
83        }
84    }
85
86    /// Whether this value identifies a user replica.
87    pub fn is_user(&self) -> bool {
88        matches!(self, Self::User(_))
89    }
90
91    /// Whether this value identifies a system replica.
92    pub fn is_system(&self) -> bool {
93        matches!(self, Self::System(_))
94    }
95}
96
97impl fmt::Display for ReplicaId {
98    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99        match self {
100            Self::User(id) => write!(f, "u{}", id),
101            Self::System(id) => write!(f, "s{}", id),
102        }
103    }
104}
105
106impl FromStr for ReplicaId {
107    type Err = anyhow::Error;
108
109    fn from_str(s: &str) -> Result<Self, Self::Err> {
110        let first = s.chars().next();
111        let rest = s.get(1..);
112        if let (Some(prefix), Some(num)) = (first, rest) {
113            let id = num.parse()?;
114            match prefix {
115                'u' => return Ok(Self::User(id)),
116                's' => return Ok(Self::System(id)),
117                _ => (),
118            }
119        }
120
121        bail!("invalid replica ID: {}", s);
122    }
123}