// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::fmt;
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures_core::stream::BoxStream;
use mz_ore::cast::CastFrom;
use serde::de::Unexpected;
use serde::{Deserialize, Deserializer, Serialize};

/// An orchestrator manages services.
///
/// A service is a set of one or more processes running the same image. See
/// [`ServiceConfig`] for details.
///
/// All services live within a namespace. A namespace allows multiple users to
/// share an orchestrator without conflicting: each user can only create,
/// delete, and list the services within their namespace. Namespaces are not
/// isolated at the network level, however: services in one namespace can
/// communicate with services in another namespace with no restrictions.
///
/// Services **must** be tolerant of running as part of a distributed system. In
/// particular, services **must** be prepared for the possibility that there are
/// two live processes with the same identity. This can happen, for example,
/// when the machine hosting a process *appears* to fail, from the perspective
/// of the orchestrator, and so the orchestrator restarts the process on another
/// machine, but in fact the original machine is still alive, just on the
/// opposite side of a network partition. Be sure to design any communication
/// with other services (e.g., an external database) to correctly handle
/// competing communication from another incarnation of the service.
///
/// The intent is that you can implement `Orchestrator` with pods in Kubernetes,
/// containers in Docker, or processes on your local machine.
pub trait Orchestrator: fmt::Debug + Send + Sync {
    /// Enter a namespace in the orchestrator.
    ///
    /// Returns a handle scoped to `namespace`: all operations on the returned
    /// [`NamespacedOrchestrator`] apply only to services in that namespace.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator>;
}
/// An orchestrator restricted to a single namespace.
#[async_trait]
pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync {
    /// Ensures that a service with the given configuration is running.
    ///
    /// If a service with the same ID already exists, its configuration is
    /// updated to match `config`. This may or may not involve restarting the
    /// service, depending on whether the existing service matches `config`.
    fn ensure_service(
        &self,
        id: &str,
        config: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error>;

    /// Drops the identified service, if it exists.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error>;

    /// Lists the identifiers of all known services.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error>;

    /// Watch for status changes of all known services.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>>;

    /// Gets resource usage metrics for all processes associated with a service.
    ///
    /// Returns `Err` if the entire process failed. Returns `Ok(v)` otherwise,
    /// with one element in `v` for each process of the service,
    /// even if not all metrics could be collected for all processes.
    /// In such a case, the corresponding fields of `ServiceProcessMetrics` will be `None`.
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error>;

    /// Updates the scheduling configuration applied to services in this
    /// namespace.
    ///
    /// Orchestrator implementations may ignore this configuration; see the
    /// [`scheduling_config`] module documentation.
    fn update_scheduling_config(&self, config: scheduling_config::ServiceSchedulingConfig);
}
/// An event describing a status change of an orchestrated service.
#[derive(Debug, Clone, Serialize)]
pub struct ServiceEvent {
    /// The ID of the service whose process changed status.
    pub service_id: String,
    /// The ID of the process within the service whose status changed.
    pub process_id: u64,
    /// The new status of the process.
    pub status: ServiceStatus,
    /// The time at which the status change was observed.
    pub time: DateTime<Utc>,
}
/// Why the service is not ready, if known.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum OfflineReason {
    /// The process was killed for exceeding its memory limit.
    OomKilled,
    /// The process is still initializing.
    Initializing,
}
106impl fmt::Display for OfflineReason {
107    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108        match self {
109            OfflineReason::OomKilled => f.write_str("oom-killed"),
110            OfflineReason::Initializing => f.write_str("initializing"),
111        }
112    }
113}
114
/// Describes the status of an orchestrated service.
#[derive(Debug, Clone, Copy, Serialize, Eq, PartialEq)]
pub enum ServiceStatus {
    /// Service is ready to accept requests.
    Online,
    /// Service is not ready to accept requests.
    ///
    /// The inner element is `None` if the reason is unknown.
    Offline(Option<OfflineReason>),
}
126impl ServiceStatus {
127    /// Returns the service status as a kebab-case string.
128    pub fn as_kebab_case_str(&self) -> &'static str {
129        match self {
130            ServiceStatus::Online => "online",
131            ServiceStatus::Offline(_) => "offline",
132        }
133    }
134}
135
/// Describes a running service managed by an `Orchestrator`.
pub trait Service: fmt::Debug + Send + Sync {
    /// Given the name of a port, returns the addresses for each of the
    /// service's processes, in order.
    ///
    /// Panics if `port` does not name a valid port.
    fn addresses(&self, port: &str) -> Vec<String>;

    /// Given the name of a port, returns TCP-accessible addresses for each of
    /// the service's processes, in order.
    ///
    /// This method is used when HTTP/TCP connectivity is needed (e.g., for
    /// proxying HTTP requests). For orchestrators that use Unix sockets
    /// internally (like the process orchestrator), this returns the TCP proxy
    /// addresses instead of the socket paths.
    ///
    /// The default implementation returns the same as `addresses()`, which is
    /// appropriate for orchestrators that already use TCP addresses (like
    /// Kubernetes).
    ///
    /// Panics if `port` does not name a valid port.
    fn tcp_addresses(&self, port: &str) -> Vec<String> {
        // By default, assume `addresses` already yields TCP-accessible
        // addresses.
        self.addresses(port)
    }
}
/// Resource usage metrics for a single process of a service.
///
/// Each field is `None` if the corresponding metric could not be collected
/// for the process (see [`NamespacedOrchestrator::fetch_service_metrics`]).
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
pub struct ServiceProcessMetrics {
    /// CPU usage, in nanocores.
    pub cpu_nano_cores: Option<u64>,
    /// Memory usage, in bytes.
    pub memory_bytes: Option<u64>,
    /// Disk usage, in bytes.
    pub disk_bytes: Option<u64>,
    /// Heap usage, in bytes.
    pub heap_bytes: Option<u64>,
    /// Heap limit, in bytes.
    pub heap_limit: Option<u64>,
}
/// A simple language for describing assertions about a label's existence and value.
///
/// Used by [`LabelSelector`].
#[derive(Clone, Debug)]
pub enum LabelSelectionLogic {
    /// The label exists and its value equals the given value.
    /// Equivalent to `InSet { values: vec![value] }`.
    Eq { value: String },
    /// Either the label does not exist, or it exists
    /// but its value does not equal the given value.
    /// Equivalent to `NotInSet { values: vec![value] }`.
    NotEq { value: String },
    /// The label exists.
    Exists,
    /// The label does not exist.
    NotExists,
    /// The label exists and its value is one of the given values.
    InSet { values: Vec<String> },
    /// Either the label does not exist, or it exists
    /// but its value is not one of the given values.
    NotInSet { values: Vec<String> },
}
/// A simple language for describing whether a label
/// exists and whether the value corresponding to it is in some set.
///
/// Intended to correspond to the capabilities offered by Kubernetes label selectors,
/// but without directly exposing Kubernetes API code to consumers of this module.
#[derive(Clone, Debug)]
pub struct LabelSelector {
    /// The name of the label.
    pub label_name: String,
    /// An assertion about the existence and value of a label
    /// named `label_name`.
    pub logic: LabelSelectionLogic,
}
/// Describes the desired state of a service.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ServiceConfig {
    /// An opaque identifier for the executable or container image to run.
    ///
    /// Often names a container on Docker Hub or a path on the local machine.
    pub image: String,
    /// For the Kubernetes orchestrator, this is an init container to
    /// configure for the pod running the service.
    pub init_container_image: Option<String>,
    /// A function that generates the arguments for each process of the service
    /// given the assigned listen addresses for each named port.
    // Closures are not `Debug`, so this field is skipped in the derived impl.
    #[derivative(Debug = "ignore")]
    pub args: Box<dyn Fn(ServiceAssignments) -> Vec<String> + Send + Sync>,
    /// Ports to expose.
    pub ports: Vec<ServicePort>,
    /// An optional limit on the memory that the service can use.
    pub memory_limit: Option<MemoryLimit>,
    /// An optional request on the memory that the service can use. If unspecified,
    /// use the same value as `memory_limit`.
    pub memory_request: Option<MemoryLimit>,
    /// An optional limit on the CPU that the service can use.
    pub cpu_limit: Option<CpuLimit>,
    /// The number of copies of this service to run.
    pub scale: NonZero<u16>,
    /// Arbitrary key–value pairs to attach to the service in the orchestrator
    /// backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub labels: BTreeMap<String, String>,
    /// Arbitrary key–value pairs to attach to the service as annotations in the
    /// orchestrator backend.
    ///
    /// The orchestrator backend may apply a prefix to the key if appropriate.
    pub annotations: BTreeMap<String, String>,
    /// The availability zones the service can be run in. If no availability
    /// zones are specified, the orchestrator is free to choose one.
    pub availability_zones: Option<Vec<String>>,
    /// A set of label selectors selecting all _other_ services that are replicas of this one.
    ///
    /// This may be used to implement anti-affinity. If _all_ such selectors
    /// match for a given service, this service should not be co-scheduled on
    /// a machine with that service.
    ///
    /// The orchestrator backend may or may not actually implement anti-affinity functionality.
    pub other_replicas_selector: Vec<LabelSelector>,
    /// A set of label selectors selecting all services that are replicas of this one,
    /// including itself.
    ///
    /// This may be used to implement placement spread.
    ///
    /// The orchestrator backend may or may not actually implement placement spread functionality.
    pub replicas_selector: Vec<LabelSelector>,

    /// The maximum amount of scratch disk space that the service is allowed to consume.
    pub disk_limit: Option<DiskLimit>,
    /// Node selector for this service.
    pub node_selector: BTreeMap<String, String>,
}
/// A named port associated with a service.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServicePort {
    /// A descriptive name for the port.
    ///
    /// Note that not all orchestrator backends make use of port names.
    pub name: String,
    /// The desired port number.
    ///
    /// Not all orchestrator backends will make use of the hint.
    pub port_hint: u16,
}
/// Assignments that the orchestrator has made for a process in a service.
#[derive(Clone, Debug)]
pub struct ServiceAssignments<'a> {
    /// For each specified [`ServicePort`] name, a listen address.
    pub listen_addrs: &'a BTreeMap<String, String>,
    /// The listen addresses of each peer in the service.
    ///
    /// The order of peers is significant. Each peer is uniquely identified by its position in the
    /// list.
    pub peer_addrs: &'a [BTreeMap<String, String>],
}

impl ServiceAssignments<'_> {
    /// Return the peer addresses for the specified [`ServicePort`] name, in
    /// peer order.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not present in the address map of every peer.
    pub fn peer_addresses(&self, name: &str) -> Vec<String> {
        self.peer_addrs
            .iter()
            .map(|addrs| {
                // Use `get` rather than indexing so a missing port name
                // produces a panic message that identifies the bad name.
                addrs
                    .get(name)
                    .unwrap_or_else(|| panic!("port name {:?} missing in peer addresses", name))
                    .clone()
            })
            .collect()
    }
}
/// Describes a limit on memory.
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct MemoryLimit(pub ByteSize);

impl MemoryLimit {
    /// The maximum expressible memory limit.
    pub const MAX: Self = Self(ByteSize(u64::MAX));
}
308impl<'de> Deserialize<'de> for MemoryLimit {
309    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
310    where
311        D: Deserializer<'de>,
312    {
313        <String as Deserialize>::deserialize(deserializer)
314            .and_then(|s| {
315                ByteSize::from_str(&s).map_err(|_e| {
316                    use serde::de::Error;
317                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
318                })
319            })
320            .map(MemoryLimit)
321    }
322}
323
324impl Serialize for MemoryLimit {
325    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
326    where
327        S: serde::Serializer,
328    {
329        <String as Serialize>::serialize(&self.0.to_string(), serializer)
330    }
331}
332
/// Describes a limit on CPU resources.
#[derive(Debug, Copy, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct CpuLimit {
    /// The limit, in millicpus (thousandths of a CPU).
    millicpus: usize,
}
impl CpuLimit {
    /// The maximum CPU limit representable in nanocpus as a `u64`.
    pub const MAX: Self = Self::from_millicpus(usize::MAX / 1_000_000);

    /// Constructs a new CPU limit from a number of millicpus.
    pub const fn from_millicpus(millicpus: usize) -> CpuLimit {
        CpuLimit { millicpus }
    }

    /// Returns the CPU limit in millicpus.
    pub fn as_millicpus(&self) -> usize {
        self.millicpus
    }

    /// Returns the CPU limit in nanocpus.
    ///
    /// # Panics
    ///
    /// Panics if the limit is not representable as a `u64` number of
    /// nanocpus (see below — not a realistic configuration).
    pub fn as_nanocpus(&self) -> u64 {
        // The largest possible value of a u64 is
        // 18_446_744_073_709_551_615,
        // so we won't overflow this
        // unless we have an instance with
        // ~18.45 billion cores.
        //
        // Such an instance seems unrealistic,
        // at least until we raise another few rounds
        // of funding ...

        u64::cast_from(self.millicpus)
            .checked_mul(1_000_000)
            .expect("Nano-CPUs must be representable")
    }
}
370impl<'de> Deserialize<'de> for CpuLimit {
371    // TODO(benesch): remove this once this function no longer makes use of
372    // potentially dangerous `as` conversions.
373    #[allow(clippy::as_conversions)]
374    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
375    where
376        D: serde::Deserializer<'de>,
377    {
378        // Note -- we just round off any precision beyond 0.001 here.
379        let float = f64::deserialize(deserializer)?;
380        let millicpus = (float * 1000.).round();
381        if millicpus < 0. || millicpus > (std::usize::MAX as f64) {
382            use serde::de::Error;
383            Err(D::Error::invalid_value(
384                Unexpected::Float(float),
385                &"a float representing a plausible number of CPUs",
386            ))
387        } else {
388            Ok(Self {
389                millicpus: millicpus as usize,
390            })
391        }
392    }
393}
394
395impl Serialize for CpuLimit {
396    // TODO(benesch): remove this once this function no longer makes use of
397    // potentially dangerous `as` conversions.
398    #[allow(clippy::as_conversions)]
399    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
400    where
401        S: serde::Serializer,
402    {
403        <f64 as Serialize>::serialize(&(self.millicpus as f64 / 1000.0), serializer)
404    }
405}
406
/// Describes a limit on disk usage.
#[derive(Copy, Clone, Debug, PartialOrd, Eq, Ord, PartialEq)]
pub struct DiskLimit(pub ByteSize);

impl DiskLimit {
    /// A disk limit of zero bytes.
    pub const ZERO: Self = Self(ByteSize(0));
    /// The maximum expressible disk limit.
    pub const MAX: Self = Self(ByteSize(u64::MAX));
    /// An arbitrary non-zero disk limit of 1 GiB.
    pub const ARBITRARY: Self = Self(ByteSize::gib(1));
}
417impl<'de> Deserialize<'de> for DiskLimit {
418    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
419    where
420        D: Deserializer<'de>,
421    {
422        <String as Deserialize>::deserialize(deserializer)
423            .and_then(|s| {
424                ByteSize::from_str(&s).map_err(|_e| {
425                    use serde::de::Error;
426                    D::Error::invalid_value(serde::de::Unexpected::Str(&s), &"valid size in bytes")
427                })
428            })
429            .map(DiskLimit)
430    }
431}
432
433impl Serialize for DiskLimit {
434    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
435    where
436        S: serde::Serializer,
437    {
438        <String as Serialize>::serialize(&self.0.to_string(), serializer)
439    }
440}
441
/// Configuration for how services are scheduled. These may be ignored by orchestrator
/// implementations.
pub mod scheduling_config {
    /// Configuration for topology spread constraints applied to services.
    #[derive(Debug, Clone)]
    pub struct ServiceTopologySpreadConfig {
        /// If `true`, enable spread for replicated services.
        ///
        /// Defaults to `true`.
        pub enabled: bool,
        /// If `true`, ignore services with `scale` > 1 when expressing
        /// spread constraints.
        ///
        /// Defaults to `true`.
        pub ignore_non_singular_scale: bool,
        /// The `maxSkew` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `1`.
        pub max_skew: i32,
        /// The `minDomains` for spread constraints.
        /// See
        /// <https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/>
        /// for more details.
        ///
        /// Defaults to `None`.
        pub min_domains: Option<i32>,
        /// If `true`, make the spread constraints into a preference.
        ///
        /// Defaults to `false`.
        pub soft: bool,
    }

    /// Scheduling configuration for services; see the module docs.
    #[derive(Debug, Clone)]
    pub struct ServiceSchedulingConfig {
        /// If `Some`, add an affinity preference with the given
        /// weight for services that horizontally scale.
        ///
        /// Defaults to `Some(100)`.
        pub multi_pod_az_affinity_weight: Option<i32>,
        /// If `true`, make the node-scope anti-affinity between
        /// replicated services a preference over a constraint.
        ///
        /// Defaults to `false`.
        pub soften_replication_anti_affinity: bool,
        /// The weight for `soften_replication_anti_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_replication_anti_affinity_weight: i32,
        /// Configuration for `TopologySpreadConstraint`s.
        pub topology_spread: ServiceTopologySpreadConfig,
        /// If `true`, make the az-scope node affinity soft.
        ///
        /// Defaults to `false`.
        pub soften_az_affinity: bool,
        /// The weight for `soften_az_affinity`.
        ///
        /// Defaults to `100`.
        pub soften_az_affinity_weight: i32,
        /// Whether to enable the security context for the service.
        ///
        /// Defaults to `true`.
        pub security_context_enabled: bool,
    }

    // Default values for the fields of `ServiceSchedulingConfig` and
    // `ServiceTopologySpreadConfig`; see the field docs above.
    pub const DEFAULT_POD_AZ_AFFINITY_WEIGHT: Option<i32> = Some(100);
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT: i32 = 100;

    pub const DEFAULT_TOPOLOGY_SPREAD_ENABLED: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE: bool = true;
    pub const DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW: i32 = 1;
    pub const DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN: Option<i32> = None;
    pub const DEFAULT_TOPOLOGY_SPREAD_SOFT: bool = false;

    pub const DEFAULT_SOFTEN_AZ_AFFINITY: bool = false;
    pub const DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT: i32 = 100;
    pub const DEFAULT_SECURITY_CONTEXT_ENABLED: bool = true;

    impl Default for ServiceSchedulingConfig {
        fn default() -> Self {
            ServiceSchedulingConfig {
                multi_pod_az_affinity_weight: DEFAULT_POD_AZ_AFFINITY_WEIGHT,
                soften_replication_anti_affinity: DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY,
                soften_replication_anti_affinity_weight:
                    DEFAULT_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
                topology_spread: ServiceTopologySpreadConfig {
                    enabled: DEFAULT_TOPOLOGY_SPREAD_ENABLED,
                    ignore_non_singular_scale: DEFAULT_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
                    max_skew: DEFAULT_TOPOLOGY_SPREAD_MAX_SKEW,
                    min_domains: DEFAULT_TOPOLOGY_SPREAD_MIN_DOMAIN,
                    soft: DEFAULT_TOPOLOGY_SPREAD_SOFT,
                },
                soften_az_affinity: DEFAULT_SOFTEN_AZ_AFFINITY,
                soften_az_affinity_weight: DEFAULT_SOFTEN_AZ_AFFINITY_WEIGHT,
                security_context_enabled: DEFAULT_SECURITY_CONTEXT_ENABLED,
            }
        }
    }
}