// mz_orchestrator/src/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
use std::collections::BTreeMap;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures_core::stream::BoxStream;
use mz_ore::cast::CastFrom;
use serde::de::Unexpected;
use serde::{Deserialize, Deserializer, Serialize};
23
/// An orchestrator manages services.
///
/// A service is a set of one or more processes running the same image. See
/// [`ServiceConfig`] for details.
///
/// All services live within a namespace. A namespace allows multiple users to
/// share an orchestrator without conflicting: each user can only create,
/// delete, and list the services within their namespace. Namespaces are not
/// isolated at the network level, however: services in one namespace can
/// communicate with services in another namespace with no restrictions.
///
/// Services **must** be tolerant of running as part of a distributed system. In
/// particular, services **must** be prepared for the possibility that there are
/// two live processes with the same identity. This can happen, for example,
/// when the machine hosting a process *appears* to fail, from the perspective
/// of the orchestrator, and so the orchestrator restarts the process on another
/// machine, but in fact the original machine is still alive, just on the
/// opposite side of a network partition. Be sure to design any communication
/// with other services (e.g., an external database) to correctly handle
/// competing communication from another incarnation of the service.
///
/// The intent is that you can implement `Orchestrator` with pods in Kubernetes,
/// containers in Docker, or processes on your local machine.
pub trait Orchestrator: fmt::Debug + Send + Sync {
    /// Enter a namespace in the orchestrator.
    ///
    /// Returns a handle restricted to the given namespace; see
    /// [`NamespacedOrchestrator`] for the operations available on it.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator>;
}
51
/// An orchestrator restricted to a single namespace.
#[async_trait]
pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync {
    /// Ensures that a service with the given configuration is running.
    ///
    /// If a service with the same ID already exists, its configuration is
    /// updated to match `config`. This may or may not involve restarting the
    /// service, depending on whether the existing service matches `config`.
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error>;

    /// Drops the identified service, if it exists.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error>;

    /// Lists the identifiers of all known services.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error>;

    /// Watch for status changes of all known services.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>>;

    /// Gets resource usage metrics for all processes associated with a service.
    ///
    /// Returns `Err` if the entire process failed. Returns `Ok(v)` otherwise,
    /// with one element in `v` for each process of the service,
    /// even if not all metrics could be collected for all processes.
    /// In such a case, the corresponding fields of `ServiceProcessMetrics` will be `None`.
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error>;

    /// Updates the scheduling configuration used by this orchestrator.
    ///
    /// NOTE(review): implementations are not visible from this file;
    /// presumably the new config applies to subsequently scheduled services
    /// rather than retroactively rescheduling existing ones — confirm against
    /// the orchestrator implementations.
    fn update_scheduling_config(&self, config: scheduling_config::ServiceSchedulingConfig);
}
88
/// An event describing a status change of an orchestrated service.
#[derive(Debug, Clone, Serialize)]
pub struct ServiceEvent {
    /// The ID of the service whose status changed.
    pub service_id: String,
    /// The process within the service that the event pertains to.
    // NOTE(review): processes elsewhere in this module are identified by their
    // ordinal position in the service (see `ServiceAssignments::peer_addrs`);
    // presumably the same convention applies here — confirm.
    pub process_id: u64,
    /// The new status of the process.
    pub status: ServiceStatus,
    /// The time at which the status change was observed.
    pub time: DateTime<Utc>,
}
97
/// Why the service is not ready, if known.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum OfflineReason {
    /// The process was killed for exceeding its memory limit (OOM-killed).
    OomKilled,
    /// The process is still initializing and not yet ready.
    Initializing,
}
104
105impl fmt::Display for OfflineReason {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        match self {
108            OfflineReason::OomKilled => f.write_str("oom-killed"),
109            OfflineReason::Initializing => f.write_str("initializing"),
110        }
111    }
112}
113
/// Describes the status of an orchestrated service.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum ServiceStatus {
    /// Service is ready to accept requests.
    Online,
    /// Service is not ready to accept requests.
    /// The inner element is `None` if the reason is unknown.
    Offline(Option<OfflineReason>),
}
124
125impl ServiceStatus {
126    /// Returns the service status as a kebab-case string.
127    pub fn as_kebab_case_str(&self) -> &'static str {
128        match self {
129            ServiceStatus::Online => "online",
130            ServiceStatus::Offline(_) => "offline",
131        }
132    }
133}
134
/// Describes a running service managed by an `Orchestrator`.
pub trait Service: fmt::Debug + Send + Sync {
    /// Given the name of a port, returns the addresses for each of the
    /// service's processes, in order.
    ///
    /// # Panics
    ///
    /// Panics if `port` does not name a valid port.
    fn addresses(&self, port: &str) -> Vec<String>;
}
143
/// Resource usage metrics for a single process of a service.
///
/// A field is `None` when the corresponding metric could not be collected;
/// see [`NamespacedOrchestrator::fetch_service_metrics`].
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
pub struct ServiceProcessMetrics {
    /// CPU usage, in nanocores.
    pub cpu_nano_cores: Option<u64>,
    /// Memory usage, in bytes.
    pub memory_bytes: Option<u64>,
    /// Disk usage, in bytes.
    pub disk_usage_bytes: Option<u64>,
}
150
/// A simple language for describing assertions about a label's existence and value.
///
/// Used by [`LabelSelector`].
#[derive(Clone, Debug)]
pub enum LabelSelectionLogic {
    /// The label exists and its value equals the given value.
    /// Equivalent to `InSet { values: vec![value] }`.
    Eq { value: String },
    /// Either the label does not exist, or it exists
    /// but its value does not equal the given value.
    /// Equivalent to `NotInSet { values: vec![value] }`.
    NotEq { value: String },
    /// The label exists.
    Exists,
    /// The label does not exist.
    NotExists,
    /// The label exists and its value is one of the given values.
    InSet { values: Vec<String> },
    /// Either the label does not exist, or it exists
    /// but its value is not one of the given values.
    NotInSet { values: Vec<String> },
}
173
/// A simple language for describing whether a label
/// exists and whether the value corresponding to it is in some set.
/// Intended to correspond to the capabilities offered by Kubernetes label selectors,
/// but without directly exposing Kubernetes API code to consumers of this module.
#[derive(Clone, Debug)]
pub struct LabelSelector {
    /// The name of the label.
    pub label_name: String,
    /// An assertion about the existence and value of a label
    /// named `label_name`.
    pub logic: LabelSelectionLogic,
}
186
/// Describes the desired state of a service.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// For the Kubernetes orchestrator, this is an init container to
    /// configure for the pod running the service.
    pub init_container_image: Option<String>,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port (see
    /// [`ServiceAssignments`]).
    // `Debug` is skipped for this field: closures have no useful `Debug`
    // representation.
    #[derivative(Debug = "ignore")]
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional request on the memory that the service can use. If unspecified,
    /// use the same value as `memory_limit`.
    pub memory_request: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: u16,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// The availability zones the service can be run in. If no availability
    /// zones are specified, the orchestrator is free to choose one.
    pub availability_zones: Option<Vec<String>>,
    /// A set of label selectors selecting all _other_ services that are replicas of this one.
    ///
    /// This may be used to implement anti-affinity. If _all_ such selectors
    /// match for a given service, this service should not be co-scheduled on
    /// a machine with that service.
    ///
    /// The orchestrator backend may or may not actually implement anti-affinity functionality.
    pub other_replicas_selector: Vec<LabelSelector>,
    /// A set of label selectors selecting all services that are replicas of this one,
    /// including itself.
    ///
    /// This may be used to implement placement spread.
    ///
    /// The orchestrator backend may or may not actually implement placement spread functionality.
    pub replicas_selector: Vec<LabelSelector>,

    /// Whether scratch disk space should be allocated for the service.
    pub disk: bool,
    /// The maximum amount of scratch disk space that the service is allowed to consume.
    pub disk_limit: Option<DiskLimit>,
    /// Node selector for this service.
    pub node_selector: BTreeMap<String, String>,
}
244
/// A named port associated with a service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServicePort {
    /// A descriptive name for the port, used as the key in
    /// [`ServiceAssignments::listen_addrs`].
    ///
    /// Note that not all orchestrator backends make use of port names.
    pub name: String,
    /// The desired port number.
    ///
    /// Not all orchestrator backends will make use of the hint.
    pub port_hint: u16,
}
257
/// Assignments that the orchestrator has made for a process in a service.
#[derive(Clone, Debug)]
pub struct ServiceAssignments<'a> {
    /// For each specified [`ServicePort`] name, a listen address.
    pub listen_addrs: &'a BTreeMap<String, String>,
    /// The listen addresses of each peer in the service. Each entry maps
    /// [`ServicePort`] names to addresses, like `listen_addrs`.
    ///
    /// The order of peers is significant. Each peer is uniquely identified by its position in the
    /// list.
    pub peer_addrs: &'a [BTreeMap<String, String>],
}
269
270impl ServiceAssignments<'_> {
271    /// Return the peer addresses for the specified [`ServicePort`] name.
272    pub fn peer_addresses(&self, name: &str) -> Vec<String> {
273        self.peer_addrs.iter().map(|a| a[name].clone()).collect()
274    }
275}
276
/// Describes a limit on memory.
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct MemoryLimit(pub ByteSize);

impl MemoryLimit {
    /// The largest expressible memory limit (`u64::MAX` bytes).
    pub const MAX: Self = Self(ByteSize(u64::MAX));
}
284
285impl<'de> Deserialize<'de> for MemoryLimit {
286    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
287    where
288        D: Deserializer<'de>,
289    {
290        <String as Deserialize>::deserialize(deserializer)
291            .and_then(|s| {
292                ByteSize::from_str(&s).map_err(|_e| {
293                    use serde::de::Error;
294                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
295                })
296            })
297            .map(MemoryLimit)
298    }
299}
300
301impl Serialize for MemoryLimit {
302    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
303    where
304        S: serde::Serializer,
305    {
306        <String as Serialize>::serialize(&self.0.to_string(), serializer)
307    }
308}
309
/// Describes a limit on CPU resources.
#[derive(Debug, Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct CpuLimit {
    /// The limit, stored in millicpus (1/1000 of a CPU); see
    /// `CpuLimit::from_millicpus`.
    millicpus: usize,
}
315
316impl CpuLimit {
317    pub const MAX: Self = Self::from_millicpus(usize::MAX / 1_000_000);
318
319    /// Constructs a new CPU limit from a number of millicpus.
320    pub const fn from_millicpus(millicpus: usize) -> CpuLimit {
321        CpuLimit { millicpus }
322    }
323
324    /// Returns the CPU limit in millicpus.
325    pub fn as_millicpus(&self) -> usize {
326        self.millicpus
327    }
328
329    /// Returns the CPU limit in nanocpus.
330    pub fn as_nanocpus(&self) -> u64 {
331        // The largest possible value of a u64 is
332        // 18_446_744_073_709_551_615,
333        // so we won't overflow this
334        // unless we have an instance with
335        // ~18.45 billion cores.
336        //
337        // Such an instance seems unrealistic,
338        // at least until we raise another few rounds
339        // of funding ...
340
341        u64::cast_from(self.millicpus)
342            .checked_mul(1_000_000)
343            .expect("Nano-CPUs must be representable")
344    }
345}
346
347impl<'de> Deserialize<'de> for CpuLimit {
348    // TODO(benesch): remove this once this function no longer makes use of
349    // potentially dangerous `as` conversions.
350    #[allow(clippy::as_conversions)]
351    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
352    where
353        D: serde::Deserializer<'de>,
354    {
355        // Note -- we just round off any precision beyond 0.001 here.
356        let float = f64::deserialize(deserializer)?;
357        let millicpus = (float * 1000.).round();
358        if millicpus < 0. || millicpus > (std::usize::MAX as f64) {
359            use serde::de::Error;
360            Err(D::Error::invalid_value(
361                Unexpected::Float(float),
362                &"a float representing a plausible number of CPUs",
363            ))
364        } else {
365            Ok(Self {
366                millicpus: millicpus as usize,
367            })
368        }
369    }
370}
371
372impl Serialize for CpuLimit {
373    // TODO(benesch): remove this once this function no longer makes use of
374    // potentially dangerous `as` conversions.
375    #[allow(clippy::as_conversions)]
376    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
377    where
378        S: serde::Serializer,
379    {
380        <f64 as Serialize>::serialize(&(self.millicpus as f64 / 1000.0), serializer)
381    }
382}
383
/// Describes a limit on disk usage.
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct DiskLimit(pub ByteSize);

impl DiskLimit {
    /// A limit of zero bytes.
    pub const ZERO: Self = Self(ByteSize(0));
    /// The largest expressible disk limit (`u64::MAX` bytes).
    pub const MAX: Self = Self(ByteSize(u64::MAX));
    /// An arbitrary non-zero limit (1 GiB).
    // NOTE(review): the intended use of this constant is not visible from
    // this file — confirm against callers before relying on its value.
    pub const ARBITRARY: Self = Self(ByteSize::gib(1));
}
393
394impl<'de> Deserialize<'de> for DiskLimit {
395    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
396    where
397        D: Deserializer<'de>,
398    {
399        <String as Deserialize>::deserialize(deserializer)
400            .and_then(|s| {
401                ByteSize::from_str(&s).map_err(|_e| {
402                    use serde::de::Error;
403                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
404                })
405            })
406            .map(DiskLimit)
407    }
408}
409
410impl Serialize for DiskLimit {
411    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
412    where
413        S: serde::Serializer,
414    {
415        <String as Serialize>::serialize(&self.0.to_string(), serializer)
416    }
417}
418
/// Configuration for how services are scheduled. These may be ignored by orchestrator
/// implementations.
pub mod scheduling_config {
    /// Configuration for topology spread constraints applied to a service.
    #[derive(Debug, Clone)]
    pub struct ServiceTopologySpreadConfig {
        /// If `true`, enable spread for replicated services.
        ///
        /// Defaults to `true`.
        pub enabled: bool,
        /// If `true`, ignore services with `scale` > 1 when expressing
        /// spread constraints.
        ///
        /// Defaults to `true`.
        pub ignore_non_singular_scale: bool,
        /// The `maxSkew` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `1`.
        pub max_skew: i32,
        /// The `minDomains` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `None`.
        pub min_domains: Option<i32>,
        /// If `true`, make the spread constraints into a preference.
        ///
        /// Defaults to `false`.
        pub soft: bool,
    }

    /// Scheduling knobs for services; consumed via
    /// `NamespacedOrchestrator::update_scheduling_config`.
    #[derive(Debug, Clone)]
    pub struct ServiceSchedulingConfig {
        /// If `Some`, add an affinity preference with the given
        /// weight for services that horizontally scale.
        ///
        /// Defaults to `Some(100)`.
        pub multi_pod_az_affinity_weight: Option<i32>,
        /// If `true`, make the node-scope anti-affinity between
        /// replicated services a preference over a constraint.
        ///
        /// Defaults to `false`.
        pub soften_replication_anti_affinity: bool,
        /// The weight for `soften_replication_anti_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_replication_anti_affinity_weight: i32,
        /// Configuration for `TopologySpreadConstraint`'s.
        pub topology_spread: ServiceTopologySpreadConfig,
        /// If `true`, make the az-scope node affinity soft.
        ///
        /// Defaults to `false`.
        pub soften_az_affinity: bool,
        /// The weight for `soften_az_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_az_affinity_weight: i32,
        /// Whether to always provision a replica with disk,
        /// regardless of `DISK` DDL option.
        ///
        /// Defaults to `false`.
        pub always_use_disk: bool,
        /// Whether to enable security context for the service.
        ///
        /// Defaults to `true`.
        pub security_context_enabled: bool,
    }

    // Default values for the config fields above, kept as named constants so
    // callers can reference each default individually.
    pub const DEFAULT_POD_AZ_AFFINITY_WEIGHT: Option<i32> = Some(100);
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: i32 = 100;

    pub const DEFAULT_TOPOLOGY_SPREAD_ENABLED: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW: i32 = 1;
    pub const DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN: Option<i32> = None;
    pub const DEFAULT_TOPOLOGY_SPREAD_SOFT: bool = false;

    pub const DEFAULT_SOFTEN_AZ_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT: i32 = 100;
    pub const DEFAULT_ALWAYS_USE_DISK: bool = false;
    pub const DEFAULT_SECURITY_CONTEXT_ENABLED: bool = true;

    impl Default for ServiceSchedulingConfig {
        fn default() -> Self {
            ServiceSchedulingConfig {
                multi_pod_az_affinity_weight: DEFAULT_POD_AZ_AFFINITY_WEIGHT,
                soften_replication_anti_affinity: DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY,
                soften_replication_anti_affinity_weight:
                    DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
                topology_spread: ServiceTopologySpreadConfig {
                    enabled: DEFAULT_TOPOLOGY_SPREAD_ENABLED,
                    ignore_non_singular_scale: DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
                    max_skew: DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW,
                    min_domains: DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN,
                    soft: DEFAULT_TOPOLOGY_SPREAD_SOFT,
                },
                soften_az_affinity: DEFAULT_SOFTEN_AZ_AFFINITY,
                soften_az_affinity_weight: DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT,
                always_use_disk: DEFAULT_ALWAYS_USE_DISK,
                security_context_enabled: DEFAULT_SECURITY_CONTEXT_ENABLED,
            }
        }
    }
}