1use std::num::NonZeroI64;
13
14use mz_proto::{ProtoType, RustType, TryFromProtoError};
15use proptest::prelude::{Arbitrary, any};
16use proptest::strategy::{BoxedStrategy, Strategy};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19
20include!(concat!(env!("OUT_DIR"), "/mz_cluster_client.client.rs"));
21
22#[derive(PartialEq, Eq, Debug, Copy, Clone, Serialize, Deserialize)]
32pub struct ClusterStartupEpoch {
33 envd: NonZeroI64,
35 replica: u64,
37}
38
39impl ClusterStartupEpoch {
40 pub fn bump_replica(&mut self) {
42 self.replica += 1;
43 }
44}
45
46impl RustType<ProtoClusterStartupEpoch> for ClusterStartupEpoch {
47 fn into_proto(&self) -> ProtoClusterStartupEpoch {
48 let Self { envd, replica } = self;
49 ProtoClusterStartupEpoch {
50 envd: envd.get(),
51 replica: *replica,
52 }
53 }
54
55 fn from_proto(proto: ProtoClusterStartupEpoch) -> Result<Self, TryFromProtoError> {
56 let ProtoClusterStartupEpoch { envd, replica } = proto;
57 Ok(Self {
58 envd: envd.try_into().unwrap(),
59 replica,
60 })
61 }
62}
63
64impl Arbitrary for ClusterStartupEpoch {
65 type Strategy = BoxedStrategy<Self>;
66 type Parameters = ();
67
68 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
69 (any::<i64>(), any::<u64>())
70 .prop_map(|(envd, replica)| ClusterStartupEpoch {
71 envd: NonZeroI64::new(if envd == 0 { envd + 1 } else { envd }).unwrap(),
72 replica,
73 })
74 .boxed()
75 }
76}
77
78impl ClusterStartupEpoch {
79 pub fn new(envd: NonZeroI64, replica: u64) -> Self {
81 Self { envd, replica }
82 }
83
84 pub fn to_bytes(&self) -> [u8; 16] {
86 let mut ret = [0; 16];
87 let mut p = &mut ret[..];
88 use std::io::Write;
89 p.write_all(&self.envd.get().to_be_bytes()[..]).unwrap();
90 p.write_all(&self.replica.to_be_bytes()[..]).unwrap();
91 ret
92 }
93
94 pub fn from_bytes(bytes: [u8; 16]) -> Self {
96 let envd = i64::from_be_bytes((&bytes[0..8]).try_into().unwrap());
97 let replica = u64::from_be_bytes((&bytes[8..16]).try_into().unwrap());
98 Self {
99 envd: envd.try_into().unwrap(),
100 replica,
101 }
102 }
103
104 pub fn envd(&self) -> NonZeroI64 {
106 self.envd
107 }
108
109 pub fn replica(&self) -> u64 {
111 self.replica
112 }
113}
114
115impl std::fmt::Display for ClusterStartupEpoch {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 let Self { envd, replica } = self;
118 write!(f, "({envd}, {replica})")
119 }
120}
121
122impl PartialOrd for ClusterStartupEpoch {
123 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
124 Some(self.cmp(other))
125 }
126}
127
128impl Ord for ClusterStartupEpoch {
129 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
130 let Self { envd, replica } = self;
131 let Self {
132 envd: other_envd,
133 replica: other_replica,
134 } = other;
135 (envd, replica).cmp(&(other_envd, other_replica))
136 }
137}
138
139#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
141pub struct TimelyConfig {
142 pub workers: usize,
144 pub process: usize,
146 pub addresses: Vec<String>,
148 pub arrangement_exert_proportionality: u32,
157 pub enable_zero_copy: bool,
159 pub enable_zero_copy_lgalloc: bool,
161 pub zero_copy_limit: Option<usize>,
163 pub enable_create_sockets_v2: bool,
167}
168
169impl RustType<ProtoTimelyConfig> for TimelyConfig {
170 fn into_proto(&self) -> ProtoTimelyConfig {
171 ProtoTimelyConfig {
172 workers: self.workers.into_proto(),
173 addresses: self.addresses.into_proto(),
174 process: self.process.into_proto(),
175 arrangement_exert_proportionality: self.arrangement_exert_proportionality,
176 enable_zero_copy: self.enable_zero_copy,
177 enable_zero_copy_lgalloc: self.enable_zero_copy_lgalloc,
178 zero_copy_limit: self.zero_copy_limit.into_proto(),
179 enable_create_sockets_v2: self.enable_create_sockets_v2,
180 }
181 }
182
183 fn from_proto(proto: ProtoTimelyConfig) -> Result<Self, TryFromProtoError> {
184 Ok(Self {
185 process: proto.process.into_rust()?,
186 workers: proto.workers.into_rust()?,
187 addresses: proto.addresses.into_rust()?,
188 arrangement_exert_proportionality: proto.arrangement_exert_proportionality,
189 enable_zero_copy: proto.enable_zero_copy,
190 enable_zero_copy_lgalloc: proto.enable_zero_copy_lgalloc,
191 zero_copy_limit: proto.zero_copy_limit.into_rust()?,
192 enable_create_sockets_v2: proto.enable_create_sockets_v2,
193 })
194 }
195}
196
197impl TimelyConfig {
198 pub fn split_command(&self, parts: usize) -> Vec<Self> {
200 (0..parts)
201 .map(|part| TimelyConfig {
202 process: part,
203 ..self.clone()
204 })
205 .collect()
206 }
207}
208
209pub trait TryIntoTimelyConfig {
212 fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self>
215 where
216 Self: Sized;
217}
218
219#[derive(Clone, Debug, Serialize, Deserialize)]
221pub struct ClusterReplicaLocation {
222 pub ctl_addrs: Vec<String>,
226 pub dataflow_addrs: Vec<String>,
231 pub workers: usize,
233}
234
235#[cfg(test)]
236mod tests {
237 use mz_ore::assert_ok;
238 use mz_proto::protobuf_roundtrip;
239 use proptest::prelude::ProptestConfig;
240 use proptest::proptest;
241
242 use super::*;
243
244 proptest! {
245 #![proptest_config(ProptestConfig::with_cases(32))]
246
247 #[mz_ore::test]
248 #[cfg_attr(miri, ignore)] fn timely_config_protobuf_roundtrip(expect in any::<TimelyConfig>() ) {
250 let actual = protobuf_roundtrip::<_, ProtoTimelyConfig>(&expect);
251 assert_ok!(actual);
252 assert_eq!(actual.unwrap(), expect);
253 }
254
255 #[mz_ore::test]
256 #[cfg_attr(miri, ignore)] fn cluster_startup_epoch_protobuf_roundtrip(expect in any::<ClusterStartupEpoch>() ) {
258 let actual = protobuf_roundtrip::<_, ProtoClusterStartupEpoch>(&expect);
259 assert_ok!(actual);
260 assert_eq!(actual.unwrap(), expect);
261 }
262 }
263}