mz_orchestrator_kubernetes/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use std::{env, fmt};
15
16use anyhow::{Context, anyhow, bail};
17use async_trait::async_trait;
18use chrono::Utc;
19use clap::ValueEnum;
20use cloud_resource_controller::KubernetesResourceReader;
21use futures::TryFutureExt;
22use futures::stream::{BoxStream, StreamExt};
23use k8s_openapi::DeepMerge;
24use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
25use k8s_openapi::api::core::v1::{
26    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
27    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
28    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
29    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
30    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
31    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
32    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
33    WeightedPodAffinityTerm,
34};
35use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
36use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
37    LabelSelector, LabelSelectorRequirement, OwnerReference,
38};
39use kube::ResourceExt;
40use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
41use kube::client::Client;
42use kube::error::Error as K8sError;
43use kube::runtime::{WatchStreamExt, watcher};
44use maplit::btreemap;
45use mz_cloud_resources::AwsExternalIdPrefix;
46use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
47use mz_orchestrator::{
48    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
49    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
50    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
51};
52use mz_ore::retry::Retry;
53use mz_ore::task::AbortOnDropHandle;
54use serde::Deserialize;
55use sha2::{Digest, Sha256};
56use tokio::sync::{mpsc, oneshot};
57use tracing::{error, info, warn};
58
59pub mod cloud_resource_controller;
60pub mod secrets;
61pub mod util;
62
// NOTE(review): presumably the field-manager name passed in `PatchParams` for
// server-side apply; the uses of this constant are outside this chunk.
const FIELD_MANAGER: &str = "environmentd";
// NOTE(review): presumably how long (in seconds) a node must be failing before
// its pods are treated as failed; confirm against the uses outside this chunk.
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

// NOTE(review): presumably the annotation key under which a service's
// `pod_template_hash` (see `ServiceDescription`) is recorded; confirm against
// the uses outside this chunk.
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
67
/// Configures a [`KubernetesOrchestrator`].
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use, if the Kubernetes configuration
    /// is loaded from the local kubeconfig.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use, if any.
    pub scheduler_name: Option<String>,
    /// Annotations to install on every service created by the orchestrator.
    pub service_annotations: BTreeMap<String, String>,
    /// Labels to install on every service created by the orchestrator.
    ///
    /// These labels are also included in each service's pod match labels, so
    /// they become part of the service's selector.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector to install on every service created by the orchestrator.
    pub service_node_selector: BTreeMap<String, String>,
    /// Affinity to install on every service created by the orchestrator.
    // NOTE(review): held as an unparsed string; presumably a serialized
    // Kubernetes `Affinity` — confirm the expected format where it is parsed.
    pub service_affinity: Option<String>,
    /// Tolerations to install on every service created by the orchestrator.
    // NOTE(review): held as an unparsed string; presumably serialized
    // Kubernetes `Toleration`s — confirm the expected format where it is parsed.
    pub service_tolerations: Option<String>,
    /// The service account that each service should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for services created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix to use when making AWS operations on behalf
    /// of the environment.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to use code coverage mode or not. Always false for production.
    pub coverage: bool,
    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
    /// services that request disk.
    ///
    /// If unspecified, the orchestrator will refuse to create services that
    /// request disk.
    pub ephemeral_volume_storage_class: Option<String>,
    /// The optional fs group for the `securityContext` of each service's pods.
    pub service_fs_group: Option<i64>,
    /// The prefix to prepend to all object names. `None` is treated as an
    /// empty prefix (see [`KubernetesOrchestratorConfig::name_prefix`]).
    pub name_prefix: Option<String>,
    /// Whether to attempt to collect pod metrics from Kubernetes.
    pub collect_pod_metrics: bool,
    /// Whether to annotate pods for Prometheus service discovery.
    pub enable_prometheus_scrape_annotations: bool,
}
110
111impl KubernetesOrchestratorConfig {
112    pub fn name_prefix(&self) -> String {
113        self.name_prefix.clone().unwrap_or_default()
114    }
115}
116
/// Specifies whether Kubernetes should pull Docker images when creating pods.
///
/// Derives clap's `ValueEnum`, so it can be parsed from command-line arguments.
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the Docker image from the registry.
    Always,
    /// Pull the Docker image only if the image is not present.
    IfNotPresent,
    /// Never pull the Docker image.
    Never,
}
127
128impl fmt::Display for KubernetesImagePullPolicy {
129    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
130        match self {
131            KubernetesImagePullPolicy::Always => f.write_str("Always"),
132            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
133            KubernetesImagePullPolicy::Never => f.write_str("Never"),
134        }
135    }
136}
137
138impl KubernetesImagePullPolicy {
139    pub fn as_kebab_case_str(&self) -> &'static str {
140        match self {
141            Self::Always => "always",
142            Self::IfNotPresent => "if-not-present",
143            Self::Never => "never",
144        }
145    }
146}
147
/// An orchestrator backed by Kubernetes.
pub struct KubernetesOrchestrator {
    /// Kubernetes API client from which the orchestrator's API handles are
    /// created.
    client: Client,
    /// The Kubernetes namespace returned by `util::create_client` alongside
    /// `client`.
    kubernetes_namespace: String,
    /// The configuration this orchestrator was created with.
    config: KubernetesOrchestratorConfig,
    /// API for `Secret`s in the client's default namespace.
    secret_api: Api<Secret>,
    /// API for `VpcEndpoint`s in the client's default namespace.
    vpc_endpoint_api: Api<VpcEndpoint>,
    /// Namespaced orchestrators created so far, keyed by orchestrator
    /// namespace; populated lazily by `Orchestrator::namespace`.
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    /// Resource reader created from the same Kubernetes context as `client`.
    resource_reader: Arc<KubernetesResourceReader>,
}
158
159impl fmt::Debug for KubernetesOrchestrator {
160    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
161        f.debug_struct("KubernetesOrchestrator").finish()
162    }
163}
164
165impl KubernetesOrchestrator {
166    /// Creates a new Kubernetes orchestrator from the provided configuration.
167    pub async fn new(
168        config: KubernetesOrchestratorConfig,
169    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
170        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
171        let resource_reader =
172            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
173        Ok(KubernetesOrchestrator {
174            client: client.clone(),
175            kubernetes_namespace,
176            config,
177            secret_api: Api::default_namespaced(client.clone()),
178            vpc_endpoint_api: Api::default_namespaced(client),
179            namespaces: Mutex::new(BTreeMap::new()),
180            resource_reader,
181        })
182    }
183}
184
185impl Orchestrator for KubernetesOrchestrator {
186    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
187        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
188        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
189            let (command_tx, command_rx) = mpsc::unbounded_channel();
190            let worker = OrchestratorWorker {
191                metrics_api: Api::default_namespaced(self.client.clone()),
192                service_api: Api::default_namespaced(self.client.clone()),
193                stateful_set_api: Api::default_namespaced(self.client.clone()),
194                pod_api: Api::default_namespaced(self.client.clone()),
195                owner_references: vec![],
196                command_rx,
197                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
198                collect_pod_metrics: self.config.collect_pod_metrics,
199            }
200            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));
201
202            Arc::new(NamespacedKubernetesOrchestrator {
203                pod_api: Api::default_namespaced(self.client.clone()),
204                kubernetes_namespace: self.kubernetes_namespace.clone(),
205                namespace: namespace.into(),
206                config: self.config.clone(),
207                // TODO(guswynn): make this configurable.
208                scheduling_config: Default::default(),
209                service_infos: std::sync::Mutex::new(BTreeMap::new()),
210                command_tx,
211                _worker: worker,
212            })
213        }))
214    }
215}
216
/// Per-service information kept by a [`NamespacedKubernetesOrchestrator`] and
/// handed to its worker when fetching service metrics.
#[derive(Clone, Copy)]
struct ServiceInfo {
    /// The service's scale, i.e. its number of processes (one pod host per
    /// unit of scale).
    scale: u16,
}
221
/// A [`NamespacedOrchestrator`] managing the services of one orchestrator
/// namespace; created by [`KubernetesOrchestrator`]'s `namespace` method.
struct NamespacedKubernetesOrchestrator {
    /// API for pods in the client's default Kubernetes namespace.
    pod_api: Api<Pod>,
    /// The Kubernetes namespace the orchestrator's client operates in; used to
    /// build in-cluster DNS names for service processes.
    kubernetes_namespace: String,
    /// The orchestrator-level namespace (distinct from `kubernetes_namespace`).
    /// It is embedded in service names and label keys.
    namespace: String,
    /// A copy of the parent orchestrator's configuration.
    config: KubernetesOrchestratorConfig,
    /// Scheduling knobs, read (and cloned) on each `ensure_service` call.
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    /// Per-service information keyed by service id; read by
    /// `fetch_service_metrics`.
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    /// Sender for commands to the background `OrchestratorWorker`.
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    /// Handle to the background worker task, held (never read) so the task's
    /// lifetime is tied to this orchestrator.
    // NOTE(review): `AbortOnDropHandle` presumably aborts the task on drop.
    _worker: AbortOnDropHandle<()>,
}
232
233impl fmt::Debug for NamespacedKubernetesOrchestrator {
234    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235        f.debug_struct("NamespacedKubernetesOrchestrator")
236            .field("kubernetes_namespace", &self.kubernetes_namespace)
237            .field("namespace", &self.namespace)
238            .field("config", &self.config)
239            .finish()
240    }
241}
242
/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result include a `result_tx` on which the
/// [`OrchestratorWorker`] will deliver the result.
enum WorkerCommand {
    /// Ensure the service described by `desc` exists (backs `ensure_service`).
    EnsureService {
        desc: ServiceDescription,
    },
    /// Drop the service named `name` (backs `drop_service`).
    DropService {
        name: String,
    },
    /// List the services in orchestrator namespace `namespace`, delivering
    /// the result on `result_tx`.
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    /// Fetch per-process metrics for the service whose full object name is
    /// `name`, delivering them on `result_tx`. `info` carries the service's
    /// recorded `ServiceInfo` (its scale).
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}
265
/// A description of a service to be created by an [`OrchestratorWorker`].
#[derive(Debug, Clone)]
struct ServiceDescription {
    /// The service's Kubernetes object name.
    // NOTE(review): presumably the output of `service_name` — confirm where built.
    name: String,
    /// The service's scale (number of processes).
    scale: u16,
    /// The Kubernetes `Service` object for the service.
    service: K8sService,
    /// The `StatefulSet` object for the service.
    stateful_set: StatefulSet,
    /// A hash of the service's pod template.
    // NOTE(review): presumably recorded under `POD_TEMPLATE_HASH_ANNOTATION`
    // to detect template changes — confirm where built and consumed.
    pod_template_hash: String,
}
275
/// A task executing blocking work for a [`NamespacedKubernetesOrchestrator`] in the background.
///
/// This type exists to enable making [`NamespacedKubernetesOrchestrator::ensure_service`] and
/// [`NamespacedKubernetesOrchestrator::drop_service`] non-blocking, allowing invocation of these
/// methods in latency-sensitive contexts.
///
/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
/// orchestrator calls that query service state (such as `list_services`). These need to be
/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
/// ensure that a `list_services` result contains exactly those services that were previously
/// created with `ensure_service` and not yet dropped with `drop_service`.
struct OrchestratorWorker {
    /// API for pod metrics (`metrics.k8s.io`) in the client's default namespace.
    metrics_api: Api<PodMetrics>,
    /// API for Kubernetes `Service` objects in the client's default namespace.
    service_api: Api<K8sService>,
    /// API for `StatefulSet` objects in the client's default namespace.
    stateful_set_api: Api<StatefulSet>,
    /// API for `Pod` objects in the client's default namespace.
    pod_api: Api<Pod>,
    /// Owner references; `KubernetesOrchestrator`'s `namespace` method
    /// initializes this to empty.
    // NOTE(review): presumably attached to objects the worker creates — confirm.
    owner_references: Vec<OwnerReference>,
    /// Receives commands from the owning `NamespacedKubernetesOrchestrator`.
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    /// The configured object-name prefix (empty if none was configured).
    name_prefix: String,
    /// Copy of `KubernetesOrchestratorConfig::collect_pod_metrics`.
    collect_pod_metrics: bool,
}
297
/// One container's entry in a [`PodMetrics`] object from the
/// `metrics.k8s.io` API.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    /// The container's name.
    pub name: String,
    /// The container's reported resource usage.
    pub usage: PodMetricsContainerUsage,
}
303
/// CPU and memory usage reported for a single container.
// NOTE(review): presumably converted to numbers via `parse_k8s_quantity` by
// code outside this chunk.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    /// CPU usage, as a Kubernetes `Quantity` string.
    pub cpu: Quantity,
    /// Memory usage, as a Kubernetes `Quantity` string.
    pub memory: Quantity,
}
309
/// A `PodMetrics` object (`metrics.k8s.io/v1beta1`) reported for a single pod.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    /// Standard object metadata.
    pub metadata: ObjectMeta,
    /// The sample's timestamp, kept as the raw string returned by the API.
    pub timestamp: String,
    /// The sample's window, kept as the raw string returned by the API.
    pub window: String,
    /// Usage for each container in the pod.
    pub containers: Vec<PodMetricsContainer>,
}
317
/// Describes [`PodMetrics`] to `kube`, so that an `Api<PodMetrics>` addresses
/// the namespaced `pods` resource of the `metrics.k8s.io/v1beta1` API group.
impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    // Pod metrics are served under the `pods` path of the metrics API.
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}
327
/// Exposes the standard [`ObjectMeta`] of a [`PodMetrics`] object.
impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}
339
// Note that these types are unusual. We `get` a `List` object and pretend
// that it has an `ObjectMeta` (which deserializes as empty, but we don't need
// it). The custom metrics API is designed this way, which is very
// non-standard. A discussion in the `kube` channel of the `tokio` Discord
// confirmed that this layout, combined with `get_subresource`, is the best
// way to handle it.
347
/// Identifies a metric returned by the custom metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    /// The metric's name (the `metricName` field in the API response).
    #[serde(rename = "metricName")]
    pub name: String,
    // We skip `selector` for now, as we don't use it
}
354
/// A single metric value returned by the custom metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    /// The object the metric describes (the `describedObject` field).
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    /// The metric's identity. Its fields (e.g. `metricName`) appear at the top
    /// level of each value object, hence `flatten`.
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    /// The value's timestamp, kept as the raw string returned by the API.
    pub timestamp: String,
    /// The metric's value.
    pub value: Quantity,
    // We skip `windowSeconds`, as we don't need it
}
365
366impl NamespacedKubernetesOrchestrator {
367    fn service_name(&self, id: &str) -> String {
368        format!(
369            "{}{}-{id}",
370            self.config.name_prefix.as_deref().unwrap_or(""),
371            self.namespace
372        )
373    }
374
375    /// Return a `watcher::Config` instance that limits results to the namespace
376    /// assigned to this orchestrator.
377    fn watch_pod_params(&self) -> watcher::Config {
378        let ns_selector = format!(
379            "environmentd.materialize.cloud/namespace={}",
380            self.namespace
381        );
382        // This watcher timeout must be shorter than the client read timeout.
383        watcher::Config::default().timeout(59).labels(&ns_selector)
384    }
385
386    /// Convert a higher-level label key to the actual one we
387    /// will give to Kubernetes
388    fn make_label_key(&self, key: &str) -> String {
389        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
390    }
391
392    fn label_selector_to_k8s(
393        &self,
394        MzLabelSelector { label_name, logic }: MzLabelSelector,
395    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
396        let (operator, values) = match logic {
397            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
398            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
399            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
400            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
401            LabelSelectionLogic::InSet { values } => {
402                if values.is_empty() {
403                    Err(anyhow!(
404                        "Invalid selector logic for {label_name}: empty `in` set"
405                    ))
406                } else {
407                    Ok(("In", values))
408                }
409            }
410            LabelSelectionLogic::NotInSet { values } => {
411                if values.is_empty() {
412                    Err(anyhow!(
413                        "Invalid selector logic for {label_name}: empty `notin` set"
414                    ))
415                } else {
416                    Ok(("NotIn", values))
417                }
418            }
419        }?;
420        let lsr = LabelSelectorRequirement {
421            key: self.make_label_key(&label_name),
422            operator: operator.to_string(),
423            values: Some(values),
424        };
425        Ok(lsr)
426    }
427
428    fn send_command(&self, cmd: WorkerCommand) {
429        self.command_tx.send(cmd).expect("worker task not dropped");
430    }
431}
432
/// A non-negative integer quantity scaled by a power of ten or of two, as
/// produced by `parse_k8s_quantity`.
///
/// The represented value is `integral_part * base^exponent`, where `base` is
/// 10 when `base10` is true and 2 otherwise.
#[derive(Debug)]
struct ScaledQuantity {
    /// The unscaled integer part (e.g. `128` for `128Mi`).
    integral_part: u64,
    /// The power of the base that `integral_part` is scaled by (e.g. `20` for
    /// `Mi`, `-3` for `m`, `0` for no suffix).
    exponent: i8,
    /// Whether `exponent` is a power of 10 (decimal SI suffix) rather than a
    /// power of 2 (binary SI suffix).
    base10: bool,
}

impl ScaledQuantity {
    /// Expresses this quantity as an integer count of `base^scale` units, where
    /// `base` is 10 if `base10` is true and 2 otherwise.
    ///
    /// For example, `try_to_integer(-9, true)` yields the value in units of
    /// 10^-9, and `try_to_integer(0, false)` yields the plain value. Any
    /// fractional remainder is truncated.
    ///
    /// Returns `None` in two cases:
    ///
    /// - The requested base differs from the quantity's base. The exception is
    ///   an unscaled quantity (exponent 0, e.g. a bare `"12345"`), which is the
    ///   same number in either base and so is always convertible.
    /// - The result does not fit in a `u64`.
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        // The parser classifies suffix-less numbers as decimal, but an
        // exponent of 0 means the value is base-agnostic (e.g. memory reported
        // in plain bytes), so only scaled quantities must match the base.
        if base10 != self.base10 && self.exponent != 0 {
            return None;
        }
        // Widen before subtracting: `i8 - i8` can overflow for extreme scales.
        let exponent = i32::from(self.exponent) - i32::from(scale);
        // In every accepted case the requested base is the correct one.
        let base: u64 = if base10 { 10 } else { 2 };
        let mut result = self.integral_part;
        if exponent < 0 {
            for _ in exponent..0 {
                if result == 0 {
                    break;
                }
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}
460
461// Parse a k8s `Quantity` object
462// into a numeric value.
463//
464// This is intended to support collecting CPU and Memory data.
465// Thus, there are a few that things Kubernetes attempts to do, that we don't,
466// because I've never observed metrics-server specifically sending them:
467// (1) Handle negative numbers (because it's not useful for that use-case)
468// (2) Handle non-integers (because I have never observed them being actually sent)
469// (3) Handle scientific notation (e.g. 1.23e2)
470fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
471    const DEC_SUFFIXES: &[(&str, i8)] = &[
472        ("n", -9),
473        ("u", -6),
474        ("m", -3),
475        ("", 0),
476        ("k", 3), // yep, intentionally lowercase.
477        ("M", 6),
478        ("G", 9),
479        ("T", 12),
480        ("P", 15),
481        ("E", 18),
482    ];
483    const BIN_SUFFIXES: &[(&str, i8)] = &[
484        ("", 0),
485        ("Ki", 10),
486        ("Mi", 20),
487        ("Gi", 30),
488        ("Ti", 40),
489        ("Pi", 50),
490        ("Ei", 60),
491    ];
492
493    let (positive, s) = match s.chars().next() {
494        Some('+') => (true, &s[1..]),
495        Some('-') => (false, &s[1..]),
496        _ => (true, s),
497    };
498
499    if !positive {
500        anyhow::bail!("Negative numbers not supported")
501    }
502
503    fn is_suffix_char(ch: char) -> bool {
504        "numkMGTPEKi".contains(ch)
505    }
506    let (num, suffix) = match s.find(is_suffix_char) {
507        None => (s, ""),
508        Some(idx) => s.split_at(idx),
509    };
510    let num: u64 = num.parse()?;
511    let (exponent, base10) = if let Some((_, exponent)) =
512        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
513    {
514        (exponent, true)
515    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
516        (exponent, false)
517    } else {
518        anyhow::bail!("Unrecognized suffix: {suffix}");
519    };
520    Ok(ScaledQuantity {
521        integral_part: num,
522        exponent: *exponent,
523        base10,
524    })
525}
526
527#[async_trait]
528impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
529    async fn fetch_service_metrics(
530        &self,
531        id: &str,
532    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
533        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
534            *info
535        } else {
536            // This should have been set in `ensure_service`.
537            tracing::error!("Failed to get info for {id}");
538            anyhow::bail!("Failed to get info for {id}");
539        };
540
541        let (result_tx, result_rx) = oneshot::channel();
542        self.send_command(WorkerCommand::FetchServiceMetrics {
543            name: self.service_name(id),
544            info,
545            result_tx,
546        });
547
548        let metrics = result_rx.await.expect("worker task not dropped");
549        Ok(metrics)
550    }
551
552    fn ensure_service(
553        &self,
554        id: &str,
555        ServiceConfig {
556            image,
557            init_container_image,
558            args,
559            ports: ports_in,
560            memory_limit,
561            memory_request,
562            cpu_limit,
563            scale,
564            labels: labels_in,
565            annotations: annotations_in,
566            availability_zones,
567            other_replicas_selector,
568            replicas_selector,
569            disk_limit,
570            node_selector,
571        }: ServiceConfig,
572    ) -> Result<Box<dyn Service>, anyhow::Error> {
573        // This is extremely cheap to clone, so just look into the lock once.
574        let scheduling_config: ServiceSchedulingConfig =
575            self.scheduling_config.read().expect("poisoned").clone();
576
577        // Enable disk if the size does not disable it.
578        let disk = disk_limit != Some(DiskLimit::ZERO);
579
580        let name = self.service_name(id);
581        // The match labels should be the minimal set of labels that uniquely
582        // identify the pods in the stateful set. Changing these after the
583        // `StatefulSet` is created is not permitted by Kubernetes, and we're
584        // not yet smart enough to handle deleting and recreating the
585        // `StatefulSet`.
586        let mut match_labels = btreemap! {
587            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
588            "environmentd.materialize.cloud/service-id".into() => id.into(),
589        };
590        for (key, value) in &self.config.service_labels {
591            match_labels.insert(key.clone(), value.clone());
592        }
593
594        let mut labels = match_labels.clone();
595        for (key, value) in labels_in {
596            labels.insert(self.make_label_key(&key), value);
597        }
598
599        labels.insert(self.make_label_key("scale"), scale.to_string());
600
601        for port in &ports_in {
602            labels.insert(
603                format!("environmentd.materialize.cloud/port-{}", port.name),
604                "true".into(),
605            );
606        }
607        let mut limits = BTreeMap::new();
608        let mut requests = BTreeMap::new();
609        if let Some(memory_limit) = memory_limit {
610            limits.insert(
611                "memory".into(),
612                Quantity(memory_limit.0.as_u64().to_string()),
613            );
614            requests.insert(
615                "memory".into(),
616                Quantity(memory_limit.0.as_u64().to_string()),
617            );
618        }
619        if let Some(memory_request) = memory_request {
620            requests.insert(
621                "memory".into(),
622                Quantity(memory_request.0.as_u64().to_string()),
623            );
624        }
625        if let Some(cpu_limit) = cpu_limit {
626            limits.insert(
627                "cpu".into(),
628                Quantity(format!("{}m", cpu_limit.as_millicpus())),
629            );
630            requests.insert(
631                "cpu".into(),
632                Quantity(format!("{}m", cpu_limit.as_millicpus())),
633            );
634        }
635        let service = K8sService {
636            metadata: ObjectMeta {
637                name: Some(name.clone()),
638                ..Default::default()
639            },
640            spec: Some(ServiceSpec {
641                ports: Some(
642                    ports_in
643                        .iter()
644                        .map(|port| ServicePort {
645                            port: port.port_hint.into(),
646                            name: Some(port.name.clone()),
647                            ..Default::default()
648                        })
649                        .collect(),
650                ),
651                cluster_ip: Some("None".to_string()),
652                selector: Some(match_labels.clone()),
653                ..Default::default()
654            }),
655            status: None,
656        };
657
658        let hosts = (0..scale)
659            .map(|i| {
660                format!(
661                    "{name}-{i}.{name}.{}.svc.cluster.local",
662                    self.kubernetes_namespace
663                )
664            })
665            .collect::<Vec<_>>();
666        let ports = ports_in
667            .iter()
668            .map(|p| (p.name.clone(), p.port_hint))
669            .collect::<BTreeMap<_, _>>();
670
671        let mut listen_addrs = BTreeMap::new();
672        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
673        for (name, port) in &ports {
674            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
675            for (i, host) in hosts.iter().enumerate() {
676                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
677            }
678        }
679        let mut args = args(ServiceAssignments {
680            listen_addrs: &listen_addrs,
681            peer_addrs: &peer_addrs,
682        });
683
684        // This constrains the orchestrator (for those orchestrators that support
685        // anti-affinity, today just k8s) to never schedule pods for different replicas
686        // of the same cluster on the same node. Pods from the _same_ replica are fine;
687        // pods from different clusters are also fine.
688        //
689        // The point is that if pods of two replicas are on the same node, that node
690        // going down would kill both replicas, and so the replication factor of the
691        // cluster in question is illusory.
692        let anti_affinity = Some({
693            let label_selector_requirements = other_replicas_selector
694                .clone()
695                .into_iter()
696                .map(|ls| self.label_selector_to_k8s(ls))
697                .collect::<Result<Vec<_>, _>>()?;
698            let ls = LabelSelector {
699                match_expressions: Some(label_selector_requirements),
700                ..Default::default()
701            };
702            let pat = PodAffinityTerm {
703                label_selector: Some(ls),
704                topology_key: "kubernetes.io/hostname".to_string(),
705                ..Default::default()
706            };
707
708            if !scheduling_config.soften_replication_anti_affinity {
709                PodAntiAffinity {
710                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
711                    ..Default::default()
712                }
713            } else {
714                PodAntiAffinity {
715                    preferred_during_scheduling_ignored_during_execution: Some(vec![
716                        WeightedPodAffinityTerm {
717                            weight: scheduling_config.soften_replication_anti_affinity_weight,
718                            pod_affinity_term: pat,
719                        },
720                    ]),
721                    ..Default::default()
722                }
723            }
724        });
725
726        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
727            // `match_labels` sufficiently selects pods in the same replica.
728            let ls = LabelSelector {
729                match_labels: Some(match_labels.clone()),
730                ..Default::default()
731            };
732            let pat = PodAffinityTerm {
733                label_selector: Some(ls),
734                topology_key: "topology.kubernetes.io/zone".to_string(),
735                ..Default::default()
736            };
737
738            Some(PodAffinity {
739                preferred_during_scheduling_ignored_during_execution: Some(vec![
740                    WeightedPodAffinityTerm {
741                        weight,
742                        pod_affinity_term: pat,
743                    },
744                ]),
745                ..Default::default()
746            })
747        } else {
748            None
749        };
750
751        let topology_spread = if scheduling_config.topology_spread.enabled {
752            let config = &scheduling_config.topology_spread;
753
754            if !config.ignore_non_singular_scale || scale <= 1 {
755                let label_selector_requirements = (if config.ignore_non_singular_scale {
756                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();
757
758                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
759                        label_name: "scale".into(),
760                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
761                            value: "1".to_string(),
762                        },
763                    });
764
765                    replicas_selector_ignoring_scale
766                } else {
767                    replicas_selector
768                })
769                .into_iter()
770                .map(|ls| self.label_selector_to_k8s(ls))
771                .collect::<Result<Vec<_>, _>>()?;
772                let ls = LabelSelector {
773                    match_expressions: Some(label_selector_requirements),
774                    ..Default::default()
775                };
776
777                let constraint = TopologySpreadConstraint {
778                    label_selector: Some(ls),
779                    min_domains: config.min_domains,
780                    max_skew: config.max_skew,
781                    topology_key: "topology.kubernetes.io/zone".to_string(),
782                    when_unsatisfiable: if config.soft {
783                        "ScheduleAnyway".to_string()
784                    } else {
785                        "DoNotSchedule".to_string()
786                    },
787                    // TODO(guswynn): restore these once they are supported.
788                    // Consider node affinities when calculating topology spread. This is the
789                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_affinity_policy>,
790                    // made explicit.
791                    // node_affinity_policy: Some("Honor".to_string()),
792                    // Do not consider node taints when calculating topology spread. This is the
793                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_taints_policy>,
794                    // made explicit.
795                    // node_taints_policy: Some("Ignore".to_string()),
796                    match_label_keys: None,
797                    // Once the above are restorted, we should't have `..Default::default()` here because the specifics of these fields are
798                    // subtle enough where we want compilation failures when we upgrade
799                    ..Default::default()
800                };
801                Some(vec![constraint])
802            } else {
803                None
804            }
805        } else {
806            None
807        };
808
809        let mut pod_annotations = btreemap! {
810            // Prevent the cluster-autoscaler (or karpenter) from evicting these pods in attempts to scale down
811            // and terminate nodes.
812            // This will cost us more money, but should give us better uptime.
813            // This does not prevent all evictions by Kubernetes, only the ones initiated by the
814            // cluster-autoscaler (or karpenter). Notably, eviction of pods for resource overuse is still enabled.
815            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
816            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
817
818            // It's called do-not-disrupt in newer versions of karpenter, so adding for forward/backward compatibility
819            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
820        };
821        for (key, value) in annotations_in {
822            // We want to use the same prefix as our labels keys
823            pod_annotations.insert(self.make_label_key(&key), value);
824        }
825        if self.config.enable_prometheus_scrape_annotations {
826            if let Some(internal_http_port) = ports_in
827                .iter()
828                .find(|port| port.name == "internal-http")
829                .map(|port| port.port_hint.to_string())
830            {
831                // Enable prometheus scrape discovery
832                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
833                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
834                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
835                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
836            }
837        }
838        for (key, value) in &self.config.service_annotations {
839            pod_annotations.insert(key.clone(), value.clone());
840        }
841
842        let default_node_selector = if disk {
843            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
844        } else {
845            // if the cluster doesn't require disk, we can omit the selector
846            // allowing it to be scheduled onto nodes with and without the
847            // selector
848            vec![]
849        };
850
851        let node_selector: BTreeMap<String, String> = default_node_selector
852            .into_iter()
853            .chain(self.config.service_node_selector.clone())
854            .chain(node_selector)
855            .collect();
856
857        let node_affinity = if let Some(availability_zones) = availability_zones {
858            let selector = NodeSelectorTerm {
859                match_expressions: Some(vec![NodeSelectorRequirement {
860                    key: "materialize.cloud/availability-zone".to_string(),
861                    operator: "In".to_string(),
862                    values: Some(availability_zones),
863                }]),
864                match_fields: None,
865            };
866
867            if scheduling_config.soften_az_affinity {
868                Some(NodeAffinity {
869                    preferred_during_scheduling_ignored_during_execution: Some(vec![
870                        PreferredSchedulingTerm {
871                            preference: selector,
872                            weight: scheduling_config.soften_az_affinity_weight,
873                        },
874                    ]),
875                    required_during_scheduling_ignored_during_execution: None,
876                })
877            } else {
878                Some(NodeAffinity {
879                    preferred_during_scheduling_ignored_during_execution: None,
880                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
881                        node_selector_terms: vec![selector],
882                    }),
883                })
884            }
885        } else {
886            None
887        };
888
889        let mut affinity = Affinity {
890            pod_anti_affinity: anti_affinity,
891            pod_affinity,
892            node_affinity,
893            ..Default::default()
894        };
895        if let Some(service_affinity) = &self.config.service_affinity {
896            affinity.merge_from(serde_json::from_str(service_affinity)?);
897        }
898
899        let container_name = image
900            .rsplit_once('/')
901            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
902            .context("`image` is not ORG/NAME:VERSION")?
903            .0
904            .to_string();
905
906        let container_security_context = if scheduling_config.security_context_enabled {
907            Some(SecurityContext {
908                privileged: Some(false),
909                run_as_non_root: Some(true),
910                allow_privilege_escalation: Some(false),
911                seccomp_profile: Some(SeccompProfile {
912                    type_: "RuntimeDefault".to_string(),
913                    ..Default::default()
914                }),
915                capabilities: Some(Capabilities {
916                    drop: Some(vec!["ALL".to_string()]),
917                    ..Default::default()
918                }),
919                ..Default::default()
920            })
921        } else {
922            None
923        };
924
925        let init_containers = init_container_image.map(|image| {
926            vec![Container {
927                name: "init".to_string(),
928                image: Some(image),
929                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
930                resources: Some(ResourceRequirements {
931                    claims: None,
932                    limits: Some(limits.clone()),
933                    requests: Some(requests.clone()),
934                }),
935                security_context: container_security_context.clone(),
936                env: Some(vec![
937                    EnvVar {
938                        name: "MZ_NAMESPACE".to_string(),
939                        value_from: Some(EnvVarSource {
940                            field_ref: Some(ObjectFieldSelector {
941                                field_path: "metadata.namespace".to_string(),
942                                ..Default::default()
943                            }),
944                            ..Default::default()
945                        }),
946                        ..Default::default()
947                    },
948                    EnvVar {
949                        name: "MZ_POD_NAME".to_string(),
950                        value_from: Some(EnvVarSource {
951                            field_ref: Some(ObjectFieldSelector {
952                                field_path: "metadata.name".to_string(),
953                                ..Default::default()
954                            }),
955                            ..Default::default()
956                        }),
957                        ..Default::default()
958                    },
959                    EnvVar {
960                        name: "MZ_NODE_NAME".to_string(),
961                        value_from: Some(EnvVarSource {
962                            field_ref: Some(ObjectFieldSelector {
963                                field_path: "spec.nodeName".to_string(),
964                                ..Default::default()
965                            }),
966                            ..Default::default()
967                        }),
968                        ..Default::default()
969                    },
970                ]),
971                ..Default::default()
972            }]
973        });
974
975        let env = if self.config.coverage {
976            Some(vec![EnvVar {
977                name: "LLVM_PROFILE_FILE".to_string(),
978                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
979                ..Default::default()
980            }])
981        } else {
982            None
983        };
984
985        let mut volume_mounts = vec![];
986
987        if self.config.coverage {
988            volume_mounts.push(VolumeMount {
989                name: "coverage".to_string(),
990                mount_path: "/coverage".to_string(),
991                ..Default::default()
992            })
993        }
994
995        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
996            (true, Some(ephemeral_volume_storage_class)) => {
997                volume_mounts.push(VolumeMount {
998                    name: "scratch".to_string(),
999                    mount_path: "/scratch".to_string(),
1000                    ..Default::default()
1001                });
1002                args.push("--scratch-directory=/scratch".into());
1003
1004                Some(vec![Volume {
1005                    name: "scratch".to_string(),
1006                    ephemeral: Some(EphemeralVolumeSource {
1007                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
1008                            spec: PersistentVolumeClaimSpec {
1009                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1010                                storage_class_name: Some(
1011                                    ephemeral_volume_storage_class.to_string(),
1012                                ),
1013                                resources: Some(VolumeResourceRequirements {
1014                                    requests: Some(BTreeMap::from([(
1015                                        "storage".to_string(),
1016                                        Quantity(
1017                                            disk_limit
1018                                                .unwrap_or(DiskLimit::ARBITRARY)
1019                                                .0
1020                                                .as_u64()
1021                                                .to_string(),
1022                                        ),
1023                                    )])),
1024                                    ..Default::default()
1025                                }),
1026                                ..Default::default()
1027                            },
1028                            ..Default::default()
1029                        }),
1030                        ..Default::default()
1031                    }),
1032                    ..Default::default()
1033                }])
1034            }
1035            (true, None) => {
1036                return Err(anyhow!(
1037                    "service requested disk but no ephemeral volume storage class was configured"
1038                ));
1039            }
1040            (false, _) => None,
1041        };
1042
1043        if let Some(name_prefix) = &self.config.name_prefix {
1044            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
1045        }
1046
1047        let volume_claim_templates = if self.config.coverage {
1048            Some(vec![PersistentVolumeClaim {
1049                metadata: ObjectMeta {
1050                    name: Some("coverage".to_string()),
1051                    ..Default::default()
1052                },
1053                spec: Some(PersistentVolumeClaimSpec {
1054                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1055                    resources: Some(VolumeResourceRequirements {
1056                        requests: Some(BTreeMap::from([(
1057                            "storage".to_string(),
1058                            Quantity("10Gi".to_string()),
1059                        )])),
1060                        ..Default::default()
1061                    }),
1062                    ..Default::default()
1063                }),
1064                ..Default::default()
1065            }])
1066        } else {
1067            None
1068        };
1069
1070        let security_context = if let Some(fs_group) = self.config.service_fs_group {
1071            Some(PodSecurityContext {
1072                fs_group: Some(fs_group),
1073                run_as_user: Some(fs_group),
1074                run_as_group: Some(fs_group),
1075                ..Default::default()
1076            })
1077        } else {
1078            None
1079        };
1080
1081        let mut tolerations = vec![
1082            // When the node becomes `NotReady` it indicates there is a problem
1083            // with the node. By default Kubernetes waits 300s (5 minutes)
1084            // before descheduling the pod, but we tune this to 30s for faster
1085            // recovery in the case of node failure.
1086            Toleration {
1087                effect: Some("NoExecute".into()),
1088                key: Some("node.kubernetes.io/not-ready".into()),
1089                operator: Some("Exists".into()),
1090                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1091                value: None,
1092            },
1093            Toleration {
1094                effect: Some("NoExecute".into()),
1095                key: Some("node.kubernetes.io/unreachable".into()),
1096                operator: Some("Exists".into()),
1097                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1098                value: None,
1099            },
1100        ];
1101        if let Some(service_tolerations) = &self.config.service_tolerations {
1102            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
1103        }
1104        let tolerations = Some(tolerations);
1105
1106        let mut pod_template_spec = PodTemplateSpec {
1107            metadata: Some(ObjectMeta {
1108                labels: Some(labels.clone()),
1109                // Only set `annotations` _after_ we have computed the pod template hash, to
1110                // avoid that annotation changes cause pod replacements.
1111                ..Default::default()
1112            }),
1113            spec: Some(PodSpec {
1114                init_containers,
1115                containers: vec![Container {
1116                    name: container_name,
1117                    image: Some(image),
1118                    args: Some(args),
1119                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
1120                    ports: Some(
1121                        ports_in
1122                            .iter()
1123                            .map(|port| ContainerPort {
1124                                container_port: port.port_hint.into(),
1125                                name: Some(port.name.clone()),
1126                                ..Default::default()
1127                            })
1128                            .collect(),
1129                    ),
1130                    security_context: container_security_context.clone(),
1131                    resources: Some(ResourceRequirements {
1132                        claims: None,
1133                        limits: Some(limits),
1134                        requests: Some(requests),
1135                    }),
1136                    volume_mounts: if !volume_mounts.is_empty() {
1137                        Some(volume_mounts)
1138                    } else {
1139                        None
1140                    },
1141                    env,
1142                    ..Default::default()
1143                }],
1144                volumes,
1145                security_context,
1146                node_selector: Some(node_selector),
1147                scheduler_name: self.config.scheduler_name.clone(),
1148                service_account: self.config.service_account.clone(),
1149                affinity: Some(affinity),
1150                topology_spread_constraints: topology_spread,
1151                tolerations,
1152                // Setting a 0s termination grace period has the side effect of
1153                // automatically starting a new pod when the previous pod is
1154                // currently terminating. This enables recovery from a node
1155                // failure with no manual intervention. Without this setting,
1156                // the StatefulSet controller will refuse to start a new pod
1157                // until the failed node is manually removed from the Kubernetes
1158                // cluster.
1159                //
1160                // The Kubernetes documentation strongly advises against this
1161                // setting, as StatefulSets attempt to provide "at most once"
1162                // semantics [0]--that is, the guarantee that for a given pod in
1163                // a StatefulSet there is *at most* one pod with that identity
1164                // running in the cluster.
1165                //
1166                // Materialize services, however, are carefully designed to
1167                // *not* rely on this guarantee. In fact, we do not believe that
1168                // correct distributed systems can meaningfully rely on
1169                // Kubernetes's guarantee--network packets from a pod can be
1170                // arbitrarily delayed, long past that pod's termination.
1171                //
1172                // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1173                termination_grace_period_seconds: Some(0),
1174                ..Default::default()
1175            }),
1176        };
1177        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
1178        let mut hasher = Sha256::new();
1179        hasher.update(pod_template_json);
1180        let pod_template_hash = format!("{:x}", hasher.finalize());
1181        pod_annotations.insert(
1182            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
1183            pod_template_hash.clone(),
1184        );
1185
1186        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);
1187
1188        let stateful_set = StatefulSet {
1189            metadata: ObjectMeta {
1190                name: Some(name.clone()),
1191                ..Default::default()
1192            },
1193            spec: Some(StatefulSetSpec {
1194                selector: LabelSelector {
1195                    match_labels: Some(match_labels),
1196                    ..Default::default()
1197                },
1198                service_name: Some(name.clone()),
1199                replicas: Some(scale.into()),
1200                template: pod_template_spec,
1201                update_strategy: Some(StatefulSetUpdateStrategy {
1202                    type_: Some("OnDelete".to_owned()),
1203                    ..Default::default()
1204                }),
1205                pod_management_policy: Some("Parallel".to_string()),
1206                volume_claim_templates,
1207                ..Default::default()
1208            }),
1209            status: None,
1210        };
1211
1212        self.send_command(WorkerCommand::EnsureService {
1213            desc: ServiceDescription {
1214                name,
1215                scale,
1216                service,
1217                stateful_set,
1218                pod_template_hash,
1219            },
1220        });
1221
1222        self.service_infos
1223            .lock()
1224            .expect("poisoned lock")
1225            .insert(id.to_string(), ServiceInfo { scale });
1226
1227        Ok(Box::new(KubernetesService { hosts, ports }))
1228    }
1229
1230    /// Drops the identified service, if it exists.
1231    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1232        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1233        self.service_infos.lock().expect("poisoned lock").remove(id);
1234
1235        self.send_command(WorkerCommand::DropService {
1236            name: self.service_name(id),
1237        });
1238
1239        Ok(())
1240    }
1241
1242    /// Lists the identifiers of all known services.
1243    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1244        let (result_tx, result_rx) = oneshot::channel();
1245        self.send_command(WorkerCommand::ListServices {
1246            namespace: self.namespace.clone(),
1247            result_tx,
1248        });
1249
1250        let list = result_rx.await.expect("worker task not dropped");
1251        Ok(list)
1252    }
1253
1254    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
1255        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
1256            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
1257            let service_id_label = "environmentd.materialize.cloud/service-id";
1258            let service_id = pod
1259                .labels()
1260                .get(service_id_label)
1261                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
1262                .clone();
1263
1264            let oomed = pod
1265                .status
1266                .as_ref()
1267                .and_then(|status| status.container_statuses.as_ref())
1268                .map(|container_statuses| {
1269                    container_statuses.iter().any(|cs| {
1270                        // The container might have already transitioned from "terminated" to
1271                        // "waiting"/"running" state, in which case we need to check its previous
1272                        // state to find out why it terminated.
1273                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
1274                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
1275                        let termination_state = current_state.or(last_state);
1276
1277                        // The interesting exit codes are:
1278                        //  * 135 (SIGBUS): occurs when lgalloc runs out of disk
1279                        //  * 137 (SIGKILL): occurs when the OOM killer terminates the container
1280                        //  * 167: occurs when the lgalloc or memory limiter terminates the process
1281                        // We treat the all of these as OOM conditions since swap and lgalloc use
1282                        // disk only for spilling memory.
1283                        let exit_code = termination_state.map(|s| s.exit_code);
1284                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
1285                    })
1286                })
1287                .unwrap_or(false);
1288
1289            let (pod_ready, last_probe_time) = pod
1290                .status
1291                .and_then(|status| status.conditions)
1292                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
1293                .map(|c| (c.status == "True", c.last_probe_time))
1294                .unwrap_or((false, None));
1295
1296            let status = if pod_ready {
1297                ServiceStatus::Online
1298            } else {
1299                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
1300            };
1301            let time = if let Some(time) = last_probe_time {
1302                time.0
1303            } else {
1304                Utc::now()
1305            };
1306
1307            Ok(ServiceEvent {
1308                service_id,
1309                process_id,
1310                status,
1311                time,
1312            })
1313        }
1314
1315        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
1316            .touched_objects()
1317            .filter_map(|object| async move {
1318                match object {
1319                    Ok(pod) => Some(into_service_event(pod)),
1320                    Err(error) => {
1321                        // We assume that errors returned by Kubernetes are usually transient, so we
1322                        // just log a warning and ignore them otherwise.
1323                        tracing::warn!("service watch error: {error}");
1324                        None
1325                    }
1326                }
1327            });
1328        Box::pin(stream)
1329    }
1330
1331    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1332        *self.scheduling_config.write().expect("poisoned") = config;
1333    }
1334}
1335
1336impl OrchestratorWorker {
1337    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1338        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1339    }
1340
1341    async fn run(mut self) {
1342        {
1343            info!("initializing Kubernetes orchestrator worker");
1344            let start = Instant::now();
1345
1346            // Fetch the owner reference for our own pod (usually a
1347            // StatefulSet), so that we can propagate it to the services we
1348            // create.
1349            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
1350            let orchestrator_pod = Retry::default()
1351                .clamp_backoff(Duration::from_secs(10))
1352                .retry_async(|_| self.pod_api.get(&hostname))
1353                .await
1354                .expect("always retries on error");
1355            self.owner_references
1356                .extend(orchestrator_pod.owner_references().into_iter().cloned());
1357
1358            info!(
1359                "Kubernetes orchestrator worker initialized in {:?}",
1360                start.elapsed()
1361            );
1362        }
1363
1364        while let Some(cmd) = self.command_rx.recv().await {
1365            self.handle_command(cmd).await;
1366        }
1367    }
1368
1369    /// Handle a worker command.
1370    ///
1371    /// If handling the command fails, it is automatically retried. All command handlers return
1372    /// [`K8sError`], so we can reasonably assume that a failure is caused by issues communicating
1373    /// with the K8S server and that retrying resolves them eventually.
1374    async fn handle_command(&self, cmd: WorkerCommand) {
1375        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1376        where
1377            F: Fn() -> U,
1378            U: Future<Output = Result<R, K8sError>>,
1379        {
1380            Retry::default()
1381                .clamp_backoff(Duration::from_secs(10))
1382                .retry_async(|_| {
1383                    f().map_err(
1384                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1385                    )
1386                })
1387                .await
1388                .expect("always retries on error")
1389        }
1390
1391        use WorkerCommand::*;
1392        match cmd {
1393            EnsureService { desc } => {
1394                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1395            }
1396            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1397            ListServices {
1398                namespace,
1399                result_tx,
1400            } => {
1401                let result = retry(|| self.list_services(&namespace), "ListServices").await;
1402                let _ = result_tx.send(result);
1403            }
1404            FetchServiceMetrics {
1405                name,
1406                info,
1407                result_tx,
1408            } => {
1409                let result = self.fetch_service_metrics(&name, &info).await;
1410                let _ = result_tx.send(result);
1411            }
1412        }
1413    }
1414
1415    async fn fetch_service_metrics(
1416        &self,
1417        name: &str,
1418        info: &ServiceInfo,
1419    ) -> Vec<ServiceProcessMetrics> {
1420        if !self.collect_pod_metrics {
1421            return (0..info.scale)
1422                .map(|_| ServiceProcessMetrics::default())
1423                .collect();
1424        }
1425
1426        /// Usage metrics reported by clusterd processes.
1427        #[derive(Deserialize)]
1428        pub(crate) struct ClusterdUsage {
1429            disk_bytes: Option<u64>,
1430            memory_bytes: Option<u64>,
1431            swap_bytes: Option<u64>,
1432            heap_limit: Option<u64>,
1433        }
1434
1435        /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
1436        ///
1437        /// Note that we want to keep going even if a lookup fails for whatever reason,
1438        /// so this function is infallible. If we fail to get cpu or memory for a particular pod,
1439        /// we just log a warning and install `None` in the returned struct.
1440        async fn get_metrics(
1441            self_: &OrchestratorWorker,
1442            service_name: &str,
1443            i: usize,
1444        ) -> ServiceProcessMetrics {
1445            let name = format!("{service_name}-{i}");
1446
1447            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
1448            let (metrics, clusterd_usage) =
1449                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
1450                {
1451                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
1452                    (Ok(metrics), Err(e)) => {
1453                        warn!("Failed to fetch clusterd usage for {name}: {e}");
1454                        (metrics, None)
1455                    }
1456                    (Err(e), _) => {
1457                        warn!("Failed to get metrics for {name}: {e}");
1458                        return ServiceProcessMetrics::default();
1459                    }
1460                };
1461            let Some(PodMetricsContainer {
1462                usage:
1463                    PodMetricsContainerUsage {
1464                        cpu: Quantity(cpu_str),
1465                        memory: Quantity(mem_str),
1466                    },
1467                ..
1468            }) = metrics.containers.get(0)
1469            else {
1470                warn!("metrics result contained no containers for {name}");
1471                return ServiceProcessMetrics::default();
1472            };
1473
1474            let mut process_metrics = ServiceProcessMetrics::default();
1475
1476            match parse_k8s_quantity(cpu_str) {
1477                Ok(q) => match q.try_to_integer(-9, true) {
1478                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
1479                    None => error!("CPU value {q:?} out of range"),
1480                },
1481                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
1482            }
1483            match parse_k8s_quantity(mem_str) {
1484                Ok(q) => match q.try_to_integer(0, false) {
1485                    Some(mem) => process_metrics.memory_bytes = Some(mem),
1486                    None => error!("memory value {q:?} out of range"),
1487                },
1488                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
1489            }
1490
1491            if let Some(usage) = clusterd_usage {
1492                // clusterd may report disk usage as either `disk_bytes`, or `swap_bytes`, or both.
1493                //
1494                // For now the Console expects the swap size to be reported in `disk_bytes`.
1495                // Once the Console has been ported to use `heap_bytes`/`heap_limit`, we can
1496                // simplify things by setting `process_metrics.disk_bytes = usage.disk_bytes`.
1497                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
1498                    (Some(disk), Some(swap)) => Some(disk + swap),
1499                    (disk, swap) => disk.or(swap),
1500                };
1501
1502                // clusterd may report heap usage as `memory_bytes` and optionally `swap_bytes`.
1503                // If no `memory_bytes` is reported, we can't know the heap usage.
1504                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
1505                    (Some(memory), Some(swap)) => Some(memory + swap),
1506                    (Some(memory), None) => Some(memory),
1507                    (None, _) => None,
1508                };
1509
1510                process_metrics.heap_limit = usage.heap_limit;
1511            }
1512
1513            process_metrics
1514        }
1515
1516        /// Get the current usage metrics exposed by a clusterd process.
1517        ///
1518        /// Usage metrics are collected by connecting to a metrics endpoint exposed by the process.
1519        /// The endpoint is assumed to be reachable at the 'internal-http' under the HTTP path
1520        /// `/api/usage-metrics`.
1521        async fn get_clusterd_usage(
1522            self_: &OrchestratorWorker,
1523            service_name: &str,
1524            i: usize,
1525        ) -> anyhow::Result<ClusterdUsage> {
1526            let service = self_
1527                .service_api
1528                .get(service_name)
1529                .await
1530                .with_context(|| format!("failed to get service {service_name}"))?;
1531            let namespace = service
1532                .metadata
1533                .namespace
1534                .context("missing service namespace")?;
1535            let internal_http_port = service
1536                .spec
1537                .and_then(|spec| spec.ports)
1538                .and_then(|ports| {
1539                    ports
1540                        .into_iter()
1541                        .find(|p| p.name == Some("internal-http".into()))
1542                })
1543                .map(|p| p.port);
1544            let Some(port) = internal_http_port else {
1545                bail!("internal-http port missing in service spec");
1546            };
1547            let metrics_url = format!(
1548                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
1549                 /api/usage-metrics"
1550            );
1551
1552            let http_client = reqwest::Client::builder()
1553                .timeout(Duration::from_secs(10))
1554                .build()
1555                .context("error building HTTP client")?;
1556            let resp = http_client.get(metrics_url).send().await?;
1557            let usage = resp.json().await?;
1558
1559            Ok(usage)
1560        }
1561
1562        let ret =
1563            futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into())));
1564
1565        ret.await
1566    }
1567
1568    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1569        // We inject our own pod's owner references into the Kubernetes objects
1570        // created for the service so that if the
1571        // Deployment/StatefulSet/whatever that owns the pod running the
1572        // orchestrator gets deleted, so do all services spawned by this
1573        // orchestrator.
1574        desc.service
1575            .metadata
1576            .owner_references
1577            .get_or_insert(vec![])
1578            .extend(self.owner_references.iter().cloned());
1579        desc.stateful_set
1580            .metadata
1581            .owner_references
1582            .get_or_insert(vec![])
1583            .extend(self.owner_references.iter().cloned());
1584
1585        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
1586        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
1587        let pod_annotations = pod_metadata.annotations.clone();
1588
1589        self.service_api
1590            .patch(
1591                &desc.name,
1592                &PatchParams::apply(FIELD_MANAGER).force(),
1593                &Patch::Apply(desc.service),
1594            )
1595            .await?;
1596        self.stateful_set_api
1597            .patch(
1598                &desc.name,
1599                &PatchParams::apply(FIELD_MANAGER).force(),
1600                &Patch::Apply(desc.stateful_set),
1601            )
1602            .await?;
1603
1604        // We manage pod recreation manually, using the OnDelete StatefulSet update strategy, for
1605        // two reasons:
1606        //  * Kubernetes doesn't always automatically replace StatefulSet pods when their specs
1607        //    change, see https://github.com/kubernetes/kubernetes#67250.
1608        //  * Kubernetes replaces StatefulSet pods when their annotations change, which is not
1609        //    something we want as it could cause unavailability.
1610        //
1611        // Our pod recreation policy is simple: If a pod's template hash changed, delete it, and
1612        // let the StatefulSet controller recreate it. Otherwise, patch the existing pod's
1613        // annotations to line up with the ones in the spec.
1614        for pod_id in 0..desc.scale {
1615            let pod_name = format!("{}-{pod_id}", desc.name);
1616            let pod = match self.pod_api.get(&pod_name).await {
1617                Ok(pod) => pod,
1618                // Pod already doesn't exist.
1619                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1620                Err(e) => return Err(e),
1621            };
1622
1623            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
1624                != Some(&desc.pod_template_hash)
1625            {
1626                self.pod_api
1627                    .delete(&pod_name, &DeleteParams::default())
1628                    .await
1629                    .map(|_| ())
1630            } else {
1631                let metadata = ObjectMeta {
1632                    annotations: pod_annotations.clone(),
1633                    ..Default::default()
1634                }
1635                .into_request_partial::<Pod>();
1636                self.pod_api
1637                    .patch_metadata(
1638                        &pod_name,
1639                        &PatchParams::apply(FIELD_MANAGER).force(),
1640                        &Patch::Apply(&metadata),
1641                    )
1642                    .await
1643                    .map(|_| ())
1644            };
1645
1646            match result {
1647                Ok(()) => (),
1648                // Pod was deleted concurrently.
1649                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1650                Err(e) => return Err(e),
1651            }
1652        }
1653
1654        Ok(())
1655    }
1656
1657    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
1658        let res = self
1659            .stateful_set_api
1660            .delete(name, &DeleteParams::default())
1661            .await;
1662        match res {
1663            Ok(_) => (),
1664            Err(K8sError::Api(e)) if e.code == 404 => (),
1665            Err(e) => return Err(e),
1666        }
1667
1668        let res = self
1669            .service_api
1670            .delete(name, &DeleteParams::default())
1671            .await;
1672        match res {
1673            Ok(_) => Ok(()),
1674            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1675            Err(e) => Err(e),
1676        }
1677    }
1678
1679    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
1680        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1681        let name_prefix = format!("{}{namespace}-", self.name_prefix);
1682        Ok(stateful_sets
1683            .into_iter()
1684            .filter_map(|ss| {
1685                ss.metadata
1686                    .name
1687                    .unwrap()
1688                    .strip_prefix(&name_prefix)
1689                    .map(Into::into)
1690            })
1691            .collect())
1692    }
1693}
1694
1695#[derive(Debug, Clone)]
1696struct KubernetesService {
1697    hosts: Vec<String>,
1698    ports: BTreeMap<String, u16>,
1699}
1700
1701impl Service for KubernetesService {
1702    fn addresses(&self, port: &str) -> Vec<String> {
1703        let port = self.ports[port];
1704        self.hosts
1705            .iter()
1706            .map(|host| format!("{host}:{port}"))
1707            .collect()
1708    }
1709}
1710
1711#[cfg(test)]
1712mod tests {
1713    use super::*;
1714
1715    #[mz_ore::test]
1716    fn k8s_quantity_base10_large() {
1717        let cases = &[
1718            ("42", 42),
1719            ("42k", 42000),
1720            ("42M", 42000000),
1721            ("42G", 42000000000),
1722            ("42T", 42000000000000),
1723            ("42P", 42000000000000000),
1724        ];
1725
1726        for (input, expected) in cases {
1727            let quantity = parse_k8s_quantity(input).unwrap();
1728            let number = quantity.try_to_integer(0, true).unwrap();
1729            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1730        }
1731    }
1732
1733    #[mz_ore::test]
1734    fn k8s_quantity_base10_small() {
1735        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];
1736
1737        for (input, expected) in cases {
1738            let quantity = parse_k8s_quantity(input).unwrap();
1739            let number = quantity.try_to_integer(-9, true).unwrap();
1740            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1741        }
1742    }
1743
1744    #[mz_ore::test]
1745    fn k8s_quantity_base2() {
1746        let cases = &[
1747            ("42Ki", 42 << 10),
1748            ("42Mi", 42 << 20),
1749            ("42Gi", 42 << 30),
1750            ("42Ti", 42 << 40),
1751            ("42Pi", 42 << 50),
1752        ];
1753
1754        for (input, expected) in cases {
1755            let quantity = parse_k8s_quantity(input).unwrap();
1756            let number = quantity.try_to_integer(0, false).unwrap();
1757            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1758        }
1759    }
1760}