mz_orchestrator/lib.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures_core::stream::BoxStream;
use mz_ore::cast::CastFrom;
use serde::de::Unexpected;
use serde::{Deserialize, Deserializer, Serialize};

/// An orchestrator manages services.
///
/// A service is a set of one or more processes running the same image. See
/// [`ServiceConfig`] for details.
///
/// All services live within a namespace. A namespace allows multiple users to
/// share an orchestrator without conflicting: each user can only create,
/// delete, and list the services within their namespace. Namespaces are not
/// isolated at the network level, however: services in one namespace can
/// communicate with services in another namespace with no restrictions.
///
/// Services **must** be tolerant of running as part of a distributed system. In
/// particular, services **must** be prepared for the possibility that there are
/// two live processes with the same identity. This can happen, for example,
/// when the machine hosting a process *appears* to fail, from the perspective
/// of the orchestrator, and so the orchestrator restarts the process on another
/// machine, but in fact the original machine is still alive, just on the
/// opposite side of a network partition. Be sure to design any communication
/// with other services (e.g., an external database) to correctly handle
/// competing communication from another incarnation of the service.
///
/// The intent is that you can implement `Orchestrator` with pods in Kubernetes,
/// containers in Docker, or processes on your local machine.
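///
/// # Examples
///
/// A minimal sketch of typical usage. `MyOrchestrator` is a hypothetical
/// implementation, `config` is a [`ServiceConfig`] built elsewhere, and the
/// service id and port name are illustrative:
///
/// ```ignore
/// let orchestrator: Arc<dyn Orchestrator> = Arc::new(MyOrchestrator::new());
/// // Scope all operations to one namespace.
/// let namespaced = orchestrator.namespace("environment-1");
/// // Idempotently create or update a service.
/// let service = namespaced.ensure_service("cluster-u1", config)?;
/// // Look up the per-process addresses for a named port.
/// println!("{:?}", service.addresses("controller"));
/// ```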
pub trait Orchestrator: fmt::Debug + Send + Sync {
    /// Enters a namespace in the orchestrator.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator>;
}

/// An orchestrator restricted to a single namespace.
#[async_trait]
pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync {
    /// Ensures that a service with the given configuration is running.
    ///
    /// If a service with the same ID already exists, its configuration is
    /// updated to match `config`. This may or may not involve restarting the
    /// service, depending on whether the existing service matches `config`.
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error>;

    /// Drops the identified service, if it exists.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error>;

    /// Lists the identifiers of all known services.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error>;

    /// Watches for status changes of all known services.
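    ///
    /// A sketch of consuming the stream, assuming the `futures` crate's
    /// `StreamExt` is available for `next`:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut events = orchestrator.watch_services();
    /// while let Some(event) = events.next().await {
    ///     let event = event?;
    ///     println!(
    ///         "{}/{}: {}",
    ///         event.service_id,
    ///         event.process_id,
    ///         event.status.as_kebab_case_str(),
    ///     );
    /// }
    /// ```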
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>>;

    /// Gets resource usage metrics for all processes associated with a service.
    ///
    /// Returns `Err` if the fetch failed entirely. Otherwise returns `Ok(v)`,
    /// with one element in `v` for each process of the service, even if not
    /// all metrics could be collected for all processes. In such a case, the
    /// corresponding fields of `ServiceProcessMetrics` are `None`.
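    ///
    /// A sketch of handling partially collected metrics (the service id is
    /// illustrative):
    ///
    /// ```ignore
    /// let metrics = orchestrator.fetch_service_metrics("cluster-u1").await?;
    /// for (i, m) in metrics.iter().enumerate() {
    ///     match m.memory_bytes {
    ///         Some(bytes) => println!("process {i}: {bytes} bytes of memory"),
    ///         None => println!("process {i}: memory usage unknown"),
    ///     }
    /// }
    /// ```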
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error>;

    /// Updates the scheduling configuration for services created in this
    /// namespace.
    fn update_scheduling_config(&self, config: scheduling_config::ServiceSchedulingConfig);
}

/// An event describing a status change of an orchestrated service.
#[derive(Debug, Clone, Serialize)]
pub struct ServiceEvent {
    pub service_id: String,
    pub process_id: u64,
    pub status: ServiceStatus,
    pub time: DateTime<Utc>,
}

/// Why the service is not ready, if known.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum OfflineReason {
    OomKilled,
    Initializing,
}

impl fmt::Display for OfflineReason {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            OfflineReason::OomKilled => f.write_str("oom-killed"),
            OfflineReason::Initializing => f.write_str("initializing"),
        }
    }
}

/// Describes the status of an orchestrated service.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum ServiceStatus {
    /// Service is ready to accept requests.
    Online,
    /// Service is not ready to accept requests.
    ///
    /// The inner element is `None` if the reason is unknown.
    Offline(Option<OfflineReason>),
}

impl ServiceStatus {
    /// Returns the service status as a kebab-case string.
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            ServiceStatus::Online => "online",
            ServiceStatus::Offline(_) => "offline",
        }
    }
}

/// Describes a running service managed by an `Orchestrator`.
pub trait Service: fmt::Debug + Send + Sync {
    /// Given the name of a port, returns the addresses for each of the
    /// service's processes, in order.
    ///
    /// Panics if `port` does not name a valid port.
    fn addresses(&self, port: &str) -> Vec<String>;
}

/// Resource usage metrics for a single process of a service.
///
/// Each field is `None` if the corresponding metric could not be collected.
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
pub struct ServiceProcessMetrics {
    pub cpu_nano_cores: Option<u64>,
    pub memory_bytes: Option<u64>,
    pub disk_bytes: Option<u64>,
    pub heap_bytes: Option<u64>,
    pub heap_limit: Option<u64>,
}

/// A simple language for describing assertions about a label's existence and value.
///
/// Used by [`LabelSelector`].
#[derive(Clone, Debug)]
pub enum LabelSelectionLogic {
    /// The label exists and its value equals the given value.
    /// Equivalent to `InSet { values: vec![value] }`.
    Eq { value: String },
    /// Either the label does not exist, or it exists
    /// but its value does not equal the given value.
    /// Equivalent to `NotInSet { values: vec![value] }`.
    NotEq { value: String },
    /// The label exists.
    Exists,
    /// The label does not exist.
    NotExists,
    /// The label exists and its value is one of the given values.
    InSet { values: Vec<String> },
    /// Either the label does not exist, or it exists
    /// but its value is not one of the given values.
    NotInSet { values: Vec<String> },
}

/// A simple language for describing whether a label exists and whether the
/// value corresponding to it is in some set.
///
/// Intended to correspond to the capabilities offered by Kubernetes label
/// selectors, but without directly exposing Kubernetes API code to consumers
/// of this module.
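///
/// A sketch of a selector requiring that the label `replica-id` take one of
/// two values (the label name and values are illustrative):
///
/// ```ignore
/// let selector = LabelSelector {
///     label_name: "replica-id".into(),
///     logic: LabelSelectionLogic::InSet {
///         values: vec!["u1".into(), "u2".into()],
///     },
/// };
/// ```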
#[derive(Clone, Debug)]
pub struct LabelSelector {
    /// The name of the label.
    pub label_name: String,
    /// An assertion about the existence and value of a label
    /// named `label_name`.
    pub logic: LabelSelectionLogic,
}

/// Describes the desired state of a service.
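///
/// A sketch of a minimal configuration (all values are illustrative):
///
/// ```ignore
/// let config = ServiceConfig {
///     image: "clusterd:latest".into(),
///     init_container_image: None,
///     // Pass each process the listen address assigned to its "controller" port.
///     args: Box::new(|assigned| {
///         vec![format!("--listen-addr={}", assigned.listen_addrs["controller"])]
///     }),
///     ports: vec![ServicePort {
///         name: "controller".into(),
///         port_hint: 2100,
///     }],
///     memory_limit: Some(MemoryLimit(ByteSize::gib(4))),
///     memory_request: None,
///     cpu_limit: Some(CpuLimit::from_millicpus(2000)),
///     scale: 2,
///     labels: BTreeMap::new(),
///     annotations: BTreeMap::new(),
///     availability_zones: None,
///     other_replicas_selector: vec![],
///     replicas_selector: vec![],
///     disk_limit: Some(DiskLimit(ByteSize::gib(10))),
///     node_selector: BTreeMap::new(),
/// };
/// ```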
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// For the Kubernetes orchestrator, this is an init container to
    /// configure for the pod running the service.
    pub init_container_image: Option<String>,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port.
    #[derivative(Debug = "ignore")]
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional request on the memory that the service can use. If
    /// unspecified, the orchestrator uses the same value as `memory_limit`.
    pub memory_request: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// Arbitrary key–value pairs to attach to the service as annotations in the
    /// orchestrator backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub annotations: BTreeMap<String, String>,
    /// The availability zones the service can be run in. If no availability
    /// zones are specified, the orchestrator is free to choose one.
    pub availability_zones: Option<Vec<String>>,
    /// A set of label selectors selecting all _other_ services that are replicas of this one.
    ///
    /// This may be used to implement anti-affinity. If _all_ such selectors
    /// match for a given service, this service should not be co-scheduled on
    /// a machine with that service.
    ///
    /// The orchestrator backend may or may not actually implement anti-affinity functionality.
    pub other_replicas_selector: Vec<LabelSelector>,
    /// A set of label selectors selecting all services that are replicas of this one,
    /// including itself.
    ///
    /// This may be used to implement placement spread.
    ///
    /// The orchestrator backend may or may not actually implement placement spread functionality.
    pub replicas_selector: Vec<LabelSelector>,
    /// The maximum amount of scratch disk space that the service is allowed to consume.
    pub disk_limit: Option<DiskLimit>,
    /// Node selector for this service.
    pub node_selector: BTreeMap<String, String>,
}

/// A named port associated with a service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServicePort {
    /// A descriptive name for the port.
    ///
    /// Note that not all orchestrator backends make use of port names.
    pub name: String,
    /// The desired port number.
    ///
    /// Not all orchestrator backends will make use of the hint.
    pub port_hint: u16,
}

/// Assignments that the orchestrator has made for a process in a service.
#[derive(Clone, Debug)]
pub struct ServiceAssignments<'a> {
    /// For each specified [`ServicePort`] name, a listen address.
    pub listen_addrs: &'a BTreeMap<String, String>,
    /// The listen addresses of each peer in the service.
    ///
    /// The order of peers is significant. Each peer is uniquely identified by its position in the
    /// list.
    pub peer_addrs: &'a [BTreeMap<String, String>],
}

impl ServiceAssignments<'_> {
    /// Returns the peer addresses for the specified [`ServicePort`] name.
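    ///
    /// A sketch (the "controller" port name is illustrative):
    ///
    /// ```ignore
    /// let peers = assignments.peer_addresses("controller");
    /// assert_eq!(peers.len(), assignments.peer_addrs.len());
    /// ```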
    pub fn peer_addresses(&self, name: &str) -> Vec<String> {
        self.peer_addrs.iter().map(|a| a[name].clone()).collect()
    }
}

/// Describes a limit on memory.
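///
/// Serialized as a human-readable size string. A sketch of a round trip,
/// assuming `serde_json` is available:
///
/// ```ignore
/// let limit: MemoryLimit = serde_json::from_str("\"512 MiB\"")?;
/// let json = serde_json::to_string(&limit)?; // a quoted size string
/// ```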
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct MemoryLimit(pub ByteSize);

impl MemoryLimit {
    pub const MAX: Self = Self(ByteSize(u64::MAX));
}

impl<'de> Deserialize<'de> for MemoryLimit {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        <String as Deserialize>::deserialize(deserializer)
            .and_then(|s| {
                ByteSize::from_str(&s).map_err(|_e| {
                    use serde::de::Error;
                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
                })
            })
            .map(MemoryLimit)
    }
}

impl Serialize for MemoryLimit {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        <String as Serialize>::serialize(&self.0.to_string(), serializer)
    }
}

/// Describes a limit on CPU resources.
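///
/// A sketch of constructing and converting a limit:
///
/// ```ignore
/// let limit = CpuLimit::from_millicpus(2500); // 2.5 cores
/// assert_eq!(limit.as_millicpus(), 2500);
/// assert_eq!(limit.as_nanocpus(), 2_500_000_000);
/// // Serializes as a float number of cores, e.g. 2.5.
/// ```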
#[derive(Debug, Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct CpuLimit {
    millicpus: usize,
}

impl CpuLimit {
    pub const MAX: Self = Self::from_millicpus(usize::MAX / 1_000_000);

    /// Constructs a new CPU limit from a number of millicpus.
    pub const fn from_millicpus(millicpus: usize) -> CpuLimit {
        CpuLimit { millicpus }
    }

    /// Returns the CPU limit in millicpus.
    pub fn as_millicpus(&self) -> usize {
        self.millicpus
    }

    /// Returns the CPU limit in nanocpus.
    pub fn as_nanocpus(&self) -> u64 {
        // The largest possible value of a u64 is 18_446_744_073_709_551_615,
        // so we won't overflow this unless we have an instance with ~18.45
        // billion cores.
        //
        // Such an instance seems unrealistic, at least until we raise another
        // few rounds of funding ...
        u64::cast_from(self.millicpus)
            .checked_mul(1_000_000)
            .expect("Nano-CPUs must be representable")
    }
}

impl<'de> Deserialize<'de> for CpuLimit {
    // TODO(benesch): remove this once this function no longer makes use of
    // potentially dangerous `as` conversions.
    #[allow(clippy::as_conversions)]
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Note: we round off any precision beyond 0.001 here.
        let float = f64::deserialize(deserializer)?;
        let millicpus = (float * 1000.).round();
        if millicpus < 0. || millicpus > (usize::MAX as f64) {
            use serde::de::Error;
            Err(D::Error::invalid_value(
                Unexpected::Float(float),
                &"a float representing a plausible number of CPUs",
            ))
        } else {
            Ok(Self {
                millicpus: millicpus as usize,
            })
        }
    }
}

impl Serialize for CpuLimit {
    // TODO(benesch): remove this once this function no longer makes use of
    // potentially dangerous `as` conversions.
    #[allow(clippy::as_conversions)]
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        <f64 as Serialize>::serialize(&(self.millicpus as f64 / 1000.0), serializer)
    }
}

/// Describes a limit on disk usage.
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct DiskLimit(pub ByteSize);

impl DiskLimit {
    pub const ZERO: Self = Self(ByteSize(0));
    pub const MAX: Self = Self(ByteSize(u64::MAX));
    pub const ARBITRARY: Self = Self(ByteSize::gib(1));
}

impl<'de> Deserialize<'de> for DiskLimit {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        <String as Deserialize>::deserialize(deserializer)
            .and_then(|s| {
                ByteSize::from_str(&s).map_err(|_e| {
                    use serde::de::Error;
                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
                })
            })
            .map(DiskLimit)
    }
}

impl Serialize for DiskLimit {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        <String as Serialize>::serialize(&self.0.to_string(), serializer)
    }
}

/// Configuration for how services are scheduled. These may be ignored by orchestrator
/// implementations.
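///
/// A sketch of overriding a single default via struct update syntax:
///
/// ```ignore
/// use scheduling_config::ServiceSchedulingConfig;
///
/// let config = ServiceSchedulingConfig {
///     soften_az_affinity: true,
///     ..Default::default()
/// };
/// ```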
pub mod scheduling_config {
    /// Configuration for topology spread constraints.
    #[derive(Debug, Clone)]
    pub struct ServiceTopologySpreadConfig {
        /// If `true`, enable spread for replicated services.
        ///
        /// Defaults to `true`.
        pub enabled: bool,
        /// If `true`, ignore services with `scale` > 1 when expressing
        /// spread constraints.
        ///
        /// Defaults to `true`.
        pub ignore_non_singular_scale: bool,
        /// The `maxSkew` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `1`.
        pub max_skew: i32,
        /// The `minDomains` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `None`.
        pub min_domains: Option<i32>,
        /// If `true`, make the spread constraints into a preference.
        ///
        /// Defaults to `false`.
        pub soft: bool,
    }

    /// Scheduling configuration for a service.
    #[derive(Debug, Clone)]
    pub struct ServiceSchedulingConfig {
        /// If `Some`, add an affinity preference with the given
        /// weight for services that horizontally scale.
        ///
        /// Defaults to `Some(100)`.
        pub multi_pod_az_affinity_weight: Option<i32>,
        /// If `true`, make the node-scope anti-affinity between
        /// replicated services a preference rather than a constraint.
        ///
        /// Defaults to `false`.
        pub soften_replication_anti_affinity: bool,
        /// The weight for `soften_replication_anti_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_replication_anti_affinity_weight: i32,
        /// Configuration for `TopologySpreadConstraint`s.
        pub topology_spread: ServiceTopologySpreadConfig,
        /// If `true`, make the az-scope node affinity soft.
        ///
        /// Defaults to `false`.
        pub soften_az_affinity: bool,
        /// The weight for `soften_az_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_az_affinity_weight: i32,
        /// Whether to enable the security context for the service.
        pub security_context_enabled: bool,
    }

    pub const DEFAULT_POD_AZ_AFFINITY_WEIGHT: Option<i32> = Some(100);
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: i32 = 100;

    pub const DEFAULT_TOPOLOGY_SPREAD_ENABLED: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW: i32 = 1;
    pub const DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN: Option<i32> = None;
    pub const DEFAULT_TOPOLOGY_SPREAD_SOFT: bool = false;

    pub const DEFAULT_SOFTEN_AZ_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT: i32 = 100;
    pub const DEFAULT_SECURITY_CONTEXT_ENABLED: bool = true;

    impl Default for ServiceSchedulingConfig {
        fn default() -> Self {
            ServiceSchedulingConfig {
                multi_pod_az_affinity_weight: DEFAULT_POD_AZ_AFFINITY_WEIGHT,
                soften_replication_anti_affinity: DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY,
                soften_replication_anti_affinity_weight:
                    DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
                topology_spread: ServiceTopologySpreadConfig {
                    enabled: DEFAULT_TOPOLOGY_SPREAD_ENABLED,
                    ignore_non_singular_scale: DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
                    max_skew: DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW,
                    min_domains: DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN,
                    soft: DEFAULT_TOPOLOGY_SPREAD_SOFT,
                },
                soften_az_affinity: DEFAULT_SOFTEN_AZ_AFFINITY,
                soften_az_affinity_weight: DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT,
                security_context_enabled: DEFAULT_SECURITY_CONTEXT_ENABLED,
            }
        }
    }
}