#![allow(missing_docs)]
use std::num::NonZeroI64;
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{any, Arbitrary};
use proptest::strategy::{BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
include!(concat!(env!("OUT_DIR"), "/mz_cluster_client.client.rs"));
/// A two-part epoch value made of a non-zero `envd` component and a `replica`
/// component.
///
/// Epochs are totally ordered: `envd` is compared first and `replica` breaks
/// ties. They can also be encoded as a fixed 16-byte array of two big-endian
/// integers (`envd` followed by `replica`).
///
/// NOTE(review): presumably `envd` identifies an incarnation of the controlling
/// environment process and `replica` an incarnation of a replica — confirm
/// against the callers that construct epochs.
#[derive(PartialEq, Eq, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ClusterStartupEpoch {
/// First (most significant for ordering) component; the type guarantees it is never zero.
envd: NonZeroI64,
/// Second component; only consulted for ordering when `envd` values are equal.
replica: u64,
}
impl RustType<ProtoClusterStartupEpoch> for ClusterStartupEpoch {
fn into_proto(&self) -> ProtoClusterStartupEpoch {
let Self { envd, replica } = self;
ProtoClusterStartupEpoch {
envd: envd.get(),
replica: *replica,
}
}
fn from_proto(proto: ProtoClusterStartupEpoch) -> Result<Self, TryFromProtoError> {
let ProtoClusterStartupEpoch { envd, replica } = proto;
Ok(Self {
envd: envd.try_into().unwrap(),
replica,
})
}
}
impl Arbitrary for ClusterStartupEpoch {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    /// Produces arbitrary epochs from an arbitrary `i64`/`u64` pair.
    ///
    /// A generated `envd` of zero is replaced by one so that the non-zero
    /// requirement of the field always holds.
    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        let epochs = (any::<i64>(), any::<u64>()).prop_map(|(raw_envd, replica)| {
            // Zero is the only value `NonZeroI64` rejects; remap it to one.
            let nonzero_envd = match raw_envd {
                0 => 1,
                other => other,
            };
            ClusterStartupEpoch {
                envd: NonZeroI64::new(nonzero_envd).unwrap(),
                replica,
            }
        });
        epochs.boxed()
    }
}
impl ClusterStartupEpoch {
    /// Creates an epoch from its two components.
    pub fn new(envd: NonZeroI64, replica: u64) -> Self {
        Self { envd, replica }
    }

    /// Encodes the epoch as 16 bytes.
    ///
    /// Bytes `0..8` hold `envd` and bytes `8..16` hold `replica`, both in
    /// big-endian order.
    pub fn to_bytes(&self) -> [u8; 16] {
        let mut encoded = [0u8; 16];
        let (envd_half, replica_half) = encoded.split_at_mut(8);
        envd_half.copy_from_slice(&self.envd.get().to_be_bytes());
        replica_half.copy_from_slice(&self.replica.to_be_bytes());
        encoded
    }

    /// Decodes an epoch from the 16-byte layout produced by `to_bytes`.
    ///
    /// # Panics
    ///
    /// Panics if the first eight bytes decode to an `envd` of zero.
    pub fn from_bytes(bytes: [u8; 16]) -> Self {
        let (envd_half, replica_half) = bytes.split_at(8);
        // Both halves are exactly eight bytes long, so the array conversions
        // cannot fail.
        let raw_envd = i64::from_be_bytes(envd_half.try_into().unwrap());
        let replica = u64::from_be_bytes(replica_half.try_into().unwrap());
        let envd = NonZeroI64::try_from(raw_envd).unwrap();
        Self { envd, replica }
    }

    /// Returns the `envd` component.
    pub fn envd(&self) -> NonZeroI64 {
        self.envd
    }

    /// Returns the `replica` component.
    pub fn replica(&self) -> u64 {
        self.replica
    }
}
impl std::fmt::Display for ClusterStartupEpoch {
    /// Renders the epoch as `(envd, replica)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let envd = self.envd;
        let replica = self.replica;
        f.write_fmt(format_args!("({envd}, {replica})"))
    }
}
impl PartialOrd for ClusterStartupEpoch {
    /// Delegates to the total order from `Ord`, so the result is always `Some`.
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, rhs);
        Some(ordering)
    }
}
impl Ord for ClusterStartupEpoch {
    /// Orders epochs by `envd` first, falling back to `replica` on a tie.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.envd
            .cmp(&other.envd)
            .then_with(|| self.replica.cmp(&other.replica))
    }
}
/// Configuration values for a Timely process, convertible to and from
/// `ProtoTimelyConfig`.
///
/// NOTE(review): the field descriptions below are inferred from the field names
/// and from how `split_command` uses them — confirm against the code that
/// consumes this configuration.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct TimelyConfig {
/// Presumably the number of worker threads per process — TODO confirm.
pub workers: usize,
/// Index of the process this configuration is for; `split_command` assigns
/// `0..parts` here, one value per produced configuration.
pub process: usize,
/// Presumably the network addresses of all participating processes — TODO confirm.
pub addresses: Vec<String>,
/// Passed through unchanged to and from the protobuf form.
/// NOTE(review): presumably tunes arrangement merge effort — confirm.
pub arrangement_exert_proportionality: u32,
}
impl RustType<ProtoTimelyConfig> for TimelyConfig {
    /// Converts this configuration into its protobuf representation.
    fn into_proto(&self) -> ProtoTimelyConfig {
        let workers = self.workers.into_proto();
        let addresses = self.addresses.into_proto();
        let process = self.process.into_proto();
        ProtoTimelyConfig {
            workers,
            process,
            addresses,
            arrangement_exert_proportionality: self.arrangement_exert_proportionality,
        }
    }

    /// Rebuilds a configuration from its protobuf representation.
    ///
    /// # Errors
    ///
    /// Propagates the first conversion error encountered while converting
    /// `process`, then `workers`, then `addresses`.
    fn from_proto(proto: ProtoTimelyConfig) -> Result<Self, TryFromProtoError> {
        let ProtoTimelyConfig {
            workers,
            process,
            addresses,
            arrangement_exert_proportionality,
        } = proto;
        let process: usize = process.into_rust()?;
        let workers: usize = workers.into_rust()?;
        let addresses: Vec<String> = addresses.into_rust()?;
        Ok(Self {
            workers,
            process,
            addresses,
            arrangement_exert_proportionality,
        })
    }
}
impl TimelyConfig {
pub fn split_command(&self, parts: usize) -> Vec<Self> {
(0..parts)
.map(|part| TimelyConfig {
process: part,
..self.clone()
})
.collect()
}
}
/// A fallible, consuming conversion into a `TimelyConfig` together with a
/// `ClusterStartupEpoch`.
///
/// NOTE(review): presumably implemented by command types that may carry
/// Timely startup parameters — confirm against the implementors.
pub trait TryIntoTimelyConfig {
/// Consumes `self` and attempts to extract a configuration and epoch.
///
/// On failure the original value is handed back unchanged in the type
/// (`Err(Self)`), so the caller keeps ownership of it.
fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self>
where
Self: Sized;
}
/// Address and worker information for a cluster replica.
///
/// NOTE(review): the field descriptions below are inferred from the field names
/// only — confirm against the code that builds and consumes this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterReplicaLocation {
/// Presumably the control-plane addresses of the replica's processes — TODO confirm.
pub ctl_addrs: Vec<String>,
/// Presumably the addresses used for dataflow communication between processes — TODO confirm.
pub dataflow_addrs: Vec<String>,
/// Presumably the number of workers per process — TODO confirm.
pub workers: usize,
}
// Property tests checking that the types in this file survive a protobuf
// round trip unchanged.
#[cfg(test)]
mod tests {
use mz_proto::protobuf_roundtrip;
use proptest::prelude::ProptestConfig;
use proptest::proptest;
use super::*;
proptest! {
// Each property below is exercised with 32 generated cases.
#![proptest_config(ProptestConfig::with_cases(32))]
// An arbitrary `TimelyConfig` must round-trip through `ProtoTimelyConfig`.
// The test is ignored when running under Miri.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn timely_config_protobuf_roundtrip(expect in any::<TimelyConfig>() ) {
let actual = protobuf_roundtrip::<_, ProtoTimelyConfig>(&expect);
assert!(actual.is_ok());
assert_eq!(actual.unwrap(), expect);
}
// An arbitrary `ClusterStartupEpoch` must round-trip through
// `ProtoClusterStartupEpoch`. The test is ignored when running under Miri.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn cluster_startup_epoch_protobuf_roundtrip(expect in any::<ClusterStartupEpoch>() ) {
let actual = protobuf_roundtrip::<_, ProtoClusterStartupEpoch>(&expect);
assert!(actual.is_ok());
assert_eq!(actual.unwrap(), expect);
}
}
}