// mz_orchestrator_kubernetes — Kubernetes-backed orchestrator implementation (lib.rs)

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZero;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use std::{env, fmt};
16
17use anyhow::{Context, anyhow, bail};
18use async_trait::async_trait;
19use chrono::DateTime;
20use clap::ValueEnum;
21use cloud_resource_controller::KubernetesResourceReader;
22use futures::TryFutureExt;
23use futures::stream::{BoxStream, StreamExt};
24use k8s_openapi::DeepMerge;
25use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
26use k8s_openapi::api::core::v1::{
27    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
28    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
29    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
30    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
31    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
32    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
33    Sysctl, Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
34    WeightedPodAffinityTerm,
35};
36use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
37use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
38    LabelSelector, LabelSelectorRequirement, OwnerReference,
39};
40use k8s_openapi::jiff::Timestamp;
41use kube::ResourceExt;
42use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
43use kube::client::Client;
44use kube::error::Error as K8sError;
45use kube::runtime::{WatchStreamExt, watcher};
46use maplit::btreemap;
47use mz_cloud_resources::AwsExternalIdPrefix;
48use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
49use mz_orchestrator::{
50    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
51    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
52    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
53};
54use mz_ore::cast::CastInto;
55use mz_ore::retry::Retry;
56use mz_ore::task::AbortOnDropHandle;
57use serde::Deserialize;
58use sha2::{Digest, Sha256};
59use tokio::sync::{mpsc, oneshot};
60use tracing::{error, info, warn};
61
pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

/// Manager name used for Kubernetes patch operations (cf. `PatchParams`).
/// NOTE(review): the patching call sites are outside this chunk — confirm.
const FIELD_MANAGER: &str = "environmentd";
/// Seconds after which an unresponsive node is presumed to have failed.
/// NOTE(review): the consuming logic is outside this chunk — confirm.
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

/// Annotation under which a hash of the pod template is recorded
/// (cf. `ServiceDescription::pod_template_hash`).
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
70
/// Configures a [`KubernetesOrchestrator`].
///
/// Values here apply to every service the orchestrator creates; per-service
/// settings arrive via `ServiceConfig` at `ensure_service` time.
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use, if the Kubernetes configuration
    /// is loaded from the local kubeconfig.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use, if any.
    pub scheduler_name: Option<String>,
    /// Annotations to install on every service created by the orchestrator.
    pub service_annotations: BTreeMap<String, String>,
    /// Labels to install on every service created by the orchestrator.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector to install on every service created by the orchestrator.
    pub service_node_selector: BTreeMap<String, String>,
    /// Affinity to install on every service created by the orchestrator.
    pub service_affinity: Option<String>,
    /// Tolerations to install on every service created by the orchestrator.
    pub service_tolerations: Option<String>,
    /// The service account that each service should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for services created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix to use when making AWS operations on behalf
    /// of the environment.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to use code coverage mode or not. Always false for production.
    pub coverage: bool,
    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
    /// services that request disk.
    ///
    /// If unspecified, the orchestrator will refuse to create services that
    /// request disk.
    pub ephemeral_volume_storage_class: Option<String>,
    /// The optional fs group for service's pods' `securityContext`.
    pub service_fs_group: Option<i64>,
    /// The prefix to prepend to all object names.
    ///
    /// Prefer reading this through [`KubernetesOrchestratorConfig::name_prefix`],
    /// which normalizes `None` to the empty string.
    pub name_prefix: Option<String>,
    /// Whether we should attempt to collect metrics from kubernetes
    pub collect_pod_metrics: bool,
    /// Whether to annotate pods for prometheus service discovery.
    pub enable_prometheus_scrape_annotations: bool,
}
113
114impl KubernetesOrchestratorConfig {
115    pub fn name_prefix(&self) -> String {
116        self.name_prefix.clone().unwrap_or_default()
117    }
118}
119
/// Specifies whether Kubernetes should pull Docker images when creating pods.
///
/// Renders via [`fmt::Display`] in Kubernetes' `PascalCase` convention and via
/// [`Self::as_kebab_case_str`] in kebab-case (e.g. for command-line use — note
/// the `ValueEnum` derive).
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the Docker image from the registry.
    Always,
    /// Pull the Docker image only if the image is not present.
    IfNotPresent,
    /// Never pull the Docker image.
    Never,
}
130
131impl fmt::Display for KubernetesImagePullPolicy {
132    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133        match self {
134            KubernetesImagePullPolicy::Always => f.write_str("Always"),
135            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
136            KubernetesImagePullPolicy::Never => f.write_str("Never"),
137        }
138    }
139}
140
141impl KubernetesImagePullPolicy {
142    pub fn as_kebab_case_str(&self) -> &'static str {
143        match self {
144            Self::Always => "always",
145            Self::IfNotPresent => "if-not-present",
146            Self::Never => "never",
147        }
148    }
149}
150
/// An orchestrator backed by Kubernetes.
pub struct KubernetesOrchestrator {
    /// Client for talking to the Kubernetes API server.
    client: Client,
    /// The Kubernetes namespace in which objects live.
    kubernetes_namespace: String,
    /// Static orchestrator configuration.
    config: KubernetesOrchestratorConfig,
    /// API handle for `Secret` objects (presumably used by the `secrets`
    /// module — usage not visible in this chunk).
    secret_api: Api<Secret>,
    /// API handle for `VpcEndpoint` custom resources.
    vpc_endpoint_api: Api<VpcEndpoint>,
    /// Lazily-created namespaced orchestrators, keyed by orchestrator
    /// namespace (see [`Orchestrator::namespace`]).
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    /// Reader for Kubernetes cloud resources.
    resource_reader: Arc<KubernetesResourceReader>,
}
161
impl fmt::Debug for KubernetesOrchestrator {
    // Manual impl that emits only the type name — presumably because some
    // fields (clients, trait objects) don't implement `Debug`; confirm before
    // switching to a derive.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}
167
168impl KubernetesOrchestrator {
169    /// Creates a new Kubernetes orchestrator from the provided configuration.
170    pub async fn new(
171        config: KubernetesOrchestratorConfig,
172    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
173        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
174        let resource_reader =
175            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
176        Ok(KubernetesOrchestrator {
177            client: client.clone(),
178            kubernetes_namespace,
179            config,
180            secret_api: Api::default_namespaced(client.clone()),
181            vpc_endpoint_api: Api::default_namespaced(client),
182            namespaces: Mutex::new(BTreeMap::new()),
183            resource_reader,
184        })
185    }
186}
187
188impl Orchestrator for KubernetesOrchestrator {
189    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
190        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
191        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
192            let (command_tx, command_rx) = mpsc::unbounded_channel();
193            let worker = OrchestratorWorker {
194                metrics_api: Api::default_namespaced(self.client.clone()),
195                service_api: Api::default_namespaced(self.client.clone()),
196                stateful_set_api: Api::default_namespaced(self.client.clone()),
197                pod_api: Api::default_namespaced(self.client.clone()),
198                owner_references: vec![],
199                command_rx,
200                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
201                collect_pod_metrics: self.config.collect_pod_metrics,
202            }
203            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));
204
205            Arc::new(NamespacedKubernetesOrchestrator {
206                pod_api: Api::default_namespaced(self.client.clone()),
207                kubernetes_namespace: self.kubernetes_namespace.clone(),
208                namespace: namespace.into(),
209                config: self.config.clone(),
210                // TODO(guswynn): make this configurable.
211                scheduling_config: Default::default(),
212                service_infos: std::sync::Mutex::new(BTreeMap::new()),
213                command_tx,
214                _worker: worker,
215            })
216        }))
217    }
218}
219
/// Per-service bookkeeping recorded by `ensure_service` and later consumed
/// when fetching service metrics.
#[derive(Clone, Copy)]
struct ServiceInfo {
    // Number of processes (stateful set replicas) the service runs.
    scale: NonZero<u16>,
}
224
struct NamespacedKubernetesOrchestrator {
    /// API handle for `Pod` objects.
    pod_api: Api<Pod>,
    /// The Kubernetes namespace in which objects live.
    kubernetes_namespace: String,
    /// The orchestrator namespace; used to derive object names and label keys.
    namespace: String,
    /// Shared orchestrator configuration.
    config: KubernetesOrchestratorConfig,
    /// Dynamically updatable scheduling configuration.
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    /// Info about services created via `ensure_service`, keyed by service ID.
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    /// Channel for sending commands to the background worker task.
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    /// Handle that aborts the worker task when this orchestrator is dropped.
    _worker: AbortOnDropHandle<()>,
}
235
impl fmt::Debug for NamespacedKubernetesOrchestrator {
    // Manual impl that prints only the simple, informative fields; API
    // handles, locks, and channels are omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}
245
/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result include a `result_tx` on which the
/// [`OrchestratorWorker`] will deliver the result.
enum WorkerCommand {
    /// Create or update the service described by `desc`.
    EnsureService {
        desc: ServiceDescription,
    },
    /// Delete the named service.
    DropService {
        name: String,
    },
    /// List the names of services in the given orchestrator namespace.
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    /// Fetch per-process metrics for the named service.
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}
268
/// A description of a service to be created by an [`OrchestratorWorker`].
#[derive(Debug, Clone)]
struct ServiceDescription {
    /// Name of the Kubernetes objects to create.
    name: String,
    /// Number of processes (stateful set replicas) to run.
    scale: NonZero<u16>,
    /// The Kubernetes `Service` to create.
    service: K8sService,
    /// The `StatefulSet` backing the service's processes.
    stateful_set: StatefulSet,
    /// Hash of the pod template, presumably recorded under
    /// `POD_TEMPLATE_HASH_ANNOTATION` — the consuming code is outside this
    /// chunk; confirm.
    pod_template_hash: String,
}
278
/// A task executing blocking work for a [`NamespacedKubernetesOrchestrator`] in the background.
///
/// This type exists to enable making [`NamespacedKubernetesOrchestrator::ensure_service`] and
/// [`NamespacedKubernetesOrchestrator::drop_service`] non-blocking, allowing invocation of these
/// methods in latency-sensitive contexts.
///
/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
/// orchestrator calls that query service state (such as `list_services`). These need to be
/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
/// ensure that a `list_services` result contains exactly those services that were previously
/// created with `ensure_service` and not yet dropped with `drop_service`.
struct OrchestratorWorker {
    /// API handle for reading pod metrics (metrics-server).
    metrics_api: Api<PodMetrics>,
    /// API handle for `Service` objects.
    service_api: Api<K8sService>,
    /// API handle for `StatefulSet` objects.
    stateful_set_api: Api<StatefulSet>,
    /// API handle for `Pod` objects.
    pod_api: Api<Pod>,
    /// Owner references to attach to created objects.
    /// NOTE(review): initialized empty in `Orchestrator::namespace`; any
    /// population happens outside this chunk — confirm.
    owner_references: Vec<OwnerReference>,
    /// Receiving end of the command channel from the orchestrator.
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    /// Prefix prepended to all object names.
    name_prefix: String,
    /// Whether to collect pod metrics from the metrics API.
    collect_pod_metrics: bool,
}
300
/// Per-container entry of a [`PodMetrics`] response.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    /// The container name.
    pub name: String,
    /// The container's current resource usage.
    pub usage: PodMetricsContainerUsage,
}
306
/// CPU and memory usage reported for a single container.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    /// CPU usage as a Kubernetes quantity string (e.g. `"250m"`).
    pub cpu: Quantity,
    /// Memory usage as a Kubernetes quantity string (e.g. `"64Mi"`).
    pub memory: Quantity,
}
312
/// A pod metrics object as served by the Kubernetes metrics API
/// (`metrics.k8s.io/v1beta1`).
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    /// Time at which the metrics were collected.
    pub timestamp: String,
    /// Duration over which the metrics were gathered.
    pub window: String,
    /// One entry per container in the pod.
    pub containers: Vec<PodMetricsContainer>,
}
320
// Hand-rolled `Resource` impl so that `PodMetrics` can be used with
// `kube::Api` (see `OrchestratorWorker::metrics_api`); the metrics API is an
// aggregated API with no generated type here.
impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    // Pod metrics are namespaced, like the pods they describe.
    type Scope = k8s_openapi::NamespaceResourceScope;
}
330
// `Metadata` impl, required so `kube` can access object metadata generically.
impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    /// Returns a shared reference to the object's metadata.
    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    /// Returns a mutable reference to the object's metadata.
    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}
342
343// Note that these types are very weird. We are `get`-ing a
344// `List` object, and lying about it having an `ObjectMeta`
345// (it deserializes as empty, but we don't need it). The custom
346// metrics API is designed this way, which is very non-standard.
347// A discussion in the `kube` channel in the `tokio` discord
348// confirmed that this layout + using `get_subresource` is the
349// best way to handle this.
350
/// Identifier of a metric as returned by the custom metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    /// The metric's name.
    #[serde(rename = "metricName")]
    pub name: String,
    // We skip `selector` for now, as we don't use it
}
357
/// A single measurement returned by the custom metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    /// The Kubernetes object this metric describes.
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    /// The metric's identity, flattened into this object on the wire.
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    /// Time at which the metric was collected.
    pub timestamp: String,
    /// The measured value.
    pub value: Quantity,
    // We skip `windowSeconds`, as we don't need it
}
368
369impl NamespacedKubernetesOrchestrator {
370    fn service_name(&self, id: &str) -> String {
371        format!(
372            "{}{}-{id}",
373            self.config.name_prefix.as_deref().unwrap_or(""),
374            self.namespace
375        )
376    }
377
378    /// Return a `watcher::Config` instance that limits results to the namespace
379    /// assigned to this orchestrator.
380    fn watch_pod_params(&self) -> watcher::Config {
381        let ns_selector = format!(
382            "environmentd.materialize.cloud/namespace={}",
383            self.namespace
384        );
385        // This watcher timeout must be shorter than the client read timeout.
386        watcher::Config::default().timeout(59).labels(&ns_selector)
387    }
388
389    /// Convert a higher-level label key to the actual one we
390    /// will give to Kubernetes
391    fn make_label_key(&self, key: &str) -> String {
392        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
393    }
394
395    fn label_selector_to_k8s(
396        &self,
397        MzLabelSelector { label_name, logic }: MzLabelSelector,
398    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
399        let (operator, values) = match logic {
400            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
401            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
402            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
403            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
404            LabelSelectionLogic::InSet { values } => {
405                if values.is_empty() {
406                    Err(anyhow!(
407                        "Invalid selector logic for {label_name}: empty `in` set"
408                    ))
409                } else {
410                    Ok(("In", values))
411                }
412            }
413            LabelSelectionLogic::NotInSet { values } => {
414                if values.is_empty() {
415                    Err(anyhow!(
416                        "Invalid selector logic for {label_name}: empty `notin` set"
417                    ))
418                } else {
419                    Ok(("NotIn", values))
420                }
421            }
422        }?;
423        let lsr = LabelSelectorRequirement {
424            key: self.make_label_key(&label_name),
425            operator: operator.to_string(),
426            values: Some(values),
427        };
428        Ok(lsr)
429    }
430
431    fn send_command(&self, cmd: WorkerCommand) {
432        self.command_tx.send(cmd).expect("worker task not dropped");
433    }
434}
435
/// A quantity decomposed as `integral_part * base^exponent`, where the base is
/// 10 when `base10` is set and 2 otherwise.
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    /// Converts the quantity to an integer at the requested `scale`, i.e.
    /// computes `integral_part * base^(exponent - scale)`.
    ///
    /// Returns `None` when the requested base doesn't match this quantity's
    /// base, or when scaling up overflows a `u64`. Scaling down truncates
    /// toward zero via repeated integer division.
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let base: u64 = if self.base10 { 10 } else { 2 };
        let shift = self.exponent - scale;
        if shift >= 0 {
            // Scale up, failing on overflow.
            (0..shift).try_fold(self.integral_part, |acc, _| acc.checked_mul(base))
        } else {
            // Scale down; each division truncates.
            Some((shift..0).fold(self.integral_part, |acc, _| acc / base))
        }
    }
}
463
464// Parse a k8s `Quantity` object
465// into a numeric value.
466//
467// This is intended to support collecting CPU and Memory data.
468// Thus, there are a few that things Kubernetes attempts to do, that we don't,
469// because I've never observed metrics-server specifically sending them:
470// (1) Handle negative numbers (because it's not useful for that use-case)
471// (2) Handle non-integers (because I have never observed them being actually sent)
472// (3) Handle scientific notation (e.g. 1.23e2)
473fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
474    const DEC_SUFFIXES: &[(&str, i8)] = &[
475        ("n", -9),
476        ("u", -6),
477        ("m", -3),
478        ("", 0),
479        ("k", 3), // yep, intentionally lowercase.
480        ("M", 6),
481        ("G", 9),
482        ("T", 12),
483        ("P", 15),
484        ("E", 18),
485    ];
486    const BIN_SUFFIXES: &[(&str, i8)] = &[
487        ("", 0),
488        ("Ki", 10),
489        ("Mi", 20),
490        ("Gi", 30),
491        ("Ti", 40),
492        ("Pi", 50),
493        ("Ei", 60),
494    ];
495
496    let (positive, s) = match s.chars().next() {
497        Some('+') => (true, &s[1..]),
498        Some('-') => (false, &s[1..]),
499        _ => (true, s),
500    };
501
502    if !positive {
503        anyhow::bail!("Negative numbers not supported")
504    }
505
506    fn is_suffix_char(ch: char) -> bool {
507        "numkMGTPEKi".contains(ch)
508    }
509    let (num, suffix) = match s.find(is_suffix_char) {
510        None => (s, ""),
511        Some(idx) => s.split_at(idx),
512    };
513    let num: u64 = num.parse()?;
514    let (exponent, base10) = if let Some((_, exponent)) =
515        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
516    {
517        (exponent, true)
518    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
519        (exponent, false)
520    } else {
521        anyhow::bail!("Unrecognized suffix: {suffix}");
522    };
523    Ok(ScaledQuantity {
524        integral_part: num,
525        exponent: *exponent,
526        base10,
527    })
528}
529
530#[async_trait]
531impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
532    async fn fetch_service_metrics(
533        &self,
534        id: &str,
535    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
536        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
537            *info
538        } else {
539            // This should have been set in `ensure_service`.
540            tracing::error!("Failed to get info for {id}");
541            anyhow::bail!("Failed to get info for {id}");
542        };
543
544        let (result_tx, result_rx) = oneshot::channel();
545        self.send_command(WorkerCommand::FetchServiceMetrics {
546            name: self.service_name(id),
547            info,
548            result_tx,
549        });
550
551        let metrics = result_rx.await.expect("worker task not dropped");
552        Ok(metrics)
553    }
554
555    fn ensure_service(
556        &self,
557        id: &str,
558        ServiceConfig {
559            image,
560            init_container_image,
561            args,
562            ports: ports_in,
563            memory_limit,
564            memory_request,
565            cpu_limit,
566            cpu_request,
567            scale,
568            labels: labels_in,
569            annotations: annotations_in,
570            availability_zones,
571            other_replicas_selector,
572            replicas_selector,
573            disk_limit,
574            node_selector,
575        }: ServiceConfig,
576    ) -> Result<Box<dyn Service>, anyhow::Error> {
577        // This is extremely cheap to clone, so just look into the lock once.
578        let scheduling_config: ServiceSchedulingConfig =
579            self.scheduling_config.read().expect("poisoned").clone();
580
581        // Enable disk if the size does not disable it.
582        let disk = disk_limit != Some(DiskLimit::ZERO);
583
584        let name = self.service_name(id);
585        // The match labels should be the minimal set of labels that uniquely
586        // identify the pods in the stateful set. Changing these after the
587        // `StatefulSet` is created is not permitted by Kubernetes, and we're
588        // not yet smart enough to handle deleting and recreating the
589        // `StatefulSet`.
590        let mut match_labels = btreemap! {
591            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
592            "environmentd.materialize.cloud/service-id".into() => id.into(),
593        };
594        for (key, value) in &self.config.service_labels {
595            match_labels.insert(key.clone(), value.clone());
596        }
597
598        let mut labels = match_labels.clone();
599        for (key, value) in labels_in {
600            labels.insert(self.make_label_key(&key), value);
601        }
602
603        labels.insert(self.make_label_key("scale"), scale.to_string());
604
605        for port in &ports_in {
606            labels.insert(
607                format!("environmentd.materialize.cloud/port-{}", port.name),
608                "true".into(),
609            );
610        }
611        let mut limits = BTreeMap::new();
612        let mut requests = BTreeMap::new();
613        if let Some(memory_limit) = memory_limit {
614            limits.insert(
615                "memory".into(),
616                Quantity(memory_limit.0.as_u64().to_string()),
617            );
618            requests.insert(
619                "memory".into(),
620                Quantity(memory_limit.0.as_u64().to_string()),
621            );
622        }
623        if let Some(memory_request) = memory_request {
624            requests.insert(
625                "memory".into(),
626                Quantity(memory_request.0.as_u64().to_string()),
627            );
628        }
629        if let Some(cpu_limit) = cpu_limit {
630            limits.insert(
631                "cpu".into(),
632                Quantity(format!("{}m", cpu_limit.as_millicpus())),
633            );
634            requests.insert(
635                "cpu".into(),
636                Quantity(format!("{}m", cpu_limit.as_millicpus())),
637            );
638        }
639        if let Some(cpu_request) = cpu_request {
640            requests.insert(
641                "cpu".into(),
642                Quantity(format!("{}m", cpu_request.as_millicpus())),
643            );
644        }
645        let service = K8sService {
646            metadata: ObjectMeta {
647                name: Some(name.clone()),
648                ..Default::default()
649            },
650            spec: Some(ServiceSpec {
651                ports: Some(
652                    ports_in
653                        .iter()
654                        .map(|port| ServicePort {
655                            port: port.port_hint.into(),
656                            name: Some(port.name.clone()),
657                            ..Default::default()
658                        })
659                        .collect(),
660                ),
661                cluster_ip: Some("None".to_string()),
662                selector: Some(match_labels.clone()),
663                ..Default::default()
664            }),
665            status: None,
666        };
667
668        let hosts = (0..scale.get())
669            .map(|i| {
670                format!(
671                    "{name}-{i}.{name}.{}.svc.cluster.local",
672                    self.kubernetes_namespace
673                )
674            })
675            .collect::<Vec<_>>();
676        let ports = ports_in
677            .iter()
678            .map(|p| (p.name.clone(), p.port_hint))
679            .collect::<BTreeMap<_, _>>();
680
681        let mut listen_addrs = BTreeMap::new();
682        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
683        for (name, port) in &ports {
684            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
685            for (i, host) in hosts.iter().enumerate() {
686                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
687            }
688        }
689        let mut args = args(ServiceAssignments {
690            listen_addrs: &listen_addrs,
691            peer_addrs: &peer_addrs,
692        });
693
694        // This constrains the orchestrator (for those orchestrators that support
695        // anti-affinity, today just k8s) to never schedule pods for different replicas
696        // of the same cluster on the same node. Pods from the _same_ replica are fine;
697        // pods from different clusters are also fine.
698        //
699        // The point is that if pods of two replicas are on the same node, that node
700        // going down would kill both replicas, and so the replication factor of the
701        // cluster in question is illusory.
702        let anti_affinity = Some({
703            let label_selector_requirements = other_replicas_selector
704                .clone()
705                .into_iter()
706                .map(|ls| self.label_selector_to_k8s(ls))
707                .collect::<Result<Vec<_>, _>>()?;
708            let ls = LabelSelector {
709                match_expressions: Some(label_selector_requirements),
710                ..Default::default()
711            };
712            let pat = PodAffinityTerm {
713                label_selector: Some(ls),
714                topology_key: "kubernetes.io/hostname".to_string(),
715                ..Default::default()
716            };
717
718            if !scheduling_config.soften_replication_anti_affinity {
719                PodAntiAffinity {
720                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
721                    ..Default::default()
722                }
723            } else {
724                PodAntiAffinity {
725                    preferred_during_scheduling_ignored_during_execution: Some(vec![
726                        WeightedPodAffinityTerm {
727                            weight: scheduling_config.soften_replication_anti_affinity_weight,
728                            pod_affinity_term: pat,
729                        },
730                    ]),
731                    ..Default::default()
732                }
733            }
734        });
735
736        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
737            // `match_labels` sufficiently selects pods in the same replica.
738            let ls = LabelSelector {
739                match_labels: Some(match_labels.clone()),
740                ..Default::default()
741            };
742            let pat = PodAffinityTerm {
743                label_selector: Some(ls),
744                topology_key: "topology.kubernetes.io/zone".to_string(),
745                ..Default::default()
746            };
747
748            Some(PodAffinity {
749                preferred_during_scheduling_ignored_during_execution: Some(vec![
750                    WeightedPodAffinityTerm {
751                        weight,
752                        pod_affinity_term: pat,
753                    },
754                ]),
755                ..Default::default()
756            })
757        } else {
758            None
759        };
760
761        let topology_spread = if scheduling_config.topology_spread.enabled {
762            let config = &scheduling_config.topology_spread;
763
764            if !config.ignore_non_singular_scale || scale.get() == 1 {
765                let label_selector_requirements = (if config.ignore_non_singular_scale {
766                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();
767
768                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
769                        label_name: "scale".into(),
770                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
771                            value: "1".to_string(),
772                        },
773                    });
774
775                    replicas_selector_ignoring_scale
776                } else {
777                    replicas_selector
778                })
779                .into_iter()
780                .map(|ls| self.label_selector_to_k8s(ls))
781                .collect::<Result<Vec<_>, _>>()?;
782                let ls = LabelSelector {
783                    match_expressions: Some(label_selector_requirements),
784                    ..Default::default()
785                };
786
787                if config.soft && config.min_domains.is_some() {
788                    warn!(
789                        "topology spread is soft but min_domains is set; \
790                         Kubernetes rejects minDomains with ScheduleAnyway, \
791                         so min_domains will be ignored"
792                    );
793                }
794                if availability_zones.is_some() && config.min_domains.is_some() {
795                    warn!(
796                        "topology spread has min_domains set but availability_zones \
797                         constrains eligible topology domains via node affinity; \
798                         minDomains will be ignored to avoid preventing pod scheduling"
799                    );
800                }
801
802                let constraint = TopologySpreadConstraint {
803                    label_selector: Some(ls),
804                    min_domains: topology_spread_min_domains(
805                        config.soft,
806                        availability_zones.is_some(),
807                        config.min_domains,
808                    ),
809                    max_skew: config.max_skew,
810                    topology_key: "topology.kubernetes.io/zone".to_string(),
811                    when_unsatisfiable: if config.soft {
812                        "ScheduleAnyway".to_string()
813                    } else {
814                        "DoNotSchedule".to_string()
815                    },
816                    // TODO(guswynn): restore these once they are supported.
817                    // Consider node affinities when calculating topology spread. This is the
818                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_affinity_policy>,
819                    // made explicit.
820                    // node_affinity_policy: Some("Honor".to_string()),
821                    // Do not consider node taints when calculating topology spread. This is the
822                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_taints_policy>,
823                    // made explicit.
824                    // node_taints_policy: Some("Ignore".to_string()),
825                    match_label_keys: None,
826                    // Once the above are restorted, we should't have `..Default::default()` here because the specifics of these fields are
827                    // subtle enough where we want compilation failures when we upgrade
828                    ..Default::default()
829                };
830                Some(vec![constraint])
831            } else {
832                None
833            }
834        } else {
835            None
836        };
837
838        let mut pod_annotations = btreemap! {
839            // Prevent the cluster-autoscaler (or karpenter) from evicting these pods in attempts to scale down
840            // and terminate nodes.
841            // This will cost us more money, but should give us better uptime.
842            // This does not prevent all evictions by Kubernetes, only the ones initiated by the
843            // cluster-autoscaler (or karpenter). Notably, eviction of pods for resource overuse is still enabled.
844            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
845            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
846
847            // It's called do-not-disrupt in newer versions of karpenter, so adding for forward/backward compatibility
848            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
849        };
850        for (key, value) in annotations_in {
851            // We want to use the same prefix as our labels keys
852            pod_annotations.insert(self.make_label_key(&key), value);
853        }
854        if self.config.enable_prometheus_scrape_annotations {
855            if let Some(internal_http_port) = ports_in
856                .iter()
857                .find(|port| port.name == "internal-http")
858                .map(|port| port.port_hint.to_string())
859            {
860                // Enable prometheus scrape discovery
861                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
862                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
863                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
864                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
865            }
866        }
867        for (key, value) in &self.config.service_annotations {
868            pod_annotations.insert(key.clone(), value.clone());
869        }
870
871        let default_node_selector = if disk {
872            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
873        } else {
874            // if the cluster doesn't require disk, we can omit the selector
875            // allowing it to be scheduled onto nodes with and without the
876            // selector
877            vec![]
878        };
879
880        let node_selector: BTreeMap<String, String> = default_node_selector
881            .into_iter()
882            .chain(self.config.service_node_selector.clone())
883            .chain(node_selector)
884            .collect();
885
886        let node_affinity = if let Some(availability_zones) = availability_zones {
887            let selector = NodeSelectorTerm {
888                match_expressions: Some(vec![NodeSelectorRequirement {
889                    key: "materialize.cloud/availability-zone".to_string(),
890                    operator: "In".to_string(),
891                    values: Some(availability_zones),
892                }]),
893                match_fields: None,
894            };
895
896            if scheduling_config.soften_az_affinity {
897                Some(NodeAffinity {
898                    preferred_during_scheduling_ignored_during_execution: Some(vec![
899                        PreferredSchedulingTerm {
900                            preference: selector,
901                            weight: scheduling_config.soften_az_affinity_weight,
902                        },
903                    ]),
904                    required_during_scheduling_ignored_during_execution: None,
905                })
906            } else {
907                Some(NodeAffinity {
908                    preferred_during_scheduling_ignored_during_execution: None,
909                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
910                        node_selector_terms: vec![selector],
911                    }),
912                })
913            }
914        } else {
915            None
916        };
917
918        let mut affinity = Affinity {
919            pod_anti_affinity: anti_affinity,
920            pod_affinity,
921            node_affinity,
922            ..Default::default()
923        };
924        if let Some(service_affinity) = &self.config.service_affinity {
925            affinity.merge_from(serde_json::from_str(service_affinity)?);
926        }
927
928        let container_name = image
929            .rsplit_once('/')
930            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
931            .context("`image` is not ORG/NAME:VERSION")?
932            .0
933            .to_string();
934
935        let container_security_context = if scheduling_config.security_context_enabled {
936            Some(SecurityContext {
937                privileged: Some(false),
938                run_as_non_root: Some(true),
939                allow_privilege_escalation: Some(false),
940                seccomp_profile: Some(SeccompProfile {
941                    type_: "RuntimeDefault".to_string(),
942                    ..Default::default()
943                }),
944                capabilities: Some(Capabilities {
945                    drop: Some(vec!["ALL".to_string()]),
946                    ..Default::default()
947                }),
948                ..Default::default()
949            })
950        } else {
951            None
952        };
953
954        let init_containers = init_container_image.map(|image| {
955            vec![Container {
956                name: "init".to_string(),
957                image: Some(image),
958                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
959                resources: Some(ResourceRequirements {
960                    claims: None,
961                    limits: Some(limits.clone()),
962                    requests: Some(requests.clone()),
963                }),
964                security_context: container_security_context.clone(),
965                env: Some(vec![
966                    EnvVar {
967                        name: "MZ_NAMESPACE".to_string(),
968                        value_from: Some(EnvVarSource {
969                            field_ref: Some(ObjectFieldSelector {
970                                field_path: "metadata.namespace".to_string(),
971                                ..Default::default()
972                            }),
973                            ..Default::default()
974                        }),
975                        ..Default::default()
976                    },
977                    EnvVar {
978                        name: "MZ_POD_NAME".to_string(),
979                        value_from: Some(EnvVarSource {
980                            field_ref: Some(ObjectFieldSelector {
981                                field_path: "metadata.name".to_string(),
982                                ..Default::default()
983                            }),
984                            ..Default::default()
985                        }),
986                        ..Default::default()
987                    },
988                    EnvVar {
989                        name: "MZ_NODE_NAME".to_string(),
990                        value_from: Some(EnvVarSource {
991                            field_ref: Some(ObjectFieldSelector {
992                                field_path: "spec.nodeName".to_string(),
993                                ..Default::default()
994                            }),
995                            ..Default::default()
996                        }),
997                        ..Default::default()
998                    },
999                ]),
1000                ..Default::default()
1001            }]
1002        });
1003
1004        let env = if self.config.coverage {
1005            Some(vec![EnvVar {
1006                name: "LLVM_PROFILE_FILE".to_string(),
1007                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
1008                ..Default::default()
1009            }])
1010        } else {
1011            None
1012        };
1013
1014        let mut volume_mounts = vec![];
1015
1016        if self.config.coverage {
1017            volume_mounts.push(VolumeMount {
1018                name: "coverage".to_string(),
1019                mount_path: "/coverage".to_string(),
1020                ..Default::default()
1021            })
1022        }
1023
1024        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
1025            (true, Some(ephemeral_volume_storage_class)) => {
1026                volume_mounts.push(VolumeMount {
1027                    name: "scratch".to_string(),
1028                    mount_path: "/scratch".to_string(),
1029                    ..Default::default()
1030                });
1031                args.push("--scratch-directory=/scratch".into());
1032
1033                Some(vec![Volume {
1034                    name: "scratch".to_string(),
1035                    ephemeral: Some(EphemeralVolumeSource {
1036                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
1037                            spec: PersistentVolumeClaimSpec {
1038                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1039                                storage_class_name: Some(
1040                                    ephemeral_volume_storage_class.to_string(),
1041                                ),
1042                                resources: Some(VolumeResourceRequirements {
1043                                    requests: Some(BTreeMap::from([(
1044                                        "storage".to_string(),
1045                                        Quantity(
1046                                            disk_limit
1047                                                .unwrap_or(DiskLimit::ARBITRARY)
1048                                                .0
1049                                                .as_u64()
1050                                                .to_string(),
1051                                        ),
1052                                    )])),
1053                                    ..Default::default()
1054                                }),
1055                                ..Default::default()
1056                            },
1057                            ..Default::default()
1058                        }),
1059                        ..Default::default()
1060                    }),
1061                    ..Default::default()
1062                }])
1063            }
1064            (true, None) => {
1065                return Err(anyhow!(
1066                    "service requested disk but no ephemeral volume storage class was configured"
1067                ));
1068            }
1069            (false, _) => None,
1070        };
1071
1072        if let Some(name_prefix) = &self.config.name_prefix {
1073            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
1074        }
1075
1076        let volume_claim_templates = if self.config.coverage {
1077            Some(vec![PersistentVolumeClaim {
1078                metadata: ObjectMeta {
1079                    name: Some("coverage".to_string()),
1080                    ..Default::default()
1081                },
1082                spec: Some(PersistentVolumeClaimSpec {
1083                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1084                    resources: Some(VolumeResourceRequirements {
1085                        requests: Some(BTreeMap::from([(
1086                            "storage".to_string(),
1087                            Quantity("10Gi".to_string()),
1088                        )])),
1089                        ..Default::default()
1090                    }),
1091                    ..Default::default()
1092                }),
1093                ..Default::default()
1094            }])
1095        } else {
1096            None
1097        };
1098
1099        let tcp_keepalive_sysctls = vec![
1100            Sysctl {
1101                name: "net.ipv4.tcp_keepalive_time".to_string(),
1102                value: "300".to_string(),
1103            },
1104            Sysctl {
1105                name: "net.ipv4.tcp_keepalive_intvl".to_string(),
1106                value: "30".to_string(),
1107            },
1108            Sysctl {
1109                name: "net.ipv4.tcp_keepalive_probes".to_string(),
1110                value: "3".to_string(),
1111            },
1112        ];
1113
1114        let security_context = if let Some(fs_group) = self.config.service_fs_group {
1115            Some(PodSecurityContext {
1116                fs_group: Some(fs_group),
1117                run_as_user: Some(fs_group),
1118                run_as_group: Some(fs_group),
1119                sysctls: Some(tcp_keepalive_sysctls),
1120                ..Default::default()
1121            })
1122        } else {
1123            Some(PodSecurityContext {
1124                sysctls: Some(tcp_keepalive_sysctls),
1125                ..Default::default()
1126            })
1127        };
1128
1129        let mut tolerations = vec![
1130            // When the node becomes `NotReady` it indicates there is a problem
1131            // with the node. By default Kubernetes waits 300s (5 minutes)
1132            // before descheduling the pod, but we tune this to 30s for faster
1133            // recovery in the case of node failure.
1134            Toleration {
1135                effect: Some("NoExecute".into()),
1136                key: Some("node.kubernetes.io/not-ready".into()),
1137                operator: Some("Exists".into()),
1138                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1139                value: None,
1140            },
1141            Toleration {
1142                effect: Some("NoExecute".into()),
1143                key: Some("node.kubernetes.io/unreachable".into()),
1144                operator: Some("Exists".into()),
1145                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1146                value: None,
1147            },
1148        ];
1149        if let Some(service_tolerations) = &self.config.service_tolerations {
1150            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
1151        }
1152        let tolerations = Some(tolerations);
1153
1154        let mut pod_template_spec = PodTemplateSpec {
1155            metadata: Some(ObjectMeta {
1156                labels: Some(labels.clone()),
1157                // Only set `annotations` _after_ we have computed the pod template hash, to
1158                // avoid that annotation changes cause pod replacements.
1159                ..Default::default()
1160            }),
1161            spec: Some(PodSpec {
1162                init_containers,
1163                containers: vec![Container {
1164                    name: container_name,
1165                    image: Some(image),
1166                    args: Some(args),
1167                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
1168                    ports: Some(
1169                        ports_in
1170                            .iter()
1171                            .map(|port| ContainerPort {
1172                                container_port: port.port_hint.into(),
1173                                name: Some(port.name.clone()),
1174                                ..Default::default()
1175                            })
1176                            .collect(),
1177                    ),
1178                    security_context: container_security_context.clone(),
1179                    resources: Some(ResourceRequirements {
1180                        claims: None,
1181                        limits: Some(limits),
1182                        requests: Some(requests),
1183                    }),
1184                    volume_mounts: if !volume_mounts.is_empty() {
1185                        Some(volume_mounts)
1186                    } else {
1187                        None
1188                    },
1189                    env,
1190                    ..Default::default()
1191                }],
1192                volumes,
1193                security_context,
1194                node_selector: Some(node_selector),
1195                scheduler_name: self.config.scheduler_name.clone(),
1196                service_account: self.config.service_account.clone(),
1197                affinity: Some(affinity),
1198                topology_spread_constraints: topology_spread,
1199                tolerations,
1200                // Setting a 0s termination grace period has the side effect of
1201                // automatically starting a new pod when the previous pod is
1202                // currently terminating. This enables recovery from a node
1203                // failure with no manual intervention. Without this setting,
1204                // the StatefulSet controller will refuse to start a new pod
1205                // until the failed node is manually removed from the Kubernetes
1206                // cluster.
1207                //
1208                // The Kubernetes documentation strongly advises against this
1209                // setting, as StatefulSets attempt to provide "at most once"
1210                // semantics [0]--that is, the guarantee that for a given pod in
1211                // a StatefulSet there is *at most* one pod with that identity
1212                // running in the cluster.
1213                //
1214                // Materialize services, however, are carefully designed to
1215                // *not* rely on this guarantee. In fact, we do not believe that
1216                // correct distributed systems can meaningfully rely on
1217                // Kubernetes's guarantee--network packets from a pod can be
1218                // arbitrarily delayed, long past that pod's termination.
1219                //
1220                // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1221                termination_grace_period_seconds: Some(0),
1222                ..Default::default()
1223            }),
1224        };
1225        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
1226        let mut hasher = Sha256::new();
1227        hasher.update(pod_template_json);
1228        let pod_template_hash = format!("{:x}", hasher.finalize());
1229        pod_annotations.insert(
1230            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
1231            pod_template_hash.clone(),
1232        );
1233
1234        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);
1235
1236        let stateful_set = StatefulSet {
1237            metadata: ObjectMeta {
1238                name: Some(name.clone()),
1239                ..Default::default()
1240            },
1241            spec: Some(StatefulSetSpec {
1242                selector: LabelSelector {
1243                    match_labels: Some(match_labels),
1244                    ..Default::default()
1245                },
1246                service_name: Some(name.clone()),
1247                replicas: Some(scale.cast_into()),
1248                template: pod_template_spec,
1249                update_strategy: Some(StatefulSetUpdateStrategy {
1250                    type_: Some("OnDelete".to_owned()),
1251                    ..Default::default()
1252                }),
1253                pod_management_policy: Some("Parallel".to_string()),
1254                volume_claim_templates,
1255                ..Default::default()
1256            }),
1257            status: None,
1258        };
1259
1260        self.send_command(WorkerCommand::EnsureService {
1261            desc: ServiceDescription {
1262                name,
1263                scale,
1264                service,
1265                stateful_set,
1266                pod_template_hash,
1267            },
1268        });
1269
1270        self.service_infos
1271            .lock()
1272            .expect("poisoned lock")
1273            .insert(id.to_string(), ServiceInfo { scale });
1274
1275        Ok(Box::new(KubernetesService { hosts, ports }))
1276    }
1277
1278    /// Drops the identified service, if it exists.
1279    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1280        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1281        self.service_infos.lock().expect("poisoned lock").remove(id);
1282
1283        self.send_command(WorkerCommand::DropService {
1284            name: self.service_name(id),
1285        });
1286
1287        Ok(())
1288    }
1289
1290    /// Lists the identifiers of all known services.
1291    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1292        let (result_tx, result_rx) = oneshot::channel();
1293        self.send_command(WorkerCommand::ListServices {
1294            namespace: self.namespace.clone(),
1295            result_tx,
1296        });
1297
1298        let list = result_rx.await.expect("worker task not dropped");
1299        Ok(list)
1300    }
1301
    /// Watches all pods belonging to this orchestrator, translating pod
    /// updates into a stream of [`ServiceEvent`]s. Watch errors are logged
    /// and dropped rather than terminating the stream.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        /// Converts a watched `Pod` into a [`ServiceEvent`].
        ///
        /// The process ID is parsed from the pod name's trailing `-<ordinal>`
        /// suffix; the service ID is read from the
        /// `environmentd.materialize.cloud/service-id` label.
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            // `split` always yields at least one item, so `next_back` cannot
            // return `None`; the `parse` can still fail and is propagated.
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            // Determine whether any container in the pod terminated in a way
            // we attribute to running out of memory (or memory spilled to
            // disk); used to annotate the offline reason below.
            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        // The container might have already transitioned from "terminated" to
                        // "waiting"/"running" state, in which case we need to check its previous
                        // state to find out why it terminated.
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        // The interesting exit codes are:
                        //  * 135 (SIGBUS): occurs when lgalloc runs out of disk
                        //  * 137 (SIGKILL): occurs when the OOM killer terminates the container
                        //  * 167: occurs when the lgalloc or memory limiter terminates the process
                        // We treat all of these as OOM conditions since swap and lgalloc use
                        // disk only for spilling memory.
                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            // A pod is considered online iff its "Ready" condition is "True".
            // A pod with no status or no conditions is treated as not ready.
            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            // Prefer the time Kubernetes last probed the condition; fall back
            // to the current time when no probe time was reported.
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Timestamp::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                // NOTE(review): the i128→i64 nanosecond conversion overflows
                // only for timestamps centuries away, hence the `expect`.
                time: DateTime::from_timestamp_nanos(
                    time.as_nanosecond().try_into().expect("must fit"),
                ),
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        // We assume that errors returned by Kubernetes are usually transient, so we
                        // just log a warning and ignore them otherwise.
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }
1380
1381    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1382        *self.scheduling_config.write().expect("poisoned") = config;
1383    }
1384}
1385
1386impl OrchestratorWorker {
1387    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1388        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1389    }
1390
1391    async fn run(mut self) {
1392        {
1393            info!("initializing Kubernetes orchestrator worker");
1394            let start = Instant::now();
1395
1396            // Fetch the owner reference for our own pod (usually a
1397            // StatefulSet), so that we can propagate it to the services we
1398            // create.
1399            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
1400            let orchestrator_pod = Retry::default()
1401                .clamp_backoff(Duration::from_secs(10))
1402                .retry_async(|_| self.pod_api.get(&hostname))
1403                .await
1404                .expect("always retries on error");
1405            self.owner_references
1406                .extend(orchestrator_pod.owner_references().into_iter().cloned());
1407
1408            if !self.collect_pod_metrics {
1409                info!(
1410                    "pod metrics collection is disabled; resource usage graphs in the console will not be available"
1411                );
1412            }
1413
1414            info!(
1415                "Kubernetes orchestrator worker initialized in {:?}",
1416                start.elapsed()
1417            );
1418        }
1419
1420        while let Some(cmd) = self.command_rx.recv().await {
1421            self.handle_command(cmd).await;
1422        }
1423    }
1424
1425    /// Handle a worker command.
1426    ///
1427    /// If handling the command fails, it is automatically retried. All command handlers return
1428    /// [`K8sError`], so we can reasonably assume that a failure is caused by issues communicating
1429    /// with the K8S server and that retrying resolves them eventually.
1430    async fn handle_command(&self, cmd: WorkerCommand) {
1431        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1432        where
1433            F: Fn() -> U,
1434            U: Future<Output = Result<R, K8sError>>,
1435        {
1436            Retry::default()
1437                .clamp_backoff(Duration::from_secs(10))
1438                .retry_async(|_| {
1439                    f().map_err(
1440                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1441                    )
1442                })
1443                .await
1444                .expect("always retries on error")
1445        }
1446
1447        use WorkerCommand::*;
1448        match cmd {
1449            EnsureService { desc } => {
1450                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1451            }
1452            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1453            ListServices {
1454                namespace,
1455                result_tx,
1456            } => {
1457                let result = retry(|| self.list_services(&namespace), "ListServices").await;
1458                let _ = result_tx.send(result);
1459            }
1460            FetchServiceMetrics {
1461                name,
1462                info,
1463                result_tx,
1464            } => {
1465                let result = self.fetch_service_metrics(&name, &info).await;
1466                let _ = result_tx.send(result);
1467            }
1468        }
1469    }
1470
    /// Fetches per-process metrics for the named service.
    ///
    /// Returns one `ServiceProcessMetrics` entry per process of the service.
    /// Collection is best-effort: failures for an individual process are
    /// logged and reported as default (empty) metrics rather than errors.
    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            // Metrics collection is disabled; still return the expected
            // number of entries so callers can index by process ID.
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        /// Usage metrics reported by clusterd processes.
        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            // All fields optional: clusterd may omit any of them in its
            // `/api/usage-metrics` response.
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
        ///
        /// Note that we want to keep going even if a lookup fails for whatever reason,
        /// so this function is infallible. If we fail to get cpu or memory for a particular pod,
        /// we just log a warning and install `None` in the returned struct.
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            // Pods are addressed as `<service>-<ordinal>`, matching the
            // StatefulSet pod naming used elsewhere in this file.
            let name = format!("{service_name}-{i}");

            // Fetch the Kubernetes metrics-server reading and the clusterd
            // self-reported usage concurrently. A clusterd failure degrades
            // gracefully; a metrics-server failure yields empty metrics.
            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            // Only the first container's usage is inspected; NOTE(review):
            // this assumes one relevant container per pod — confirm against
            // the pod spec construction.
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            // CPU is converted to nanocores (scale 10^-9, base-10 suffixes).
            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            // Memory is converted to whole bytes (scale 10^0, base-2 suffixes).
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                // clusterd may report disk usage as either `disk_bytes`, or `swap_bytes`, or both.
                //
                // For now the Console expects the swap size to be reported in `disk_bytes`.
                // Once the Console has been ported to use `heap_bytes`/`heap_limit`, we can
                // simplify things by setting `process_metrics.disk_bytes = usage.disk_bytes`.
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                // clusterd may report heap usage as `memory_bytes` and optionally `swap_bytes`.
                // If no `memory_bytes` is reported, we can't know the heap usage.
                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        /// Get the current usage metrics exposed by a clusterd process.
        ///
        /// Usage metrics are collected by connecting to a metrics endpoint exposed by the process.
        /// The endpoint is assumed to be reachable at the 'internal-http' under the HTTP path
        /// `/api/usage-metrics`.
        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            // Resolve the service to discover its namespace and the
            // 'internal-http' port number.
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            // Per-pod stable DNS name provided by the headless service.
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            // NOTE(review): a fresh HTTP client is built for every request;
            // consider caching one on the worker to reuse connections.
            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        // Query all processes of the service concurrently.
        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }
1624
1625    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1626        // We inject our own pod's owner references into the Kubernetes objects
1627        // created for the service so that if the
1628        // Deployment/StatefulSet/whatever that owns the pod running the
1629        // orchestrator gets deleted, so do all services spawned by this
1630        // orchestrator.
1631        desc.service
1632            .metadata
1633            .owner_references
1634            .get_or_insert(vec![])
1635            .extend(self.owner_references.iter().cloned());
1636        desc.stateful_set
1637            .metadata
1638            .owner_references
1639            .get_or_insert(vec![])
1640            .extend(self.owner_references.iter().cloned());
1641
1642        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
1643        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
1644        let pod_annotations = pod_metadata.annotations.clone();
1645
1646        self.service_api
1647            .patch(
1648                &desc.name,
1649                &PatchParams::apply(FIELD_MANAGER).force(),
1650                &Patch::Apply(desc.service),
1651            )
1652            .await?;
1653        self.stateful_set_api
1654            .patch(
1655                &desc.name,
1656                &PatchParams::apply(FIELD_MANAGER).force(),
1657                &Patch::Apply(desc.stateful_set),
1658            )
1659            .await?;
1660
1661        // We manage pod recreation manually, using the OnDelete StatefulSet update strategy, for
1662        // two reasons:
1663        //  * Kubernetes doesn't always automatically replace StatefulSet pods when their specs
1664        //    change, see https://github.com/kubernetes/kubernetes#67250.
1665        //  * Kubernetes replaces StatefulSet pods when their annotations change, which is not
1666        //    something we want as it could cause unavailability.
1667        //
1668        // Our pod recreation policy is simple: If a pod's template hash changed, delete it, and
1669        // let the StatefulSet controller recreate it. Otherwise, patch the existing pod's
1670        // annotations to line up with the ones in the spec.
1671        for pod_id in 0..desc.scale.get() {
1672            let pod_name = format!("{}-{pod_id}", desc.name);
1673            let pod = match self.pod_api.get(&pod_name).await {
1674                Ok(pod) => pod,
1675                // Pod already doesn't exist.
1676                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1677                Err(e) => return Err(e),
1678            };
1679
1680            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
1681                != Some(&desc.pod_template_hash)
1682            {
1683                self.pod_api
1684                    .delete(&pod_name, &DeleteParams::default())
1685                    .await
1686                    .map(|_| ())
1687            } else {
1688                let metadata = ObjectMeta {
1689                    annotations: pod_annotations.clone(),
1690                    ..Default::default()
1691                }
1692                .into_request_partial::<Pod>();
1693                self.pod_api
1694                    .patch_metadata(
1695                        &pod_name,
1696                        &PatchParams::apply(FIELD_MANAGER).force(),
1697                        &Patch::Apply(&metadata),
1698                    )
1699                    .await
1700                    .map(|_| ())
1701            };
1702
1703            match result {
1704                Ok(()) => (),
1705                // Pod was deleted concurrently.
1706                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1707                Err(e) => return Err(e),
1708            }
1709        }
1710
1711        Ok(())
1712    }
1713
1714    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
1715        let res = self
1716            .stateful_set_api
1717            .delete(name, &DeleteParams::default())
1718            .await;
1719        match res {
1720            Ok(_) => (),
1721            Err(K8sError::Api(e)) if e.code == 404 => (),
1722            Err(e) => return Err(e),
1723        }
1724
1725        let res = self
1726            .service_api
1727            .delete(name, &DeleteParams::default())
1728            .await;
1729        match res {
1730            Ok(_) => Ok(()),
1731            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1732            Err(e) => Err(e),
1733        }
1734    }
1735
1736    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
1737        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1738        let name_prefix = format!("{}{namespace}-", self.name_prefix);
1739        Ok(stateful_sets
1740            .into_iter()
1741            .filter_map(|ss| {
1742                ss.metadata
1743                    .name
1744                    .unwrap()
1745                    .strip_prefix(&name_prefix)
1746                    .map(Into::into)
1747            })
1748            .collect())
1749    }
1750}
1751
/// A handle to a service running in Kubernetes, exposing the network
/// addresses of the service's processes.
#[derive(Debug, Clone)]
struct KubernetesService {
    /// Hostnames at which the service's processes can be reached.
    hosts: Vec<String>,
    /// Exposed port numbers, keyed by port name.
    ports: BTreeMap<String, u16>,
}
1757
1758impl Service for KubernetesService {
1759    fn addresses(&self, port: &str) -> Vec<String> {
1760        let port = self.ports[port];
1761        self.hosts
1762            .iter()
1763            .map(|host| format!("{host}:{port}"))
1764            .collect()
1765    }
1766}
1767
/// Computes the `minDomains` value to use in a `TopologySpreadConstraint`.
///
/// The configured `min_domains` is passed through only when the spread is
/// hard (`soft == false`) and no availability zones are pinned. Kubernetes
/// rejects `minDomains` combined with `ScheduleAnyway`, and when node
/// affinity already pins the eligible zones a `minDomains` exceeding the
/// pinned-zone count makes the global minimum 0, leaving all but one
/// replica pending at `maxSkew=1` — so both cases suppress the value.
fn topology_spread_min_domains(
    soft: bool,
    az_pinned: bool,
    min_domains: Option<i32>,
) -> Option<i32> {
    // Keep the configured value only when neither suppression case applies.
    min_domains.filter(|_| !soft && !az_pinned)
}
1782
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn topology_spread_min_domains_suppression() {
        // Kept when spread is hard and no AZ pinning is in effect.
        assert_eq!(topology_spread_min_domains(false, false, Some(3)), Some(3));
        // An absent min_domains stays absent.
        assert_eq!(topology_spread_min_domains(false, false, None), None);
        // Soft spread suppresses it (Kubernetes rejects minDomains with
        // ScheduleAnyway).
        assert_eq!(topology_spread_min_domains(true, false, Some(3)), None);
        // Pinned availability zones suppress it.
        assert_eq!(topology_spread_min_domains(false, true, Some(3)), None);
        // Both conditions together suppress it as well.
        assert_eq!(topology_spread_min_domains(true, true, Some(3)), None);
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        // Decimal suffixes scale by powers of 1000; values read at scale 0.
        for &(input, expected) in &[
            ("42", 42),
            ("42k", 42_000),
            ("42M", 42_000_000),
            ("42G", 42_000_000_000),
            ("42T", 42_000_000_000_000),
            ("42P", 42_000_000_000_000_000),
        ] {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        // Sub-unit suffixes; values read at nano scale (10^-9).
        for &(input, expected) in &[("42n", 42), ("42u", 42_000), ("42m", 42_000_000)] {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        // Binary suffixes scale by powers of 1024.
        for &(input, expected) in &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ] {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }
}