// mz_orchestrator/lib.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fmt;
12use std::num::NonZero;
13use std::str::FromStr;
14use std::sync::Arc;
15
16use async_trait::async_trait;
17use bytesize::ByteSize;
18use chrono::{DateTime, Utc};
19use derivative::Derivative;
20use futures_core::stream::BoxStream;
21use mz_ore::cast::CastFrom;
22use serde::de::Unexpected;
23use serde::{Deserialize, Deserializer, Serialize};
24
/// An orchestrator manages services.
///
/// A service is a set of one or more processes running the same image. See
/// [`ServiceConfig`] for details.
///
/// All services live within a namespace. A namespace allows multiple users to
/// share an orchestrator without conflicting: each user can only create,
/// delete, and list the services within their namespace. Namespaces are not
/// isolated at the network level, however: services in one namespace can
/// communicate with services in another namespace with no restrictions.
///
/// Services **must** be tolerant of running as part of a distributed system. In
/// particular, services **must** be prepared for the possibility that there are
/// two live processes with the same identity. This can happen, for example,
/// when the machine hosting a process *appears* to fail, from the perspective
/// of the orchestrator, and so the orchestrator restarts the process on another
/// machine, but in fact the original machine is still alive, just on the
/// opposite side of a network partition. Be sure to design any communication
/// with other services (e.g., an external database) to correctly handle
/// competing communication from another incarnation of the service.
///
/// The intent is that you can implement `Orchestrator` with pods in Kubernetes,
/// containers in Docker, or processes on your local machine.
pub trait Orchestrator: fmt::Debug + Send + Sync {
    /// Enters a namespace in the orchestrator, returning a handle that can
    /// create, delete, and list the services within that namespace.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator>;
}
52
/// An orchestrator restricted to a single namespace.
#[async_trait]
pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync {
    /// Ensures that a service with the given configuration is running.
    ///
    /// If a service with the same ID already exists, its configuration is
    /// updated to match `config`. This may or may not involve restarting the
    /// service, depending on whether the existing service matches `config`.
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error>;

    /// Drops the identified service, if it exists.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error>;

    /// Lists the identifiers of all known services.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error>;

    /// Watches for status changes of all known services.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>>;

    /// Gets resource usage metrics for all processes associated with a service.
    ///
    /// Returns `Err` if the entire operation failed. Returns `Ok(v)` otherwise,
    /// with one element in `v` for each process of the service,
    /// even if not all metrics could be collected for all processes.
    /// In such a case, the corresponding fields of `ServiceProcessMetrics` will be `None`.
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error>;

    /// Updates the scheduling configuration to apply to services.
    ///
    /// See [`scheduling_config`]: implementations may ignore this
    /// configuration.
    fn update_scheduling_config(&self, config: scheduling_config::ServiceSchedulingConfig);
}
89
/// An event describing a status change of an orchestrated service.
#[derive(Debug, Clone, Serialize)]
pub struct ServiceEvent {
    /// The ID of the service whose status changed.
    pub service_id: String,
    /// The ID of the affected process within the service.
    pub process_id: u64,
    /// The new status of the process.
    pub status: ServiceStatus,
    /// The time at which the status change was observed.
    pub time: DateTime<Utc>,
}
98
/// Why the service is not ready, if known.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum OfflineReason {
    /// The process was killed for exceeding its memory limit.
    OomKilled,
    /// The process is still initializing.
    Initializing,
}
105
106impl fmt::Display for OfflineReason {
107 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108 match self {
109 OfflineReason::OomKilled => f.write_str("oom-killed"),
110 OfflineReason::Initializing => f.write_str("initializing"),
111 }
112 }
113}
114
/// Describes the status of an orchestrated service.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum ServiceStatus {
    /// Service is ready to accept requests.
    Online,
    /// Service is not ready to accept requests.
    ///
    /// The inner element is `None` if the reason for being offline
    /// is unknown.
    Offline(Option<OfflineReason>),
}
125
126impl ServiceStatus {
127 /// Returns the service status as a kebab-case string.
128 pub fn as_kebab_case_str(&self) -> &'static str {
129 match self {
130 ServiceStatus::Online => "online",
131 ServiceStatus::Offline(_) => "offline",
132 }
133 }
134}
135
/// Describes a running service managed by an `Orchestrator`.
pub trait Service: fmt::Debug + Send + Sync {
    /// Given the name of a port, returns the addresses for each of the
    /// service's processes, in order of process ID.
    ///
    /// Panics if `port` does not name a valid port.
    fn addresses(&self, port: &str) -> Vec<String>;
}
144
/// Resource usage metrics for a single process of a service.
///
/// Each field is `None` if that metric could not be collected for the
/// process (see `NamespacedOrchestrator::fetch_service_metrics`).
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
pub struct ServiceProcessMetrics {
    /// CPU usage, in nanocores, if collected.
    pub cpu_nano_cores: Option<u64>,
    /// Memory usage, in bytes, if collected.
    pub memory_bytes: Option<u64>,
    /// Disk usage, in bytes, if collected.
    pub disk_bytes: Option<u64>,
    /// Heap usage, in bytes, if collected.
    pub heap_bytes: Option<u64>,
    /// Heap limit, in bytes, if collected.
    pub heap_limit: Option<u64>,
}
153
/// A simple language for describing assertions about a label's existence and value.
///
/// Used by [`LabelSelector`].
#[derive(Clone, Debug)]
pub enum LabelSelectionLogic {
    /// The label exists and its value equals the given value.
    ///
    /// Equivalent to `InSet { values: vec![value] }`.
    Eq { value: String },
    /// Either the label does not exist, or it exists
    /// but its value does not equal the given value.
    ///
    /// Equivalent to `NotInSet { values: vec![value] }`.
    NotEq { value: String },
    /// The label exists.
    Exists,
    /// The label does not exist.
    NotExists,
    /// The label exists and its value is one of the given values.
    InSet { values: Vec<String> },
    /// Either the label does not exist, or it exists
    /// but its value is not one of the given values.
    NotInSet { values: Vec<String> },
}
176
/// A simple language for describing whether a label
/// exists and whether the value corresponding to it is in some set.
///
/// Intended to correspond to the capabilities offered by Kubernetes label
/// selectors, but without directly exposing Kubernetes API code to consumers
/// of this module.
#[derive(Clone, Debug)]
pub struct LabelSelector {
    /// The name of the label.
    pub label_name: String,
    /// An assertion about the existence and value of a label
    /// named `label_name`.
    pub logic: LabelSelectionLogic,
}
189
/// Describes the desired state of a service.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// For the Kubernetes orchestrator, this is an init container to
    /// configure for the pod running the service.
    pub init_container_image: Option<String>,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port.
    ///
    /// Not `Debug`-printable, hence the derivative `ignore`.
    #[derivative(Debug = "ignore")]
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional request on the memory that the service can use. If unspecified,
    /// use the same value as `memory_limit`.
    pub memory_request: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// An optional request on the CPU that the service can use.
    pub cpu_request: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: NonZero<u16>,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// Arbitrary key–value pairs to attach to the service as annotations in the
    /// orchestrator backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub annotations: BTreeMap<String, String>,
    /// The availability zones the service can be run in. If no availability
    /// zones are specified, the orchestrator is free to choose one.
    pub availability_zones: Option<Vec<String>>,
    /// A set of label selectors selecting all _other_ services that are replicas of this one.
    ///
    /// This may be used to implement anti-affinity. If _all_ such selectors
    /// match for a given service, this service should not be co-scheduled on
    /// a machine with that service.
    ///
    /// The orchestrator backend may or may not actually implement anti-affinity functionality.
    pub other_replicas_selector: Vec<LabelSelector>,
    /// A set of label selectors selecting all services that are replicas of this one,
    /// including itself.
    ///
    /// This may be used to implement placement spread.
    ///
    /// The orchestrator backend may or may not actually implement placement spread functionality.
    pub replicas_selector: Vec<LabelSelector>,

    /// The maximum amount of scratch disk space that the service is allowed to consume.
    pub disk_limit: Option<DiskLimit>,
    /// Node selector for this service.
    ///
    /// The orchestrator backend may or may not support node selection.
    pub node_selector: BTreeMap<String, String>,
}
252
/// A named port associated with a service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServicePort {
    /// A descriptive name for the port.
    ///
    /// Note that not all orchestrator backends make use of port names.
    pub name: String,
    /// The desired port number.
    ///
    /// This is only a hint; not all orchestrator backends will make use of it.
    pub port_hint: u16,
}
265
/// Assignments that the orchestrator has made for a process in a service.
#[derive(Clone, Debug)]
pub struct ServiceAssignments<'a> {
    /// For each specified [`ServicePort`] name, a listen address.
    pub listen_addrs: &'a BTreeMap<String, String>,
    /// The listen addresses of each peer in the service, as maps keyed by
    /// [`ServicePort`] name, like `listen_addrs`.
    ///
    /// The order of peers is significant. Each peer is uniquely identified by its position in the
    /// list.
    pub peer_addrs: &'a [BTreeMap<String, String>],
}
277
278impl ServiceAssignments<'_> {
279 /// Return the peer addresses for the specified [`ServicePort`] name.
280 pub fn peer_addresses(&self, name: &str) -> Vec<String> {
281 self.peer_addrs.iter().map(|a| a[name].clone()).collect()
282 }
283}
284
/// Describes a limit on memory.
///
/// Serialized and deserialized as a human-readable byte-size string
/// (see the `Serialize` and `Deserialize` impls below).
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct MemoryLimit(pub ByteSize);
288
impl MemoryLimit {
    /// The maximum representable memory limit.
    pub const MAX: Self = Self(ByteSize(u64::MAX));
}
292
293impl<'de> Deserialize<'de> for MemoryLimit {
294 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
295 where
296 D: Deserializer<'de>,
297 {
298 <String as Deserialize>::deserialize(deserializer)
299 .and_then(|s| {
300 ByteSize::from_str(&s).map_err(|_e| {
301 use serde::de::Error;
302 D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
303 })
304 })
305 .map(MemoryLimit)
306 }
307}
308
309impl Serialize for MemoryLimit {
310 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
311 where
312 S: serde::Serializer,
313 {
314 <String as Serialize>::serialize(&self.0.to_string(), serializer)
315 }
316}
317
/// Describes a limit on CPU resources.
#[derive(Debug, Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct CpuLimit {
    /// The limit, in millicpus (thousandths of a CPU).
    millicpus: usize,
}
323
324impl CpuLimit {
325 pub const MAX: Self = Self::from_millicpus(usize::MAX / 1_000_000);
326
327 /// Constructs a new CPU limit from a number of millicpus.
328 pub const fn from_millicpus(millicpus: usize) -> CpuLimit {
329 CpuLimit { millicpus }
330 }
331
332 /// Returns the CPU limit in millicpus.
333 pub fn as_millicpus(&self) -> usize {
334 self.millicpus
335 }
336
337 /// Returns the CPU limit in nanocpus.
338 pub fn as_nanocpus(&self) -> u64 {
339 // The largest possible value of a u64 is
340 // 18_446_744_073_709_551_615,
341 // so we won't overflow this
342 // unless we have an instance with
343 // ~18.45 billion cores.
344 //
345 // Such an instance seems unrealistic,
346 // at least until we raise another few rounds
347 // of funding ...
348
349 u64::cast_from(self.millicpus)
350 .checked_mul(1_000_000)
351 .expect("Nano-CPUs must be representable")
352 }
353}
354
355impl<'de> Deserialize<'de> for CpuLimit {
356 // TODO(benesch): remove this once this function no longer makes use of
357 // potentially dangerous `as` conversions.
358 #[allow(clippy::as_conversions)]
359 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
360 where
361 D: serde::Deserializer<'de>,
362 {
363 // Note -- we just round off any precision beyond 0.001 here.
364 let float = f64::deserialize(deserializer)?;
365 let millicpus = (float * 1000.).round();
366 if millicpus < 0. || millicpus > (std::usize::MAX as f64) {
367 use serde::de::Error;
368 Err(D::Error::invalid_value(
369 Unexpected::Float(float),
370 &"a float representing a plausible number of CPUs",
371 ))
372 } else {
373 Ok(Self {
374 millicpus: millicpus as usize,
375 })
376 }
377 }
378}
379
380impl Serialize for CpuLimit {
381 // TODO(benesch): remove this once this function no longer makes use of
382 // potentially dangerous `as` conversions.
383 #[allow(clippy::as_conversions)]
384 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
385 where
386 S: serde::Serializer,
387 {
388 <f64 as Serialize>::serialize(&(self.millicpus as f64 / 1000.0), serializer)
389 }
390}
391
/// Describes a limit on disk usage.
///
/// Serialized and deserialized as a human-readable byte-size string
/// (see the `Serialize` and `Deserialize` impls below).
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct DiskLimit(pub ByteSize);
395
impl DiskLimit {
    /// A disk limit of zero bytes.
    pub const ZERO: Self = Self(ByteSize(0));
    /// The maximum representable disk limit.
    pub const MAX: Self = Self(ByteSize(u64::MAX));
    /// An arbitrary nonzero disk limit (1 GiB).
    pub const ARBITRARY: Self = Self(ByteSize::gib(1));
}
401
402impl<'de> Deserialize<'de> for DiskLimit {
403 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
404 where
405 D: Deserializer<'de>,
406 {
407 <String as Deserialize>::deserialize(deserializer)
408 .and_then(|s| {
409 ByteSize::from_str(&s).map_err(|_e| {
410 use serde::de::Error;
411 D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
412 })
413 })
414 .map(DiskLimit)
415 }
416}
417
418impl Serialize for DiskLimit {
419 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
420 where
421 S: serde::Serializer,
422 {
423 <String as Serialize>::serialize(&self.0.to_string(), serializer)
424 }
425}
426
/// Configuration for how services are scheduled. These may be ignored by orchestrator
/// implementations.
pub mod scheduling_config {
    /// Configuration for Kubernetes-style topology spread constraints.
    #[derive(Debug, Clone)]
    pub struct ServiceTopologySpreadConfig {
        /// If `true`, enable spread for replicated services.
        ///
        /// Defaults to `true`.
        pub enabled: bool,
        /// If `true`, ignore services with `scale` > 1 when expressing
        /// spread constraints.
        ///
        /// Defaults to `true`.
        pub ignore_non_singular_scale: bool,
        /// The `maxSkew` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `1`.
        pub max_skew: i32,
        /// The `minDomains` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `None`.
        pub min_domains: Option<i32>,
        /// If `true`, make the spread constraints into a preference.
        ///
        /// Defaults to `false`.
        pub soft: bool,
    }

    /// Scheduling configuration to apply to services.
    #[derive(Debug, Clone)]
    pub struct ServiceSchedulingConfig {
        /// If `Some`, add an affinity preference with the given
        /// weight for services that horizontally scale.
        ///
        /// Defaults to `Some(100)`.
        pub multi_pod_az_affinity_weight: Option<i32>,
        /// If `true`, make the node-scope anti-affinity between
        /// replicated services a preference over a constraint.
        ///
        /// Defaults to `false`.
        pub soften_replication_anti_affinity: bool,
        /// The weight for `soften_replication_anti_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_replication_anti_affinity_weight: i32,
        /// Configuration for `TopologySpreadConstraint`s.
        pub topology_spread: ServiceTopologySpreadConfig,
        /// If `true`, make the az-scope node affinity soft.
        ///
        /// Defaults to `false`.
        pub soften_az_affinity: bool,
        /// The weight for `soften_az_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_az_affinity_weight: i32,
        /// Whether to enable security context for the service.
        pub security_context_enabled: bool,
    }

    pub const DEFAULT_POD_AZ_AFFINITY_WEIGHT: Option<i32> = Some(100);
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: i32 = 100;

    pub const DEFAULT_TOPOLOGY_SPREAD_ENABLED: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW: i32 = 1;
    pub const DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN: Option<i32> = None;
    pub const DEFAULT_TOPOLOGY_SPREAD_SOFT: bool = false;

    pub const DEFAULT_SOFTEN_AZ_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT: i32 = 100;
    pub const DEFAULT_SECURITY_CONTEXT_ENABLED: bool = true;

    impl Default for ServiceSchedulingConfig {
        fn default() -> Self {
            ServiceSchedulingConfig {
                multi_pod_az_affinity_weight: DEFAULT_POD_AZ_AFFINITY_WEIGHT,
                soften_replication_anti_affinity: DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY,
                soften_replication_anti_affinity_weight:
                    DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
                topology_spread: ServiceTopologySpreadConfig {
                    enabled: DEFAULT_TOPOLOGY_SPREAD_ENABLED,
                    ignore_non_singular_scale: DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
                    max_skew: DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW,
                    min_domains: DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN,
                    soft: DEFAULT_TOPOLOGY_SPREAD_SOFT,
                },
                soften_az_affinity: DEFAULT_SOFTEN_AZ_AFFINITY,
                soften_az_affinity_weight: DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT,
                security_context_enabled: DEFAULT_SECURITY_CONTEXT_ENABLED,
            }
        }
    }
}