mz_cluster_client/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The public API for both compute and storage.
11
12#![warn(missing_docs)]
13
14use std::fmt;
15use std::ops::Deref;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::bail;
21use mz_ore::now::NowFn;
22use serde::{Deserialize, Serialize};
23
24pub mod client;
25pub mod metrics;
26
/// A function that computes the lag between the given time and wallclock time.
///
/// Because sources usually tick once per second and we collect wallclock lag measurements once per
/// second, the measured lag can be off by up to 1s. To reflect this uncertainty, we report lag
/// values rounded to seconds. We always round up to avoid underreporting.
///
/// The function is stored as a shared, thread-safe closure (`Arc<dyn Fn(T) -> Duration + Send +
/// Sync>`), so cloning a `WallclockLagFn` only clones the `Arc` handle and every clone invokes
/// the same underlying closure.
#[derive(Clone)]
pub struct WallclockLagFn<T>(Arc<dyn Fn(T) -> Duration + Send + Sync>);
34
35impl<T: Into<mz_repr::Timestamp>> WallclockLagFn<T> {
36    /// Create a new [`WallclockLagFn`].
37    pub fn new(now: NowFn) -> Self {
38        let inner = Arc::new(move |time: T| {
39            let time_ts: mz_repr::Timestamp = time.into();
40            let time_ms: u64 = time_ts.into();
41            let lag_ms = now().saturating_sub(time_ms);
42            let lag_s = lag_ms.div_ceil(1000);
43            Duration::from_secs(lag_s)
44        });
45        Self(inner)
46    }
47}
48
49impl<T> Deref for WallclockLagFn<T> {
50    type Target = dyn Fn(T) -> Duration + Send + Sync;
51
52    fn deref(&self) -> &Self::Target {
53        &(*self.0)
54    }
55}
56
/// Identifier of a replica.
///
/// A replica ID is either a user or a system ID, each wrapping a numeric value. Its textual
/// form (see the `Display` and `FromStr` impls) is a one-character prefix followed by the
/// number: `u<id>` for user replicas and `s<id>` for system replicas.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum ReplicaId {
    /// A user replica.
    User(u64),
    /// A system replica.
    System(u64),
}
65
66impl ReplicaId {
67    /// Return the inner numeric ID value.
68    pub fn inner_id(&self) -> u64 {
69        match self {
70            ReplicaId::User(id) => *id,
71            ReplicaId::System(id) => *id,
72        }
73    }
74
75    /// Whether this value identifies a user replica.
76    pub fn is_user(&self) -> bool {
77        matches!(self, Self::User(_))
78    }
79
80    /// Whether this value identifies a system replica.
81    pub fn is_system(&self) -> bool {
82        matches!(self, Self::System(_))
83    }
84}
85
86impl fmt::Display for ReplicaId {
87    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
88        match self {
89            Self::User(id) => write!(f, "u{}", id),
90            Self::System(id) => write!(f, "s{}", id),
91        }
92    }
93}
94
95impl FromStr for ReplicaId {
96    type Err = anyhow::Error;
97
98    fn from_str(s: &str) -> Result<Self, Self::Err> {
99        let first = s.chars().next();
100        let rest = s.get(1..);
101        if let (Some(prefix), Some(num)) = (first, rest) {
102            let id = num.parse()?;
103            match prefix {
104                'u' => return Ok(Self::User(id)),
105                's' => return Ok(Self::System(id)),
106                _ => (),
107            }
108        }
109
110        bail!("invalid replica ID: {}", s);
111    }
112}