1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! The public API for both compute and storage.
1112#![warn(missing_docs)]
1314use std::fmt;
15use std::ops::Deref;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
1920use anyhow::bail;
21use mz_ore::now::NowFn;
22use serde::{Deserialize, Serialize};
2324pub mod client;
25pub mod metrics;
/// A function that computes the lag between the given time and wallclock time.
///
/// Because sources usually tick once per second and we collect wallclock lag measurements once per
/// second, the measured lag can be off by up to 1s. To reflect this uncertainty, we report lag
/// values rounded to seconds. We always round up to avoid underreporting.
///
/// The wrapped closure is held in an [`Arc`], so clones share a single closure and cloning is
/// cheap.
pub struct WallclockLagFn<T>(Arc<dyn Fn(T) -> Duration + Send + Sync>);

// `Clone` is implemented by hand rather than derived: `#[derive(Clone)]` would add a `T: Clone`
// bound, even though cloning only bumps the `Arc` reference count and never copies a `T`.
impl<T> Clone for WallclockLagFn<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}
3435impl<T: Into<mz_repr::Timestamp>> WallclockLagFn<T> {
36/// Create a new [`WallclockLagFn`].
37pub fn new(now: NowFn) -> Self {
38let inner = Arc::new(move |time: T| {
39let time_ts: mz_repr::Timestamp = time.into();
40let time_ms: u64 = time_ts.into();
41let lag_ms = now().saturating_sub(time_ms);
42let lag_s = lag_ms.div_ceil(1000);
43 Duration::from_secs(lag_s)
44 });
45Self(inner)
46 }
47}
4849impl<T> Deref for WallclockLagFn<T> {
50type Target = dyn Fn(T) -> Duration + Send + Sync;
5152fn deref(&self) -> &Self::Target {
53&(*self.0)
54 }
55}
5657/// Identifier of a replica.
58#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
59pub enum ReplicaId {
60/// A user replica.
61User(u64),
62/// A system replica.
63System(u64),
64}
6566impl ReplicaId {
67/// Return the inner numeric ID value.
68pub fn inner_id(&self) -> u64 {
69match self {
70 ReplicaId::User(id) => *id,
71 ReplicaId::System(id) => *id,
72 }
73 }
7475/// Whether this value identifies a user replica.
76pub fn is_user(&self) -> bool {
77matches!(self, Self::User(_))
78 }
7980/// Whether this value identifies a system replica.
81pub fn is_system(&self) -> bool {
82matches!(self, Self::System(_))
83 }
84}
8586impl fmt::Display for ReplicaId {
87fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
88match self {
89Self::User(id) => write!(f, "u{}", id),
90Self::System(id) => write!(f, "s{}", id),
91 }
92 }
93}
9495impl FromStr for ReplicaId {
96type Err = anyhow::Error;
9798fn from_str(s: &str) -> Result<Self, Self::Err> {
99let first = s.chars().next();
100let rest = s.get(1..);
101if let (Some(prefix), Some(num)) = (first, rest) {
102let id = num.parse()?;
103match prefix {
104'u' => return Ok(Self::User(id)),
105's' => return Ok(Self::System(id)),
106_ => (),
107 }
108 }
109110bail!("invalid replica ID: {}", s);
111 }
112}