mz_cluster_client/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for commands to clusters.
11
12use std::num::NonZeroI64;
13
14use mz_proto::{ProtoType, RustType, TryFromProtoError};
15use proptest::prelude::{Arbitrary, any};
16use proptest::strategy::{BoxedStrategy, Strategy};
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19
20include!(concat!(env!("OUT_DIR"), "/mz_cluster_client.client.rs"));
21
22/// A value generated by environmentd and passed to the clusterd processes
23/// to help them disambiguate different `CreateTimely` commands.
24///
25/// The semantics of this value are not important, except that they
26/// must be totally ordered, and any value (for a given replica) must
27/// be greater than any that were generated before (for that replica).
28/// This is the reason for having two
29/// components (one from the catalog storage that increases on every environmentd restart,
30/// another in-memory and local to the current incarnation of environmentd)
31#[derive(PartialEq, Eq, Debug, Copy, Clone, Serialize, Deserialize)]
32pub struct ClusterStartupEpoch {
33    /// The environment incarnation.
34    envd: NonZeroI64,
35    /// The replica incarnation.
36    replica: u64,
37}
38
39impl ClusterStartupEpoch {
40    /// Increases the replica incarnation counter.
41    pub fn bump_replica(&mut self) {
42        self.replica += 1;
43    }
44}
45
46impl RustType<ProtoClusterStartupEpoch> for ClusterStartupEpoch {
47    fn into_proto(&self) -> ProtoClusterStartupEpoch {
48        let Self { envd, replica } = self;
49        ProtoClusterStartupEpoch {
50            envd: envd.get(),
51            replica: *replica,
52        }
53    }
54
55    fn from_proto(proto: ProtoClusterStartupEpoch) -> Result<Self, TryFromProtoError> {
56        let ProtoClusterStartupEpoch { envd, replica } = proto;
57        Ok(Self {
58            envd: envd.try_into().unwrap(),
59            replica,
60        })
61    }
62}
63
64impl Arbitrary for ClusterStartupEpoch {
65    type Strategy = BoxedStrategy<Self>;
66    type Parameters = ();
67
68    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
69        (any::<i64>(), any::<u64>())
70            .prop_map(|(envd, replica)| ClusterStartupEpoch {
71                envd: NonZeroI64::new(if envd == 0 { envd + 1 } else { envd }).unwrap(),
72                replica,
73            })
74            .boxed()
75    }
76}
77
78impl ClusterStartupEpoch {
79    /// Construct a new cluster startup epoch, from the environment epoch and replica incarnation.
80    pub fn new(envd: NonZeroI64, replica: u64) -> Self {
81        Self { envd, replica }
82    }
83
84    /// Serialize for transfer over the network
85    pub fn to_bytes(&self) -> [u8; 16] {
86        let mut ret = [0; 16];
87        let mut p = &mut ret[..];
88        use std::io::Write;
89        p.write_all(&self.envd.get().to_be_bytes()[..]).unwrap();
90        p.write_all(&self.replica.to_be_bytes()[..]).unwrap();
91        ret
92    }
93
94    /// Inverse of `to_bytes`
95    pub fn from_bytes(bytes: [u8; 16]) -> Self {
96        let envd = i64::from_be_bytes((&bytes[0..8]).try_into().unwrap());
97        let replica = u64::from_be_bytes((&bytes[8..16]).try_into().unwrap());
98        Self {
99            envd: envd.try_into().unwrap(),
100            replica,
101        }
102    }
103
104    /// The environment epoch.
105    pub fn envd(&self) -> NonZeroI64 {
106        self.envd
107    }
108
109    /// The replica incarnation.
110    pub fn replica(&self) -> u64 {
111        self.replica
112    }
113}
114
115impl std::fmt::Display for ClusterStartupEpoch {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        let Self { envd, replica } = self;
118        write!(f, "({envd}, {replica})")
119    }
120}
121
122impl PartialOrd for ClusterStartupEpoch {
123    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
124        Some(self.cmp(other))
125    }
126}
127
128impl Ord for ClusterStartupEpoch {
129    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
130        let Self { envd, replica } = self;
131        let Self {
132            envd: other_envd,
133            replica: other_replica,
134        } = other;
135        (envd, replica).cmp(&(other_envd, other_replica))
136    }
137}
138
139/// Configuration of the cluster we will spin up
140#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
141pub struct TimelyConfig {
142    /// Number of per-process worker threads
143    pub workers: usize,
144    /// Identity of this process
145    pub process: usize,
146    /// Addresses of all processes
147    pub addresses: Vec<String>,
148    /// Proportionality value that decides whether to exert additional arrangement merge effort.
149    ///
150    /// Specifically, additional merge effort is exerted when the size of the second-largest batch
151    /// in an arrangement is within a factor of `arrangement_exert_proportionality` of the size of
152    /// the largest batch, or when a merge is already in progress.
153    ///
154    /// The higher the proportionality value, the more eagerly arrangement batches are merged. A
155    /// value of `0` (or `1`) disables eager merging.
156    pub arrangement_exert_proportionality: u32,
157    /// Whether to use the zero copy allocator.
158    pub enable_zero_copy: bool,
159    /// Whether to use lgalloc to back the zero copy allocator.
160    pub enable_zero_copy_lgalloc: bool,
161    /// Optional limit on the number of empty buffers retained by the zero copy allocator.
162    pub zero_copy_limit: Option<usize>,
163    /// Whether to enable the new version of `create_sockets`.
164    ///
165    /// This flag exists to facilitate a slow rollout and is expected to be temporary.
166    pub enable_create_sockets_v2: bool,
167}
168
169impl RustType<ProtoTimelyConfig> for TimelyConfig {
170    fn into_proto(&self) -> ProtoTimelyConfig {
171        ProtoTimelyConfig {
172            workers: self.workers.into_proto(),
173            addresses: self.addresses.into_proto(),
174            process: self.process.into_proto(),
175            arrangement_exert_proportionality: self.arrangement_exert_proportionality,
176            enable_zero_copy: self.enable_zero_copy,
177            enable_zero_copy_lgalloc: self.enable_zero_copy_lgalloc,
178            zero_copy_limit: self.zero_copy_limit.into_proto(),
179            enable_create_sockets_v2: self.enable_create_sockets_v2,
180        }
181    }
182
183    fn from_proto(proto: ProtoTimelyConfig) -> Result<Self, TryFromProtoError> {
184        Ok(Self {
185            process: proto.process.into_rust()?,
186            workers: proto.workers.into_rust()?,
187            addresses: proto.addresses.into_rust()?,
188            arrangement_exert_proportionality: proto.arrangement_exert_proportionality,
189            enable_zero_copy: proto.enable_zero_copy,
190            enable_zero_copy_lgalloc: proto.enable_zero_copy_lgalloc,
191            zero_copy_limit: proto.zero_copy_limit.into_rust()?,
192            enable_create_sockets_v2: proto.enable_create_sockets_v2,
193        })
194    }
195}
196
197impl TimelyConfig {
198    /// Split the timely configuration into `parts` pieces, each with a different `process` number.
199    pub fn split_command(&self, parts: usize) -> Vec<Self> {
200        (0..parts)
201            .map(|part| TimelyConfig {
202                process: part,
203                ..self.clone()
204            })
205            .collect()
206    }
207}
208
209/// A trait for specific cluster commands that can be unpacked into
210/// `CreateTimely` variants.
211pub trait TryIntoTimelyConfig {
212    /// Attempt to unpack `self` into a `(TimelyConfig, ClusterStartupEpoch)`. Otherwise,
213    /// fail and return `self` back.
214    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self>
215    where
216        Self: Sized;
217}
218
219/// Specifies the location of a cluster replica.
220#[derive(Clone, Debug, Serialize, Deserialize)]
221pub struct ClusterReplicaLocation {
222    /// The network addresses of the cluster control endpoints for each process in
223    /// the replica. Connections from the controller to these addresses
224    /// are sent commands, and send responses back.
225    pub ctl_addrs: Vec<String>,
226    /// The network addresses of the dataflow (Timely) endpoints for
227    /// each process in the replica. These are used for _internal_
228    /// networking, that is, timely worker communicating messages
229    /// between themselves.
230    pub dataflow_addrs: Vec<String>,
231    /// The workers per process in the replica.
232    pub workers: usize,
233}
234
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        // A `TimelyConfig` must survive a trip through its protobuf form unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // slow
        fn timely_config_protobuf_roundtrip(expect in any::<TimelyConfig>() ) {
            let roundtripped = protobuf_roundtrip::<_, ProtoTimelyConfig>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        // A `ClusterStartupEpoch` must survive a trip through its protobuf form unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // slow
        fn cluster_startup_epoch_protobuf_roundtrip(expect in any::<ClusterStartupEpoch>() ) {
            let roundtripped = protobuf_roundtrip::<_, ProtoClusterStartupEpoch>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}