mz_orchestrator_kubernetes/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZero;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use std::{env, fmt};
16
17use anyhow::{Context, anyhow, bail};
18use async_trait::async_trait;
19use chrono::Utc;
20use clap::ValueEnum;
21use cloud_resource_controller::KubernetesResourceReader;
22use futures::TryFutureExt;
23use futures::stream::{BoxStream, StreamExt};
24use k8s_openapi::DeepMerge;
25use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
26use k8s_openapi::api::core::v1::{
27    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
28    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
29    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
30    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
31    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
32    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
33    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
34    WeightedPodAffinityTerm,
35};
36use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
37use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
38    LabelSelector, LabelSelectorRequirement, OwnerReference,
39};
40use kube::ResourceExt;
41use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
42use kube::client::Client;
43use kube::error::Error as K8sError;
44use kube::runtime::{WatchStreamExt, watcher};
45use maplit::btreemap;
46use mz_cloud_resources::AwsExternalIdPrefix;
47use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
48use mz_orchestrator::{
49    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
50    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
51    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
52};
53use mz_ore::cast::CastInto;
54use mz_ore::retry::Retry;
55use mz_ore::task::AbortOnDropHandle;
56use serde::Deserialize;
57use sha2::{Digest, Sha256};
58use tokio::sync::{mpsc, oneshot};
59use tracing::{error, info, warn};
60
61pub mod cloud_resource_controller;
62pub mod secrets;
63pub mod util;
64
65const FIELD_MANAGER: &str = "environmentd";
66const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;
67
68const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
69
70/// Configures a [`KubernetesOrchestrator`].
71#[derive(Debug, Clone)]
72pub struct KubernetesOrchestratorConfig {
73    /// The name of a Kubernetes context to use, if the Kubernetes configuration
74    /// is loaded from the local kubeconfig.
75    pub context: String,
76    /// The name of a non-default Kubernetes scheduler to use, if any.
77    pub scheduler_name: Option<String>,
78    /// Annotations to install on every service created by the orchestrator.
79    pub service_annotations: BTreeMap<String, String>,
80    /// Labels to install on every service created by the orchestrator.
81    pub service_labels: BTreeMap<String, String>,
82    /// Node selector to install on every service created by the orchestrator.
83    pub service_node_selector: BTreeMap<String, String>,
84    /// Affinity to install on every service created by the orchestrator.
85    pub service_affinity: Option<String>,
86    /// Tolerations to install on every service created by the orchestrator.
87    pub service_tolerations: Option<String>,
88    /// The service account that each service should run as, if any.
89    pub service_account: Option<String>,
90    /// The image pull policy to set for services created by the orchestrator.
91    pub image_pull_policy: KubernetesImagePullPolicy,
92    /// An AWS external ID prefix to use when making AWS operations on behalf
93    /// of the environment.
94    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
95    /// Whether to use code coverage mode or not. Always false for production.
96    pub coverage: bool,
97    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
98    /// services that request disk.
99    ///
100    /// If unspecified, the orchestrator will refuse to create services that
101    /// request disk.
102    pub ephemeral_volume_storage_class: Option<String>,
103    /// The optional fs group for service's pods' `securityContext`.
104    pub service_fs_group: Option<i64>,
105    /// The prefix to prepend to all object names
106    pub name_prefix: Option<String>,
107    /// Whether we should attempt to collect metrics from kubernetes
108    pub collect_pod_metrics: bool,
109    /// Whether to annotate pods for prometheus service discovery.
110    pub enable_prometheus_scrape_annotations: bool,
111}
112
113impl KubernetesOrchestratorConfig {
114    pub fn name_prefix(&self) -> String {
115        self.name_prefix.clone().unwrap_or_default()
116    }
117}
118
119/// Specifies whether Kubernetes should pull Docker images when creating pods.
120#[derive(ValueEnum, Debug, Clone, Copy)]
121pub enum KubernetesImagePullPolicy {
122    /// Always pull the Docker image from the registry.
123    Always,
124    /// Pull the Docker image only if the image is not present.
125    IfNotPresent,
126    /// Never pull the Docker image.
127    Never,
128}
129
130impl fmt::Display for KubernetesImagePullPolicy {
131    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132        match self {
133            KubernetesImagePullPolicy::Always => f.write_str("Always"),
134            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
135            KubernetesImagePullPolicy::Never => f.write_str("Never"),
136        }
137    }
138}
139
140impl KubernetesImagePullPolicy {
141    pub fn as_kebab_case_str(&self) -> &'static str {
142        match self {
143            Self::Always => "always",
144            Self::IfNotPresent => "if-not-present",
145            Self::Never => "never",
146        }
147    }
148}
149
150/// An orchestrator backed by Kubernetes.
151pub struct KubernetesOrchestrator {
152    client: Client,
153    kubernetes_namespace: String,
154    config: KubernetesOrchestratorConfig,
155    secret_api: Api<Secret>,
156    vpc_endpoint_api: Api<VpcEndpoint>,
157    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
158    resource_reader: Arc<KubernetesResourceReader>,
159}
160
161impl fmt::Debug for KubernetesOrchestrator {
162    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
163        f.debug_struct("KubernetesOrchestrator").finish()
164    }
165}
166
167impl KubernetesOrchestrator {
168    /// Creates a new Kubernetes orchestrator from the provided configuration.
169    pub async fn new(
170        config: KubernetesOrchestratorConfig,
171    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
172        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
173        let resource_reader =
174            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
175        Ok(KubernetesOrchestrator {
176            client: client.clone(),
177            kubernetes_namespace,
178            config,
179            secret_api: Api::default_namespaced(client.clone()),
180            vpc_endpoint_api: Api::default_namespaced(client),
181            namespaces: Mutex::new(BTreeMap::new()),
182            resource_reader,
183        })
184    }
185}
186
187impl Orchestrator for KubernetesOrchestrator {
188    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
189        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
190        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
191            let (command_tx, command_rx) = mpsc::unbounded_channel();
192            let worker = OrchestratorWorker {
193                metrics_api: Api::default_namespaced(self.client.clone()),
194                service_api: Api::default_namespaced(self.client.clone()),
195                stateful_set_api: Api::default_namespaced(self.client.clone()),
196                pod_api: Api::default_namespaced(self.client.clone()),
197                owner_references: vec![],
198                command_rx,
199                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
200                collect_pod_metrics: self.config.collect_pod_metrics,
201            }
202            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));
203
204            Arc::new(NamespacedKubernetesOrchestrator {
205                pod_api: Api::default_namespaced(self.client.clone()),
206                kubernetes_namespace: self.kubernetes_namespace.clone(),
207                namespace: namespace.into(),
208                config: self.config.clone(),
209                // TODO(guswynn): make this configurable.
210                scheduling_config: Default::default(),
211                service_infos: std::sync::Mutex::new(BTreeMap::new()),
212                command_tx,
213                _worker: worker,
214            })
215        }))
216    }
217}
218
219#[derive(Clone, Copy)]
220struct ServiceInfo {
221    scale: NonZero<u16>,
222}
223
224struct NamespacedKubernetesOrchestrator {
225    pod_api: Api<Pod>,
226    kubernetes_namespace: String,
227    namespace: String,
228    config: KubernetesOrchestratorConfig,
229    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
230    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
231    command_tx: mpsc::UnboundedSender<WorkerCommand>,
232    _worker: AbortOnDropHandle<()>,
233}
234
235impl fmt::Debug for NamespacedKubernetesOrchestrator {
236    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
237        f.debug_struct("NamespacedKubernetesOrchestrator")
238            .field("kubernetes_namespace", &self.kubernetes_namespace)
239            .field("namespace", &self.namespace)
240            .field("config", &self.config)
241            .finish()
242    }
243}
244
245/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
246/// [`OrchestratorWorker`].
247///
248/// Commands for which the caller expects a result include a `result_tx` on which the
249/// [`OrchestratorWorker`] will deliver the result.
250enum WorkerCommand {
251    EnsureService {
252        desc: ServiceDescription,
253    },
254    DropService {
255        name: String,
256    },
257    ListServices {
258        namespace: String,
259        result_tx: oneshot::Sender<Vec<String>>,
260    },
261    FetchServiceMetrics {
262        name: String,
263        info: ServiceInfo,
264        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
265    },
266}
267
268/// A description of a service to be created by an [`OrchestratorWorker`].
269#[derive(Debug, Clone)]
270struct ServiceDescription {
271    name: String,
272    scale: NonZero<u16>,
273    service: K8sService,
274    stateful_set: StatefulSet,
275    pod_template_hash: String,
276}
277
278/// A task executing blocking work for a [`NamespacedKubernetesOrchestrator`] in the background.
279///
280/// This type exists to enable making [`NamespacedKubernetesOrchestrator::ensure_service`] and
281/// [`NamespacedKubernetesOrchestrator::drop_service`] non-blocking, allowing invocation of these
282/// methods in latency-sensitive contexts.
283///
284/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
285/// orchestrator calls that query service state (such as `list_services`). These need to be
286/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
287/// ensure that a `list_services` result contains exactly those services that were previously
288/// created with `ensure_service` and not yet dropped with `drop_service`.
289struct OrchestratorWorker {
290    metrics_api: Api<PodMetrics>,
291    service_api: Api<K8sService>,
292    stateful_set_api: Api<StatefulSet>,
293    pod_api: Api<Pod>,
294    owner_references: Vec<OwnerReference>,
295    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
296    name_prefix: String,
297    collect_pod_metrics: bool,
298}
299
300#[derive(Deserialize, Clone, Debug)]
301pub struct PodMetricsContainer {
302    pub name: String,
303    pub usage: PodMetricsContainerUsage,
304}
305
306#[derive(Deserialize, Clone, Debug)]
307pub struct PodMetricsContainerUsage {
308    pub cpu: Quantity,
309    pub memory: Quantity,
310}
311
312#[derive(Deserialize, Clone, Debug)]
313pub struct PodMetrics {
314    pub metadata: ObjectMeta,
315    pub timestamp: String,
316    pub window: String,
317    pub containers: Vec<PodMetricsContainer>,
318}
319
320impl k8s_openapi::Resource for PodMetrics {
321    const GROUP: &'static str = "metrics.k8s.io";
322    const KIND: &'static str = "PodMetrics";
323    const VERSION: &'static str = "v1beta1";
324    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
325    const URL_PATH_SEGMENT: &'static str = "pods";
326
327    type Scope = k8s_openapi::NamespaceResourceScope;
328}
329
330impl k8s_openapi::Metadata for PodMetrics {
331    type Ty = ObjectMeta;
332
333    fn metadata(&self) -> &Self::Ty {
334        &self.metadata
335    }
336
337    fn metadata_mut(&mut self) -> &mut Self::Ty {
338        &mut self.metadata
339    }
340}
341
342// Note that these types are very weird. We are `get`-ing a
343// `List` object, and lying about it having an `ObjectMeta`
344// (it deserializes as empty, but we don't need it). The custom
345// metrics API is designed this way, which is very non-standard.
346// A discussion in the `kube` channel in the `tokio` discord
347// confirmed that this layout + using `get_subresource` is the
348// best way to handle this.
349
350#[derive(Deserialize, Clone, Debug)]
351pub struct MetricIdentifier {
352    #[serde(rename = "metricName")]
353    pub name: String,
354    // We skip `selector` for now, as we don't use it
355}
356
357#[derive(Deserialize, Clone, Debug)]
358pub struct MetricValue {
359    #[serde(rename = "describedObject")]
360    pub described_object: ObjectReference,
361    #[serde(flatten)]
362    pub metric_identifier: MetricIdentifier,
363    pub timestamp: String,
364    pub value: Quantity,
365    // We skip `windowSeconds`, as we don't need it
366}
367
368impl NamespacedKubernetesOrchestrator {
369    fn service_name(&self, id: &str) -> String {
370        format!(
371            "{}{}-{id}",
372            self.config.name_prefix.as_deref().unwrap_or(""),
373            self.namespace
374        )
375    }
376
377    /// Return a `watcher::Config` instance that limits results to the namespace
378    /// assigned to this orchestrator.
379    fn watch_pod_params(&self) -> watcher::Config {
380        let ns_selector = format!(
381            "environmentd.materialize.cloud/namespace={}",
382            self.namespace
383        );
384        // This watcher timeout must be shorter than the client read timeout.
385        watcher::Config::default().timeout(59).labels(&ns_selector)
386    }
387
388    /// Convert a higher-level label key to the actual one we
389    /// will give to Kubernetes
390    fn make_label_key(&self, key: &str) -> String {
391        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
392    }
393
394    fn label_selector_to_k8s(
395        &self,
396        MzLabelSelector { label_name, logic }: MzLabelSelector,
397    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
398        let (operator, values) = match logic {
399            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
400            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
401            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
402            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
403            LabelSelectionLogic::InSet { values } => {
404                if values.is_empty() {
405                    Err(anyhow!(
406                        "Invalid selector logic for {label_name}: empty `in` set"
407                    ))
408                } else {
409                    Ok(("In", values))
410                }
411            }
412            LabelSelectionLogic::NotInSet { values } => {
413                if values.is_empty() {
414                    Err(anyhow!(
415                        "Invalid selector logic for {label_name}: empty `notin` set"
416                    ))
417                } else {
418                    Ok(("NotIn", values))
419                }
420            }
421        }?;
422        let lsr = LabelSelectorRequirement {
423            key: self.make_label_key(&label_name),
424            operator: operator.to_string(),
425            values: Some(values),
426        };
427        Ok(lsr)
428    }
429
430    fn send_command(&self, cmd: WorkerCommand) {
431        self.command_tx.send(cmd).expect("worker task not dropped");
432    }
433}
434
435#[derive(Debug)]
436struct ScaledQuantity {
437    integral_part: u64,
438    exponent: i8,
439    base10: bool,
440}
441
442impl ScaledQuantity {
443    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
444        if base10 != self.base10 {
445            return None;
446        }
447        let exponent = self.exponent - scale;
448        let mut result = self.integral_part;
449        let base = if self.base10 { 10 } else { 2 };
450        if exponent < 0 {
451            for _ in exponent..0 {
452                result /= base;
453            }
454        } else {
455            for _ in 0..exponent {
456                result = result.checked_mul(base)?;
457            }
458        }
459        Some(result)
460    }
461}
462
463// Parse a k8s `Quantity` object
464// into a numeric value.
465//
466// This is intended to support collecting CPU and Memory data.
467// Thus, there are a few that things Kubernetes attempts to do, that we don't,
468// because I've never observed metrics-server specifically sending them:
469// (1) Handle negative numbers (because it's not useful for that use-case)
470// (2) Handle non-integers (because I have never observed them being actually sent)
471// (3) Handle scientific notation (e.g. 1.23e2)
472fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
473    const DEC_SUFFIXES: &[(&str, i8)] = &[
474        ("n", -9),
475        ("u", -6),
476        ("m", -3),
477        ("", 0),
478        ("k", 3), // yep, intentionally lowercase.
479        ("M", 6),
480        ("G", 9),
481        ("T", 12),
482        ("P", 15),
483        ("E", 18),
484    ];
485    const BIN_SUFFIXES: &[(&str, i8)] = &[
486        ("", 0),
487        ("Ki", 10),
488        ("Mi", 20),
489        ("Gi", 30),
490        ("Ti", 40),
491        ("Pi", 50),
492        ("Ei", 60),
493    ];
494
495    let (positive, s) = match s.chars().next() {
496        Some('+') => (true, &s[1..]),
497        Some('-') => (false, &s[1..]),
498        _ => (true, s),
499    };
500
501    if !positive {
502        anyhow::bail!("Negative numbers not supported")
503    }
504
505    fn is_suffix_char(ch: char) -> bool {
506        "numkMGTPEKi".contains(ch)
507    }
508    let (num, suffix) = match s.find(is_suffix_char) {
509        None => (s, ""),
510        Some(idx) => s.split_at(idx),
511    };
512    let num: u64 = num.parse()?;
513    let (exponent, base10) = if let Some((_, exponent)) =
514        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
515    {
516        (exponent, true)
517    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
518        (exponent, false)
519    } else {
520        anyhow::bail!("Unrecognized suffix: {suffix}");
521    };
522    Ok(ScaledQuantity {
523        integral_part: num,
524        exponent: *exponent,
525        base10,
526    })
527}
528
529#[async_trait]
530impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
531    async fn fetch_service_metrics(
532        &self,
533        id: &str,
534    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
535        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
536            *info
537        } else {
538            // This should have been set in `ensure_service`.
539            tracing::error!("Failed to get info for {id}");
540            anyhow::bail!("Failed to get info for {id}");
541        };
542
543        let (result_tx, result_rx) = oneshot::channel();
544        self.send_command(WorkerCommand::FetchServiceMetrics {
545            name: self.service_name(id),
546            info,
547            result_tx,
548        });
549
550        let metrics = result_rx.await.expect("worker task not dropped");
551        Ok(metrics)
552    }
553
554    fn ensure_service(
555        &self,
556        id: &str,
557        ServiceConfig {
558            image,
559            init_container_image,
560            args,
561            ports: ports_in,
562            memory_limit,
563            memory_request,
564            cpu_limit,
565            scale,
566            labels: labels_in,
567            annotations: annotations_in,
568            availability_zones,
569            other_replicas_selector,
570            replicas_selector,
571            disk_limit,
572            node_selector,
573        }: ServiceConfig,
574    ) -> Result<Box<dyn Service>, anyhow::Error> {
575        // This is extremely cheap to clone, so just look into the lock once.
576        let scheduling_config: ServiceSchedulingConfig =
577            self.scheduling_config.read().expect("poisoned").clone();
578
579        // Enable disk if the size does not disable it.
580        let disk = disk_limit != Some(DiskLimit::ZERO);
581
582        let name = self.service_name(id);
583        // The match labels should be the minimal set of labels that uniquely
584        // identify the pods in the stateful set. Changing these after the
585        // `StatefulSet` is created is not permitted by Kubernetes, and we're
586        // not yet smart enough to handle deleting and recreating the
587        // `StatefulSet`.
588        let mut match_labels = btreemap! {
589            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
590            "environmentd.materialize.cloud/service-id".into() => id.into(),
591        };
592        for (key, value) in &self.config.service_labels {
593            match_labels.insert(key.clone(), value.clone());
594        }
595
596        let mut labels = match_labels.clone();
597        for (key, value) in labels_in {
598            labels.insert(self.make_label_key(&key), value);
599        }
600
601        labels.insert(self.make_label_key("scale"), scale.to_string());
602
603        for port in &ports_in {
604            labels.insert(
605                format!("environmentd.materialize.cloud/port-{}", port.name),
606                "true".into(),
607            );
608        }
609        let mut limits = BTreeMap::new();
610        let mut requests = BTreeMap::new();
611        if let Some(memory_limit) = memory_limit {
612            limits.insert(
613                "memory".into(),
614                Quantity(memory_limit.0.as_u64().to_string()),
615            );
616            requests.insert(
617                "memory".into(),
618                Quantity(memory_limit.0.as_u64().to_string()),
619            );
620        }
621        if let Some(memory_request) = memory_request {
622            requests.insert(
623                "memory".into(),
624                Quantity(memory_request.0.as_u64().to_string()),
625            );
626        }
627        if let Some(cpu_limit) = cpu_limit {
628            limits.insert(
629                "cpu".into(),
630                Quantity(format!("{}m", cpu_limit.as_millicpus())),
631            );
632            requests.insert(
633                "cpu".into(),
634                Quantity(format!("{}m", cpu_limit.as_millicpus())),
635            );
636        }
637        let service = K8sService {
638            metadata: ObjectMeta {
639                name: Some(name.clone()),
640                ..Default::default()
641            },
642            spec: Some(ServiceSpec {
643                ports: Some(
644                    ports_in
645                        .iter()
646                        .map(|port| ServicePort {
647                            port: port.port_hint.into(),
648                            name: Some(port.name.clone()),
649                            ..Default::default()
650                        })
651                        .collect(),
652                ),
653                cluster_ip: Some("None".to_string()),
654                selector: Some(match_labels.clone()),
655                ..Default::default()
656            }),
657            status: None,
658        };
659
660        let hosts = (0..scale.get())
661            .map(|i| {
662                format!(
663                    "{name}-{i}.{name}.{}.svc.cluster.local",
664                    self.kubernetes_namespace
665                )
666            })
667            .collect::<Vec<_>>();
668        let ports = ports_in
669            .iter()
670            .map(|p| (p.name.clone(), p.port_hint))
671            .collect::<BTreeMap<_, _>>();
672
673        let mut listen_addrs = BTreeMap::new();
674        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
675        for (name, port) in &ports {
676            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
677            for (i, host) in hosts.iter().enumerate() {
678                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
679            }
680        }
681        let mut args = args(ServiceAssignments {
682            listen_addrs: &listen_addrs,
683            peer_addrs: &peer_addrs,
684        });
685
686        // This constrains the orchestrator (for those orchestrators that support
687        // anti-affinity, today just k8s) to never schedule pods for different replicas
688        // of the same cluster on the same node. Pods from the _same_ replica are fine;
689        // pods from different clusters are also fine.
690        //
691        // The point is that if pods of two replicas are on the same node, that node
692        // going down would kill both replicas, and so the replication factor of the
693        // cluster in question is illusory.
694        let anti_affinity = Some({
695            let label_selector_requirements = other_replicas_selector
696                .clone()
697                .into_iter()
698                .map(|ls| self.label_selector_to_k8s(ls))
699                .collect::<Result<Vec<_>, _>>()?;
700            let ls = LabelSelector {
701                match_expressions: Some(label_selector_requirements),
702                ..Default::default()
703            };
704            let pat = PodAffinityTerm {
705                label_selector: Some(ls),
706                topology_key: "kubernetes.io/hostname".to_string(),
707                ..Default::default()
708            };
709
710            if !scheduling_config.soften_replication_anti_affinity {
711                PodAntiAffinity {
712                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
713                    ..Default::default()
714                }
715            } else {
716                PodAntiAffinity {
717                    preferred_during_scheduling_ignored_during_execution: Some(vec![
718                        WeightedPodAffinityTerm {
719                            weight: scheduling_config.soften_replication_anti_affinity_weight,
720                            pod_affinity_term: pat,
721                        },
722                    ]),
723                    ..Default::default()
724                }
725            }
726        });
727
728        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
729            // `match_labels` sufficiently selects pods in the same replica.
730            let ls = LabelSelector {
731                match_labels: Some(match_labels.clone()),
732                ..Default::default()
733            };
734            let pat = PodAffinityTerm {
735                label_selector: Some(ls),
736                topology_key: "topology.kubernetes.io/zone".to_string(),
737                ..Default::default()
738            };
739
740            Some(PodAffinity {
741                preferred_during_scheduling_ignored_during_execution: Some(vec![
742                    WeightedPodAffinityTerm {
743                        weight,
744                        pod_affinity_term: pat,
745                    },
746                ]),
747                ..Default::default()
748            })
749        } else {
750            None
751        };
752
753        let topology_spread = if scheduling_config.topology_spread.enabled {
754            let config = &scheduling_config.topology_spread;
755
756            if !config.ignore_non_singular_scale || scale.get() == 1 {
757                let label_selector_requirements = (if config.ignore_non_singular_scale {
758                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();
759
760                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
761                        label_name: "scale".into(),
762                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
763                            value: "1".to_string(),
764                        },
765                    });
766
767                    replicas_selector_ignoring_scale
768                } else {
769                    replicas_selector
770                })
771                .into_iter()
772                .map(|ls| self.label_selector_to_k8s(ls))
773                .collect::<Result<Vec<_>, _>>()?;
774                let ls = LabelSelector {
775                    match_expressions: Some(label_selector_requirements),
776                    ..Default::default()
777                };
778
779                let constraint = TopologySpreadConstraint {
780                    label_selector: Some(ls),
781                    min_domains: config.min_domains,
782                    max_skew: config.max_skew,
783                    topology_key: "topology.kubernetes.io/zone".to_string(),
784                    when_unsatisfiable: if config.soft {
785                        "ScheduleAnyway".to_string()
786                    } else {
787                        "DoNotSchedule".to_string()
788                    },
789                    // TODO(guswynn): restore these once they are supported.
790                    // Consider node affinities when calculating topology spread. This is the
791                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_affinity_policy>,
792                    // made explicit.
793                    // node_affinity_policy: Some("Honor".to_string()),
794                    // Do not consider node taints when calculating topology spread. This is the
795                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_taints_policy>,
796                    // made explicit.
797                    // node_taints_policy: Some("Ignore".to_string()),
798                    match_label_keys: None,
799                    // Once the above are restorted, we should't have `..Default::default()` here because the specifics of these fields are
800                    // subtle enough where we want compilation failures when we upgrade
801                    ..Default::default()
802                };
803                Some(vec![constraint])
804            } else {
805                None
806            }
807        } else {
808            None
809        };
810
811        let mut pod_annotations = btreemap! {
812            // Prevent the cluster-autoscaler (or karpenter) from evicting these pods in attempts to scale down
813            // and terminate nodes.
814            // This will cost us more money, but should give us better uptime.
815            // This does not prevent all evictions by Kubernetes, only the ones initiated by the
816            // cluster-autoscaler (or karpenter). Notably, eviction of pods for resource overuse is still enabled.
817            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
818            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
819
820            // It's called do-not-disrupt in newer versions of karpenter, so adding for forward/backward compatibility
821            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
822        };
823        for (key, value) in annotations_in {
824            // We want to use the same prefix as our labels keys
825            pod_annotations.insert(self.make_label_key(&key), value);
826        }
827        if self.config.enable_prometheus_scrape_annotations {
828            if let Some(internal_http_port) = ports_in
829                .iter()
830                .find(|port| port.name == "internal-http")
831                .map(|port| port.port_hint.to_string())
832            {
833                // Enable prometheus scrape discovery
834                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
835                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
836                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
837                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
838            }
839        }
840        for (key, value) in &self.config.service_annotations {
841            pod_annotations.insert(key.clone(), value.clone());
842        }
843
844        let default_node_selector = if disk {
845            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
846        } else {
847            // if the cluster doesn't require disk, we can omit the selector
848            // allowing it to be scheduled onto nodes with and without the
849            // selector
850            vec![]
851        };
852
853        let node_selector: BTreeMap<String, String> = default_node_selector
854            .into_iter()
855            .chain(self.config.service_node_selector.clone())
856            .chain(node_selector)
857            .collect();
858
859        let node_affinity = if let Some(availability_zones) = availability_zones {
860            let selector = NodeSelectorTerm {
861                match_expressions: Some(vec![NodeSelectorRequirement {
862                    key: "materialize.cloud/availability-zone".to_string(),
863                    operator: "In".to_string(),
864                    values: Some(availability_zones),
865                }]),
866                match_fields: None,
867            };
868
869            if scheduling_config.soften_az_affinity {
870                Some(NodeAffinity {
871                    preferred_during_scheduling_ignored_during_execution: Some(vec![
872                        PreferredSchedulingTerm {
873                            preference: selector,
874                            weight: scheduling_config.soften_az_affinity_weight,
875                        },
876                    ]),
877                    required_during_scheduling_ignored_during_execution: None,
878                })
879            } else {
880                Some(NodeAffinity {
881                    preferred_during_scheduling_ignored_during_execution: None,
882                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
883                        node_selector_terms: vec![selector],
884                    }),
885                })
886            }
887        } else {
888            None
889        };
890
891        let mut affinity = Affinity {
892            pod_anti_affinity: anti_affinity,
893            pod_affinity,
894            node_affinity,
895            ..Default::default()
896        };
897        if let Some(service_affinity) = &self.config.service_affinity {
898            affinity.merge_from(serde_json::from_str(service_affinity)?);
899        }
900
901        let container_name = image
902            .rsplit_once('/')
903            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
904            .context("`image` is not ORG/NAME:VERSION")?
905            .0
906            .to_string();
907
908        let container_security_context = if scheduling_config.security_context_enabled {
909            Some(SecurityContext {
910                privileged: Some(false),
911                run_as_non_root: Some(true),
912                allow_privilege_escalation: Some(false),
913                seccomp_profile: Some(SeccompProfile {
914                    type_: "RuntimeDefault".to_string(),
915                    ..Default::default()
916                }),
917                capabilities: Some(Capabilities {
918                    drop: Some(vec!["ALL".to_string()]),
919                    ..Default::default()
920                }),
921                ..Default::default()
922            })
923        } else {
924            None
925        };
926
927        let init_containers = init_container_image.map(|image| {
928            vec![Container {
929                name: "init".to_string(),
930                image: Some(image),
931                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
932                resources: Some(ResourceRequirements {
933                    claims: None,
934                    limits: Some(limits.clone()),
935                    requests: Some(requests.clone()),
936                }),
937                security_context: container_security_context.clone(),
938                env: Some(vec![
939                    EnvVar {
940                        name: "MZ_NAMESPACE".to_string(),
941                        value_from: Some(EnvVarSource {
942                            field_ref: Some(ObjectFieldSelector {
943                                field_path: "metadata.namespace".to_string(),
944                                ..Default::default()
945                            }),
946                            ..Default::default()
947                        }),
948                        ..Default::default()
949                    },
950                    EnvVar {
951                        name: "MZ_POD_NAME".to_string(),
952                        value_from: Some(EnvVarSource {
953                            field_ref: Some(ObjectFieldSelector {
954                                field_path: "metadata.name".to_string(),
955                                ..Default::default()
956                            }),
957                            ..Default::default()
958                        }),
959                        ..Default::default()
960                    },
961                    EnvVar {
962                        name: "MZ_NODE_NAME".to_string(),
963                        value_from: Some(EnvVarSource {
964                            field_ref: Some(ObjectFieldSelector {
965                                field_path: "spec.nodeName".to_string(),
966                                ..Default::default()
967                            }),
968                            ..Default::default()
969                        }),
970                        ..Default::default()
971                    },
972                ]),
973                ..Default::default()
974            }]
975        });
976
977        let env = if self.config.coverage {
978            Some(vec![EnvVar {
979                name: "LLVM_PROFILE_FILE".to_string(),
980                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
981                ..Default::default()
982            }])
983        } else {
984            None
985        };
986
987        let mut volume_mounts = vec![];
988
989        if self.config.coverage {
990            volume_mounts.push(VolumeMount {
991                name: "coverage".to_string(),
992                mount_path: "/coverage".to_string(),
993                ..Default::default()
994            })
995        }
996
997        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
998            (true, Some(ephemeral_volume_storage_class)) => {
999                volume_mounts.push(VolumeMount {
1000                    name: "scratch".to_string(),
1001                    mount_path: "/scratch".to_string(),
1002                    ..Default::default()
1003                });
1004                args.push("--scratch-directory=/scratch".into());
1005
1006                Some(vec![Volume {
1007                    name: "scratch".to_string(),
1008                    ephemeral: Some(EphemeralVolumeSource {
1009                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
1010                            spec: PersistentVolumeClaimSpec {
1011                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1012                                storage_class_name: Some(
1013                                    ephemeral_volume_storage_class.to_string(),
1014                                ),
1015                                resources: Some(VolumeResourceRequirements {
1016                                    requests: Some(BTreeMap::from([(
1017                                        "storage".to_string(),
1018                                        Quantity(
1019                                            disk_limit
1020                                                .unwrap_or(DiskLimit::ARBITRARY)
1021                                                .0
1022                                                .as_u64()
1023                                                .to_string(),
1024                                        ),
1025                                    )])),
1026                                    ..Default::default()
1027                                }),
1028                                ..Default::default()
1029                            },
1030                            ..Default::default()
1031                        }),
1032                        ..Default::default()
1033                    }),
1034                    ..Default::default()
1035                }])
1036            }
1037            (true, None) => {
1038                return Err(anyhow!(
1039                    "service requested disk but no ephemeral volume storage class was configured"
1040                ));
1041            }
1042            (false, _) => None,
1043        };
1044
1045        if let Some(name_prefix) = &self.config.name_prefix {
1046            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
1047        }
1048
1049        let volume_claim_templates = if self.config.coverage {
1050            Some(vec![PersistentVolumeClaim {
1051                metadata: ObjectMeta {
1052                    name: Some("coverage".to_string()),
1053                    ..Default::default()
1054                },
1055                spec: Some(PersistentVolumeClaimSpec {
1056                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1057                    resources: Some(VolumeResourceRequirements {
1058                        requests: Some(BTreeMap::from([(
1059                            "storage".to_string(),
1060                            Quantity("10Gi".to_string()),
1061                        )])),
1062                        ..Default::default()
1063                    }),
1064                    ..Default::default()
1065                }),
1066                ..Default::default()
1067            }])
1068        } else {
1069            None
1070        };
1071
1072        let security_context = if let Some(fs_group) = self.config.service_fs_group {
1073            Some(PodSecurityContext {
1074                fs_group: Some(fs_group),
1075                run_as_user: Some(fs_group),
1076                run_as_group: Some(fs_group),
1077                ..Default::default()
1078            })
1079        } else {
1080            None
1081        };
1082
1083        let mut tolerations = vec![
1084            // When the node becomes `NotReady` it indicates there is a problem
1085            // with the node. By default Kubernetes waits 300s (5 minutes)
1086            // before descheduling the pod, but we tune this to 30s for faster
1087            // recovery in the case of node failure.
1088            Toleration {
1089                effect: Some("NoExecute".into()),
1090                key: Some("node.kubernetes.io/not-ready".into()),
1091                operator: Some("Exists".into()),
1092                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1093                value: None,
1094            },
1095            Toleration {
1096                effect: Some("NoExecute".into()),
1097                key: Some("node.kubernetes.io/unreachable".into()),
1098                operator: Some("Exists".into()),
1099                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1100                value: None,
1101            },
1102        ];
1103        if let Some(service_tolerations) = &self.config.service_tolerations {
1104            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
1105        }
1106        let tolerations = Some(tolerations);
1107
1108        let mut pod_template_spec = PodTemplateSpec {
1109            metadata: Some(ObjectMeta {
1110                labels: Some(labels.clone()),
1111                // Only set `annotations` _after_ we have computed the pod template hash, to
1112                // avoid that annotation changes cause pod replacements.
1113                ..Default::default()
1114            }),
1115            spec: Some(PodSpec {
1116                init_containers,
1117                containers: vec![Container {
1118                    name: container_name,
1119                    image: Some(image),
1120                    args: Some(args),
1121                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
1122                    ports: Some(
1123                        ports_in
1124                            .iter()
1125                            .map(|port| ContainerPort {
1126                                container_port: port.port_hint.into(),
1127                                name: Some(port.name.clone()),
1128                                ..Default::default()
1129                            })
1130                            .collect(),
1131                    ),
1132                    security_context: container_security_context.clone(),
1133                    resources: Some(ResourceRequirements {
1134                        claims: None,
1135                        limits: Some(limits),
1136                        requests: Some(requests),
1137                    }),
1138                    volume_mounts: if !volume_mounts.is_empty() {
1139                        Some(volume_mounts)
1140                    } else {
1141                        None
1142                    },
1143                    env,
1144                    ..Default::default()
1145                }],
1146                volumes,
1147                security_context,
1148                node_selector: Some(node_selector),
1149                scheduler_name: self.config.scheduler_name.clone(),
1150                service_account: self.config.service_account.clone(),
1151                affinity: Some(affinity),
1152                topology_spread_constraints: topology_spread,
1153                tolerations,
1154                // Setting a 0s termination grace period has the side effect of
1155                // automatically starting a new pod when the previous pod is
1156                // currently terminating. This enables recovery from a node
1157                // failure with no manual intervention. Without this setting,
1158                // the StatefulSet controller will refuse to start a new pod
1159                // until the failed node is manually removed from the Kubernetes
1160                // cluster.
1161                //
1162                // The Kubernetes documentation strongly advises against this
1163                // setting, as StatefulSets attempt to provide "at most once"
1164                // semantics [0]--that is, the guarantee that for a given pod in
1165                // a StatefulSet there is *at most* one pod with that identity
1166                // running in the cluster.
1167                //
1168                // Materialize services, however, are carefully designed to
1169                // *not* rely on this guarantee. In fact, we do not believe that
1170                // correct distributed systems can meaningfully rely on
1171                // Kubernetes's guarantee--network packets from a pod can be
1172                // arbitrarily delayed, long past that pod's termination.
1173                //
1174                // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1175                termination_grace_period_seconds: Some(0),
1176                ..Default::default()
1177            }),
1178        };
1179        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
1180        let mut hasher = Sha256::new();
1181        hasher.update(pod_template_json);
1182        let pod_template_hash = format!("{:x}", hasher.finalize());
1183        pod_annotations.insert(
1184            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
1185            pod_template_hash.clone(),
1186        );
1187
1188        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);
1189
1190        let stateful_set = StatefulSet {
1191            metadata: ObjectMeta {
1192                name: Some(name.clone()),
1193                ..Default::default()
1194            },
1195            spec: Some(StatefulSetSpec {
1196                selector: LabelSelector {
1197                    match_labels: Some(match_labels),
1198                    ..Default::default()
1199                },
1200                service_name: Some(name.clone()),
1201                replicas: Some(scale.cast_into()),
1202                template: pod_template_spec,
1203                update_strategy: Some(StatefulSetUpdateStrategy {
1204                    type_: Some("OnDelete".to_owned()),
1205                    ..Default::default()
1206                }),
1207                pod_management_policy: Some("Parallel".to_string()),
1208                volume_claim_templates,
1209                ..Default::default()
1210            }),
1211            status: None,
1212        };
1213
1214        self.send_command(WorkerCommand::EnsureService {
1215            desc: ServiceDescription {
1216                name,
1217                scale,
1218                service,
1219                stateful_set,
1220                pod_template_hash,
1221            },
1222        });
1223
1224        self.service_infos
1225            .lock()
1226            .expect("poisoned lock")
1227            .insert(id.to_string(), ServiceInfo { scale });
1228
1229        Ok(Box::new(KubernetesService { hosts, ports }))
1230    }
1231
1232    /// Drops the identified service, if it exists.
1233    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1234        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1235        self.service_infos.lock().expect("poisoned lock").remove(id);
1236
1237        self.send_command(WorkerCommand::DropService {
1238            name: self.service_name(id),
1239        });
1240
1241        Ok(())
1242    }
1243
1244    /// Lists the identifiers of all known services.
1245    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1246        let (result_tx, result_rx) = oneshot::channel();
1247        self.send_command(WorkerCommand::ListServices {
1248            namespace: self.namespace.clone(),
1249            result_tx,
1250        });
1251
1252        let list = result_rx.await.expect("worker task not dropped");
1253        Ok(list)
1254    }
1255
1256    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
1257        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
1258            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
1259            let service_id_label = "environmentd.materialize.cloud/service-id";
1260            let service_id = pod
1261                .labels()
1262                .get(service_id_label)
1263                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
1264                .clone();
1265
1266            let oomed = pod
1267                .status
1268                .as_ref()
1269                .and_then(|status| status.container_statuses.as_ref())
1270                .map(|container_statuses| {
1271                    container_statuses.iter().any(|cs| {
1272                        // The container might have already transitioned from "terminated" to
1273                        // "waiting"/"running" state, in which case we need to check its previous
1274                        // state to find out why it terminated.
1275                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
1276                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
1277                        let termination_state = current_state.or(last_state);
1278
1279                        // The interesting exit codes are:
1280                        //  * 135 (SIGBUS): occurs when lgalloc runs out of disk
1281                        //  * 137 (SIGKILL): occurs when the OOM killer terminates the container
1282                        //  * 167: occurs when the lgalloc or memory limiter terminates the process
1283                        // We treat the all of these as OOM conditions since swap and lgalloc use
1284                        // disk only for spilling memory.
1285                        let exit_code = termination_state.map(|s| s.exit_code);
1286                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
1287                    })
1288                })
1289                .unwrap_or(false);
1290
1291            let (pod_ready, last_probe_time) = pod
1292                .status
1293                .and_then(|status| status.conditions)
1294                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
1295                .map(|c| (c.status == "True", c.last_probe_time))
1296                .unwrap_or((false, None));
1297
1298            let status = if pod_ready {
1299                ServiceStatus::Online
1300            } else {
1301                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
1302            };
1303            let time = if let Some(time) = last_probe_time {
1304                time.0
1305            } else {
1306                Utc::now()
1307            };
1308
1309            Ok(ServiceEvent {
1310                service_id,
1311                process_id,
1312                status,
1313                time,
1314            })
1315        }
1316
1317        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
1318            .touched_objects()
1319            .filter_map(|object| async move {
1320                match object {
1321                    Ok(pod) => Some(into_service_event(pod)),
1322                    Err(error) => {
1323                        // We assume that errors returned by Kubernetes are usually transient, so we
1324                        // just log a warning and ignore them otherwise.
1325                        tracing::warn!("service watch error: {error}");
1326                        None
1327                    }
1328                }
1329            });
1330        Box::pin(stream)
1331    }
1332
1333    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1334        *self.scheduling_config.write().expect("poisoned") = config;
1335    }
1336}
1337
1338impl OrchestratorWorker {
1339    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1340        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1341    }
1342
1343    async fn run(mut self) {
1344        {
1345            info!("initializing Kubernetes orchestrator worker");
1346            let start = Instant::now();
1347
1348            // Fetch the owner reference for our own pod (usually a
1349            // StatefulSet), so that we can propagate it to the services we
1350            // create.
1351            let hostname = env::var("HOSTNAME").expect("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator");
1352            let orchestrator_pod = Retry::default()
1353                .clamp_backoff(Duration::from_secs(10))
1354                .retry_async(|_| self.pod_api.get(&hostname))
1355                .await
1356                .expect("always retries on error");
1357            self.owner_references
1358                .extend(orchestrator_pod.owner_references().into_iter().cloned());
1359
1360            info!(
1361                "Kubernetes orchestrator worker initialized in {:?}",
1362                start.elapsed()
1363            );
1364        }
1365
1366        while let Some(cmd) = self.command_rx.recv().await {
1367            self.handle_command(cmd).await;
1368        }
1369    }
1370
1371    /// Handle a worker command.
1372    ///
1373    /// If handling the command fails, it is automatically retried. All command handlers return
1374    /// [`K8sError`], so we can reasonably assume that a failure is caused by issues communicating
1375    /// with the Kubernetes API server and that retrying will eventually resolve them.
1376    async fn handle_command(&self, cmd: WorkerCommand) {
1377        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1378        where
1379            F: Fn() -> U,
1380            U: Future<Output = Result<R, K8sError>>,
1381        {
1382            Retry::default()
1383                .clamp_backoff(Duration::from_secs(10))
1384                .retry_async(|_| {
1385                    f().map_err(
1386                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1387                    )
1388                })
1389                .await
1390                .expect("always retries on error")
1391        }
1392
1393        use WorkerCommand::*;
1394        match cmd {
1395            EnsureService { desc } => {
1396                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1397            }
1398            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1399            ListServices {
1400                namespace,
1401                result_tx,
1402            } => {
1403                let result = retry(|| self.list_services(&namespace), "ListServices").await;
1404                let _ = result_tx.send(result);
1405            }
1406            FetchServiceMetrics {
1407                name,
1408                info,
1409                result_tx,
1410            } => {
1411                let result = self.fetch_service_metrics(&name, &info).await;
1412                let _ = result_tx.send(result);
1413            }
1414        }
1415    }
1416
1417    async fn fetch_service_metrics(
1418        &self,
1419        name: &str,
1420        info: &ServiceInfo,
1421    ) -> Vec<ServiceProcessMetrics> {
1422        if !self.collect_pod_metrics {
1423            return (0..info.scale.get())
1424                .map(|_| ServiceProcessMetrics::default())
1425                .collect();
1426        }
1427
1428        /// Usage metrics reported by clusterd processes.
1429        #[derive(Deserialize)]
1430        pub(crate) struct ClusterdUsage {
1431            disk_bytes: Option<u64>,
1432            memory_bytes: Option<u64>,
1433            swap_bytes: Option<u64>,
1434            heap_limit: Option<u64>,
1435        }
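        // For illustration, a hypothetical response body that deserializes into `ClusterdUsage`
        // (all values here are made up):
        //
        //     {"disk_bytes": 1073741824, "memory_bytes": 2147483648, "swap_bytes": 0, "heap_limit": 4294967296}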
1436
1437        /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
1438        ///
1439        /// Note that we want to keep going even if a lookup fails for whatever reason,
1440        /// so this function is infallible. If we fail to get CPU or memory for a particular pod,
1441        /// we just log a warning and install `None` in the returned struct.
1442        async fn get_metrics(
1443            self_: &OrchestratorWorker,
1444            service_name: &str,
1445            i: usize,
1446        ) -> ServiceProcessMetrics {
1447            let name = format!("{service_name}-{i}");
1448
1449            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
1450            let (metrics, clusterd_usage) =
1451                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
1452                {
1453                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
1454                    (Ok(metrics), Err(e)) => {
1455                        warn!("Failed to fetch clusterd usage for {name}: {e}");
1456                        (metrics, None)
1457                    }
1458                    (Err(e), _) => {
1459                        warn!("Failed to get metrics for {name}: {e}");
1460                        return ServiceProcessMetrics::default();
1461                    }
1462                };
1463            let Some(PodMetricsContainer {
1464                usage:
1465                    PodMetricsContainerUsage {
1466                        cpu: Quantity(cpu_str),
1467                        memory: Quantity(mem_str),
1468                    },
1469                ..
1470            }) = metrics.containers.first()
1471            else {
1472                warn!("metrics result contained no containers for {name}");
1473                return ServiceProcessMetrics::default();
1474            };
1475
1476            let mut process_metrics = ServiceProcessMetrics::default();
1477
1478            match parse_k8s_quantity(cpu_str) {
1479                Ok(q) => match q.try_to_integer(-9, true) {
1480                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
1481                    None => error!("CPU value {q:?} out of range"),
1482                },
1483                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
1484            }
1485            match parse_k8s_quantity(mem_str) {
1486                Ok(q) => match q.try_to_integer(0, false) {
1487                    Some(mem) => process_metrics.memory_bytes = Some(mem),
1488                    None => error!("memory value {q:?} out of range"),
1489                },
1490                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
1491            }
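            // As a concrete (hypothetical) example, a CPU quantity of "250m" converts to
            // 250_000_000 nanocores and a memory quantity of "42Mi" to 44_040_192 bytes.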
1492
1493            if let Some(usage) = clusterd_usage {
1494                // clusterd may report disk usage as either `disk_bytes`, or `swap_bytes`, or both.
1495                //
1496                // For now, the Console expects the swap size to be reported in `disk_bytes`.
1497                // Once the Console has been ported to use `heap_bytes`/`heap_limit`, we can
1498                // simplify things by setting `process_metrics.disk_bytes = usage.disk_bytes`.
1499                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
1500                    (Some(disk), Some(swap)) => Some(disk + swap),
1501                    (disk, swap) => disk.or(swap),
1502                };
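                // E.g. (hypothetical values): disk_bytes = Some(8 GiB) and swap_bytes = Some(2 GiB)
                // is surfaced to the Console as disk_bytes = Some(10 GiB).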
1503
1504                // clusterd may report heap usage as `memory_bytes` and optionally `swap_bytes`.
1505                // If no `memory_bytes` is reported, we can't know the heap usage.
1506                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
1507                    (Some(memory), Some(swap)) => Some(memory + swap),
1508                    (Some(memory), None) => Some(memory),
1509                    (None, _) => None,
1510                };
1511
1512                process_metrics.heap_limit = usage.heap_limit;
1513            }
1514
1515            process_metrics
1516        }
1517
1518        /// Get the current usage metrics exposed by a clusterd process.
1519        ///
1520        /// Usage metrics are collected by connecting to a metrics endpoint exposed by the process.
1521        /// The endpoint is assumed to be reachable on the 'internal-http' port under the HTTP path
1522        /// `/api/usage-metrics`.
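        ///
        /// For example, for process 0 of a hypothetical service `foo` in namespace `bar`, the
        /// metrics URL is
        /// `http://foo-0.foo.bar.svc.cluster.local:<internal-http port>/api/usage-metrics`.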
1523        async fn get_clusterd_usage(
1524            self_: &OrchestratorWorker,
1525            service_name: &str,
1526            i: usize,
1527        ) -> anyhow::Result<ClusterdUsage> {
1528            let service = self_
1529                .service_api
1530                .get(service_name)
1531                .await
1532                .with_context(|| format!("failed to get service {service_name}"))?;
1533            let namespace = service
1534                .metadata
1535                .namespace
1536                .context("missing service namespace")?;
1537            let internal_http_port = service
1538                .spec
1539                .and_then(|spec| spec.ports)
1540                .and_then(|ports| {
1541                    ports
1542                        .into_iter()
1543                        .find(|p| p.name == Some("internal-http".into()))
1544                })
1545                .map(|p| p.port);
1546            let Some(port) = internal_http_port else {
1547                bail!("internal-http port missing in service spec");
1548            };
1549            let metrics_url = format!(
1550                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
1551                 /api/usage-metrics"
1552            );
1553
1554            let http_client = reqwest::Client::builder()
1555                .timeout(Duration::from_secs(10))
1556                .build()
1557                .context("error building HTTP client")?;
1558            let resp = http_client.get(metrics_url).send().await?;
1559            let usage = resp.json().await?;
1560
1561            Ok(usage)
1562        }
1563
1564        let ret = futures::future::join_all(
1565            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
1566        );
1567
1568        ret.await
1569    }
1570
1571    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1572        // We inject our own pod's owner references into the Kubernetes objects
1573        // created for the service so that if the
1574        // Deployment/StatefulSet/whatever that owns the pod running the
1575        // orchestrator gets deleted, so do all services spawned by this
1576        // orchestrator.
1577        desc.service
1578            .metadata
1579            .owner_references
1580            .get_or_insert(vec![])
1581            .extend(self.owner_references.iter().cloned());
1582        desc.stateful_set
1583            .metadata
1584            .owner_references
1585            .get_or_insert(vec![])
1586            .extend(self.owner_references.iter().cloned());
1587
1588        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
1589        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
1590        let pod_annotations = pod_metadata.annotations.clone();
1591
1592        self.service_api
1593            .patch(
1594                &desc.name,
1595                &PatchParams::apply(FIELD_MANAGER).force(),
1596                &Patch::Apply(desc.service),
1597            )
1598            .await?;
1599        self.stateful_set_api
1600            .patch(
1601                &desc.name,
1602                &PatchParams::apply(FIELD_MANAGER).force(),
1603                &Patch::Apply(desc.stateful_set),
1604            )
1605            .await?;
1606
1607        // We manage pod recreation manually, using the OnDelete StatefulSet update strategy, for
1608        // two reasons:
1609        //  * Kubernetes doesn't always automatically replace StatefulSet pods when their specs
1610        //    change, see https://github.com/kubernetes/kubernetes/issues/67250.
1611        //  * Kubernetes replaces StatefulSet pods when their annotations change, which is not
1612        //    something we want as it could cause unavailability.
1613        //
1614        // Our pod recreation policy is simple: If a pod's template hash changed, delete it, and
1615        // let the StatefulSet controller recreate it. Otherwise, patch the existing pod's
1616        // annotations to line up with the ones in the spec.
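        // (Each pod is expected to carry the hash of the template it was created from in the
        // `POD_TEMPLATE_HASH_ANNOTATION` annotation; the check below compares that value against
        // `desc.pod_template_hash`.)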
1617        for pod_id in 0..desc.scale.get() {
1618            let pod_name = format!("{}-{pod_id}", desc.name);
1619            let pod = match self.pod_api.get(&pod_name).await {
1620                Ok(pod) => pod,
1621                // Pod already doesn't exist.
1622                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1623                Err(e) => return Err(e),
1624            };
1625
1626            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
1627                != Some(&desc.pod_template_hash)
1628            {
1629                self.pod_api
1630                    .delete(&pod_name, &DeleteParams::default())
1631                    .await
1632                    .map(|_| ())
1633            } else {
1634                let metadata = ObjectMeta {
1635                    annotations: pod_annotations.clone(),
1636                    ..Default::default()
1637                }
1638                .into_request_partial::<Pod>();
1639                self.pod_api
1640                    .patch_metadata(
1641                        &pod_name,
1642                        &PatchParams::apply(FIELD_MANAGER).force(),
1643                        &Patch::Apply(&metadata),
1644                    )
1645                    .await
1646                    .map(|_| ())
1647            };
1648
1649            match result {
1650                Ok(()) => (),
1651                // Pod was deleted concurrently.
1652                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1653                Err(e) => return Err(e),
1654            }
1655        }
1656
1657        Ok(())
1658    }
1659
1660    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
1661        let res = self
1662            .stateful_set_api
1663            .delete(name, &DeleteParams::default())
1664            .await;
1665        match res {
1666            Ok(_) => (),
1667            Err(K8sError::Api(e)) if e.code == 404 => (),
1668            Err(e) => return Err(e),
1669        }
1670
1671        let res = self
1672            .service_api
1673            .delete(name, &DeleteParams::default())
1674            .await;
1675        match res {
1676            Ok(_) => Ok(()),
1677            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1678            Err(e) => Err(e),
1679        }
1680    }
1681
1682    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
1683        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1684        let name_prefix = format!("{}{namespace}-", self.name_prefix);
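        // E.g. (hypothetical names): with a name prefix of `mz-` and namespace `cluster`, a
        // StatefulSet named `mz-cluster-u1-replica-r1` is reported as service `u1-replica-r1`.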
1685        Ok(stateful_sets
1686            .into_iter()
1687            .filter_map(|ss| {
1688                ss.metadata
1689                    .name
1690                    .unwrap()
1691                    .strip_prefix(&name_prefix)
1692                    .map(Into::into)
1693            })
1694            .collect())
1695    }
1696}
1697
1698#[derive(Debug, Clone)]
1699struct KubernetesService {
1700    hosts: Vec<String>,
1701    ports: BTreeMap<String, u16>,
1702}
1703
1704impl Service for KubernetesService {
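    /// Returns the `host:port` address of each process in the service for the named port.
    ///
    /// For example (hypothetical values), a service with hosts `["c-0.c", "c-1.c"]` and a port
    /// named `"http"` mapped to `8080` yields `["c-0.c:8080", "c-1.c:8080"]`.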
1705    fn addresses(&self, port: &str) -> Vec<String> {
1706        let port = self.ports[port];
1707        self.hosts
1708            .iter()
1709            .map(|host| format!("{host}:{port}"))
1710            .collect()
1711    }
1712}
1713
1714#[cfg(test)]
1715mod tests {
1716    use super::*;
1717
1718    #[mz_ore::test]
1719    fn k8s_quantity_base10_large() {
1720        let cases = &[
1721            ("42", 42),
1722            ("42k", 42000),
1723            ("42M", 42000000),
1724            ("42G", 42000000000),
1725            ("42T", 42000000000000),
1726            ("42P", 42000000000000000),
1727        ];
1728
1729        for (input, expected) in cases {
1730            let quantity = parse_k8s_quantity(input).unwrap();
1731            let number = quantity.try_to_integer(0, true).unwrap();
1732            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1733        }
1734    }
1735
1736    #[mz_ore::test]
1737    fn k8s_quantity_base10_small() {
1738        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];
1739
1740        for (input, expected) in cases {
1741            let quantity = parse_k8s_quantity(input).unwrap();
1742            let number = quantity.try_to_integer(-9, true).unwrap();
1743            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1744        }
1745    }
1746
1747    #[mz_ore::test]
1748    fn k8s_quantity_base2() {
1749        let cases = &[
1750            ("42Ki", 42 << 10),
1751            ("42Mi", 42 << 20),
1752            ("42Gi", 42 << 30),
1753            ("42Ti", 42 << 40),
1754            ("42Pi", 42 << 50),
1755        ];
1756
1757        for (input, expected) in cases {
1758            let quantity = parse_k8s_quantity(input).unwrap();
1759            let number = quantity.try_to_integer(0, false).unwrap();
1760            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1761        }
1762    }
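
    // A small additional sanity check (not part of the original suite): millicore-style CPU
    // quantities should convert to nanocores consistently with the base-10 cases above.
    #[mz_ore::test]
    fn k8s_quantity_millicores_to_nanocores() {
        let cases = &[("250m", 250_000_000), ("1500m", 1_500_000_000), ("2", 2_000_000_000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }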
1763}