mz_orchestrator_kubernetes/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use std::{env, fmt};
15
16use anyhow::{Context, anyhow};
17use async_trait::async_trait;
18use chrono::Utc;
19use clap::ValueEnum;
20use cloud_resource_controller::KubernetesResourceReader;
21use futures::TryFutureExt;
22use futures::stream::{BoxStream, StreamExt};
23use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
24use k8s_openapi::api::core::v1::{
25    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
26    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
27    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
28    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
29    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
30    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
31    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
32    WeightedPodAffinityTerm,
33};
34use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
35use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
36    LabelSelector, LabelSelectorRequirement, OwnerReference,
37};
38use kube::ResourceExt;
39use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
40use kube::client::Client;
41use kube::error::Error as K8sError;
42use kube::runtime::{WatchStreamExt, watcher};
43use maplit::btreemap;
44use mz_cloud_resources::AwsExternalIdPrefix;
45use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
46use mz_orchestrator::{
47    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
48    OfflineReason, Orchestrator, Service, ServiceConfig, ServiceEvent, ServiceProcessMetrics,
49    ServiceStatus, scheduling_config::*,
50};
51use mz_ore::retry::Retry;
52use mz_ore::task::AbortOnDropHandle;
53use serde::Deserialize;
54use sha2::{Digest, Sha256};
55use tokio::sync::{mpsc, oneshot};
56use tracing::{info, warn};
57
58pub mod cloud_resource_controller;
59pub mod secrets;
60pub mod util;
61
62const FIELD_MANAGER: &str = "environmentd";
63const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;
64
65const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
66
67/// Configures a [`KubernetesOrchestrator`].
68#[derive(Debug, Clone)]
69pub struct KubernetesOrchestratorConfig {
70    /// The name of a Kubernetes context to use, if the Kubernetes configuration
71    /// is loaded from the local kubeconfig.
72    pub context: String,
73    /// The name of a non-default Kubernetes scheduler to use, if any.
74    pub scheduler_name: Option<String>,
75    /// Labels to install on every service created by the orchestrator.
76    pub service_labels: BTreeMap<String, String>,
77    /// Node selector to install on every service created by the orchestrator.
78    pub service_node_selector: BTreeMap<String, String>,
79    /// The service account that each service should run as, if any.
80    pub service_account: Option<String>,
81    /// The image pull policy to set for services created by the orchestrator.
82    pub image_pull_policy: KubernetesImagePullPolicy,
83    /// An AWS external ID prefix to use when making AWS operations on behalf
84    /// of the environment.
85    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
86    /// Whether to use code coverage mode or not. Always false for production.
87    pub coverage: bool,
88    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
89    /// services that request disk.
90    ///
91    /// If unspecified, the orchestrator will refuse to create services that
92    /// request disk.
93    pub ephemeral_volume_storage_class: Option<String>,
94    /// The optional fs group for each service's pods' `securityContext`.
95    pub service_fs_group: Option<i64>,
96    /// The prefix to prepend to all object names.
97    pub name_prefix: Option<String>,
98    /// Whether to attempt to collect metrics from Kubernetes.
99    pub collect_pod_metrics: bool,
100    /// Whether to annotate pods for Prometheus service discovery.
101    pub enable_prometheus_scrape_annotations: bool,
102}
103
104impl KubernetesOrchestratorConfig {
105    pub fn name_prefix(&self) -> String {
106        self.name_prefix.clone().unwrap_or_default()
107    }
108}
109
110/// Specifies whether Kubernetes should pull Docker images when creating pods.
111#[derive(ValueEnum, Debug, Clone, Copy)]
112pub enum KubernetesImagePullPolicy {
113    /// Always pull the Docker image from the registry.
114    Always,
115    /// Pull the Docker image only if the image is not present.
116    IfNotPresent,
117    /// Never pull the Docker image.
118    Never,
119}
120
121impl fmt::Display for KubernetesImagePullPolicy {
122    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123        match self {
124            KubernetesImagePullPolicy::Always => f.write_str("Always"),
125            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
126            KubernetesImagePullPolicy::Never => f.write_str("Never"),
127        }
128    }
129}
130
131impl KubernetesImagePullPolicy {
132    pub fn as_kebab_case_str(&self) -> &'static str {
133        match self {
134            Self::Always => "always",
135            Self::IfNotPresent => "if-not-present",
136            Self::Never => "never",
137        }
138    }
139}
140
141/// An orchestrator backed by Kubernetes.
142pub struct KubernetesOrchestrator {
143    client: Client,
144    kubernetes_namespace: String,
145    config: KubernetesOrchestratorConfig,
146    secret_api: Api<Secret>,
147    vpc_endpoint_api: Api<VpcEndpoint>,
148    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
149    resource_reader: Arc<KubernetesResourceReader>,
150}
151
152impl fmt::Debug for KubernetesOrchestrator {
153    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
154        f.debug_struct("KubernetesOrchestrator").finish()
155    }
156}
157
158impl KubernetesOrchestrator {
159    /// Creates a new Kubernetes orchestrator from the provided configuration.
160    pub async fn new(
161        config: KubernetesOrchestratorConfig,
162    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
163        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
164        let resource_reader =
165            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
166        Ok(KubernetesOrchestrator {
167            client: client.clone(),
168            kubernetes_namespace,
169            config,
170            secret_api: Api::default_namespaced(client.clone()),
171            vpc_endpoint_api: Api::default_namespaced(client),
172            namespaces: Mutex::new(BTreeMap::new()),
173            resource_reader,
174        })
175    }
176}
177
178impl Orchestrator for KubernetesOrchestrator {
179    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
180        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
181        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
182            let (command_tx, command_rx) = mpsc::unbounded_channel();
183            let worker = OrchestratorWorker {
184                metrics_api: Api::default_namespaced(self.client.clone()),
185                custom_metrics_api: Api::default_namespaced(self.client.clone()),
186                service_api: Api::default_namespaced(self.client.clone()),
187                stateful_set_api: Api::default_namespaced(self.client.clone()),
188                pod_api: Api::default_namespaced(self.client.clone()),
189                owner_references: vec![],
190                command_rx,
191                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
192                collect_pod_metrics: self.config.collect_pod_metrics,
193            }
194            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));
195
196            Arc::new(NamespacedKubernetesOrchestrator {
197                pod_api: Api::default_namespaced(self.client.clone()),
198                kubernetes_namespace: self.kubernetes_namespace.clone(),
199                namespace: namespace.into(),
200                config: self.config.clone(),
201                // TODO(guswynn): make this configurable.
202                scheduling_config: Default::default(),
203                service_infos: std::sync::Mutex::new(BTreeMap::new()),
204                command_tx,
205                _worker: worker,
206            })
207        }))
208    }
209}
210
211#[derive(Clone, Copy)]
212struct ServiceInfo {
213    scale: u16,
214    disk: bool,
215    disk_limit: Option<DiskLimit>,
216}
217
218struct NamespacedKubernetesOrchestrator {
219    pod_api: Api<Pod>,
220    kubernetes_namespace: String,
221    namespace: String,
222    config: KubernetesOrchestratorConfig,
223    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
224    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
225    command_tx: mpsc::UnboundedSender<WorkerCommand>,
226    _worker: AbortOnDropHandle<()>,
227}
228
229impl fmt::Debug for NamespacedKubernetesOrchestrator {
230    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
231        f.debug_struct("NamespacedKubernetesOrchestrator")
232            .field("kubernetes_namespace", &self.kubernetes_namespace)
233            .field("namespace", &self.namespace)
234            .field("config", &self.config)
235            .finish()
236    }
237}
238
239/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
240/// [`OrchestratorWorker`].
241///
242/// Commands for which the caller expects a result include a `result_tx` on which the
243/// [`OrchestratorWorker`] will deliver the result.
244enum WorkerCommand {
245    EnsureService {
246        desc: ServiceDescription,
247    },
248    DropService {
249        name: String,
250    },
251    ListServices {
252        namespace: String,
253        result_tx: oneshot::Sender<Vec<String>>,
254    },
255    FetchServiceMetrics {
256        name: String,
257        info: ServiceInfo,
258        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
259    },
260}
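// Callers that expect a reply follow a simple request/response pattern: create a
// `oneshot` channel, send the command with its `result_tx`, and await `result_rx`
// (see `list_services` and `fetch_service_metrics` below).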
261
262/// A description of a service to be created by an [`OrchestratorWorker`].
263#[derive(Debug, Clone)]
264struct ServiceDescription {
265    name: String,
266    scale: u16,
267    service: K8sService,
268    stateful_set: StatefulSet,
269    pod_template_hash: String,
270}
271
272/// A task executing blocking work for a [`NamespacedKubernetesOrchestrator`] in the background.
273///
274/// This type exists to enable making [`NamespacedKubernetesOrchestrator::ensure_service`] and
275/// [`NamespacedKubernetesOrchestrator::drop_service`] non-blocking, allowing invocation of these
276/// methods in latency-sensitive contexts.
277///
278/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
279/// orchestrator calls that query service state (such as `list_services`). These need to be
280/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
281/// ensure that a `list_services` result contains exactly those services that were previously
282/// created with `ensure_service` and not yet dropped with `drop_service`.
283struct OrchestratorWorker {
284    metrics_api: Api<PodMetrics>,
285    custom_metrics_api: Api<MetricValueList>,
286    service_api: Api<K8sService>,
287    stateful_set_api: Api<StatefulSet>,
288    pod_api: Api<Pod>,
289    owner_references: Vec<OwnerReference>,
290    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
291    name_prefix: String,
292    collect_pod_metrics: bool,
293}
294
295#[derive(Deserialize, Clone, Debug)]
296pub struct PodMetricsContainer {
297    pub name: String,
298    pub usage: PodMetricsContainerUsage,
299}
300
301#[derive(Deserialize, Clone, Debug)]
302pub struct PodMetricsContainerUsage {
303    pub cpu: Quantity,
304    pub memory: Quantity,
305}
306
307#[derive(Deserialize, Clone, Debug)]
308pub struct PodMetrics {
309    pub metadata: ObjectMeta,
310    pub timestamp: String,
311    pub window: String,
312    pub containers: Vec<PodMetricsContainer>,
313}
314
315impl k8s_openapi::Resource for PodMetrics {
316    const GROUP: &'static str = "metrics.k8s.io";
317    const KIND: &'static str = "PodMetrics";
318    const VERSION: &'static str = "v1beta1";
319    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
320    const URL_PATH_SEGMENT: &'static str = "pods";
321
322    type Scope = k8s_openapi::NamespaceResourceScope;
323}
324
325impl k8s_openapi::Metadata for PodMetrics {
326    type Ty = ObjectMeta;
327
328    fn metadata(&self) -> &Self::Ty {
329        &self.metadata
330    }
331
332    fn metadata_mut(&mut self) -> &mut Self::Ty {
333        &mut self.metadata
334    }
335}
336
337// Note that these types are very weird. We are `get`-ing a
338// `List` object, and lying about it having an `ObjectMeta`
339// (it deserializes as empty, but we don't need it). The custom
340// metrics API is designed this way, which is very non-standard.
341// A discussion in the `kube` channel in the `tokio` discord
342// confirmed that this layout + using `get_subresource` is the
343// best way to handle this.
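// For example (a sketch; the metric name below is hypothetical), a PVC-scoped custom
// metric could be fetched with
// `custom_metrics_api.get_subresource("kubelet_volume_stats_used_bytes", "<pvc-name>")`,
// the response deserializing into a `MetricValueList` even though it is a single `get`.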
344
345#[derive(Deserialize, Clone, Debug)]
346pub struct MetricIdentifier {
347    #[serde(rename = "metricName")]
348    pub name: String,
349    // We skip `selector` for now, as we don't use it
350}
351
352#[derive(Deserialize, Clone, Debug)]
353pub struct MetricValue {
354    #[serde(rename = "describedObject")]
355    pub described_object: ObjectReference,
356    #[serde(flatten)]
357    pub metric_identifier: MetricIdentifier,
358    pub timestamp: String,
359    pub value: Quantity,
360    // We skip `windowSeconds`, as we don't need it
361}
362
363#[derive(Deserialize, Clone, Debug)]
364pub struct MetricValueList {
365    pub metadata: ObjectMeta,
366    pub items: Vec<MetricValue>,
367}
368
369impl k8s_openapi::Resource for MetricValueList {
370    const GROUP: &'static str = "custom.metrics.k8s.io";
371    const KIND: &'static str = "MetricValueList";
372    const VERSION: &'static str = "v1beta1";
373    const API_VERSION: &'static str = "custom.metrics.k8s.io/v1beta1";
374    const URL_PATH_SEGMENT: &'static str = "persistentvolumeclaims";
375
376    type Scope = k8s_openapi::NamespaceResourceScope;
377}
378
379impl k8s_openapi::Metadata for MetricValueList {
380    type Ty = ObjectMeta;
381
382    fn metadata(&self) -> &Self::Ty {
383        &self.metadata
384    }
385
386    fn metadata_mut(&mut self) -> &mut Self::Ty {
387        &mut self.metadata
388    }
389}
390
391impl NamespacedKubernetesOrchestrator {
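    // For example (hypothetical values): `name_prefix = Some("mz-")`, namespace
    // "storage", and id "u1" produce the service name "mz-storage-u1", which is used
    // below as the name of both the headless `Service` and the `StatefulSet`.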
392    fn service_name(&self, id: &str) -> String {
393        format!(
394            "{}{}-{id}",
395            self.config.name_prefix.as_deref().unwrap_or(""),
396            self.namespace
397        )
398    }
399
400    /// Return a `watcher::Config` instance that limits results to the namespace
401    /// assigned to this orchestrator.
402    fn watch_pod_params(&self) -> watcher::Config {
403        let ns_selector = format!(
404            "environmentd.materialize.cloud/namespace={}",
405            self.namespace
406        );
407        watcher::Config::default().labels(&ns_selector)
408    }
409
410    /// Convert a higher-level label key to the actual one we
411    /// will give to Kubernetes.
412    fn make_label_key(&self, key: &str) -> String {
413        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
414    }
415
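    // For example (sketch): with namespace "storage", an `MzLabelSelector` with
    // `label_name: "scale"` and `LabelSelectionLogic::Eq { value: "1" }` maps to the
    // requirement `{ key: "storage.environmentd.materialize.cloud/scale",
    // operator: "In", values: ["1"] }`.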
416    fn label_selector_to_k8s(
417        &self,
418        MzLabelSelector { label_name, logic }: MzLabelSelector,
419    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
420        let (operator, values) = match logic {
421            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
422            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
423            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
424            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
425            LabelSelectionLogic::InSet { values } => {
426                if values.is_empty() {
427                    Err(anyhow!(
428                        "Invalid selector logic for {label_name}: empty `in` set"
429                    ))
430                } else {
431                    Ok(("In", values))
432                }
433            }
434            LabelSelectionLogic::NotInSet { values } => {
435                if values.is_empty() {
436                    Err(anyhow!(
437                        "Invalid selector logic for {label_name}: empty `notin` set"
438                    ))
439                } else {
440                    Ok(("NotIn", values))
441                }
442            }
443        }?;
444        let lsr = LabelSelectorRequirement {
445            key: self.make_label_key(&label_name),
446            operator: operator.to_string(),
447            values: Some(values),
448        };
449        Ok(lsr)
450    }
451
452    fn send_command(&self, cmd: WorkerCommand) {
453        self.command_tx.send(cmd).expect("worker task not dropped");
454    }
455}
456
457#[derive(Debug)]
458struct ScaledQuantity {
459    integral_part: u64,
460    exponent: i8,
461    base10: bool,
462}
463
464impl ScaledQuantity {
465    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
466        if base10 != self.base10 {
467            return None;
468        }
469        let exponent = self.exponent - scale;
470        let mut result = self.integral_part;
471        let base = if self.base10 { 10 } else { 2 };
472        if exponent < 0 {
473            for _ in exponent..0 {
474                result /= base;
475            }
476        } else {
477            for _ in 0..exponent {
478                result = result.checked_mul(base)?;
479            }
480        }
481        Some(result)
482    }
483}
484
485// Parse a k8s `Quantity` object
486// into a numeric value.
487//
488// This is intended to support collecting CPU and Memory data.
489// Thus, there are a few things that Kubernetes attempts to do that we don't,
490// because I've never observed metrics-server specifically sending them:
491// (1) Handle negative numbers (because it's not useful for that use-case)
492// (2) Handle non-integers (because I have never observed them actually being sent)
493// (3) Handle scientific notation (e.g. 1.23e2)
494fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
495    const DEC_SUFFIXES: &[(&str, i8)] = &[
496        ("n", -9),
497        ("u", -6),
498        ("m", -3),
499        ("", 0),
500        ("k", 3), // yep, intentionally lowercase.
501        ("M", 6),
502        ("G", 9),
503        ("T", 12),
504        ("P", 15),
505        ("E", 18),
506    ];
507    const BIN_SUFFIXES: &[(&str, i8)] = &[
508        ("", 0),
509        ("Ki", 10),
510        ("Mi", 20),
511        ("Gi", 30),
512        ("Ti", 40),
513        ("Pi", 50),
514        ("Ei", 60),
515    ];
516
517    let (positive, s) = match s.chars().next() {
518        Some('+') => (true, &s[1..]),
519        Some('-') => (false, &s[1..]),
520        _ => (true, s),
521    };
522
523    if !positive {
524        anyhow::bail!("Negative numbers not supported")
525    }
526
527    fn is_suffix_char(ch: char) -> bool {
528        "numkMGTPEKi".contains(ch)
529    }
530    let (num, suffix) = match s.find(is_suffix_char) {
531        None => (s, ""),
532        Some(idx) => s.split_at(idx),
533    };
534    let num: u64 = num.parse()?;
535    let (exponent, base10) = if let Some((_, exponent)) =
536        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
537    {
538        (exponent, true)
539    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
540        (exponent, false)
541    } else {
542        anyhow::bail!("Unrecognized suffix: {suffix}");
543    };
544    Ok(ScaledQuantity {
545        integral_part: num,
546        exponent: *exponent,
547        base10,
548    })
549}
550
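// A minimal sketch of how `parse_k8s_quantity` and `ScaledQuantity::try_to_integer`
// compose for the quantities metrics-server typically reports.
#[cfg(test)]
mod parse_k8s_quantity_sketch {
    use super::*;

    #[test]
    fn parses_typical_metrics_server_quantities() {
        // "250m" is 250 * 10^-3 cores, i.e. 250 at a decimal scale of -3.
        let cpu = parse_k8s_quantity("250m").unwrap();
        assert_eq!(cpu.try_to_integer(-3, true), Some(250));

        // "512Mi" is 512 * 2^20 bytes; asking for a base-10 value returns `None`.
        let mem = parse_k8s_quantity("512Mi").unwrap();
        assert_eq!(mem.try_to_integer(0, false), Some(512 << 20));
        assert_eq!(mem.try_to_integer(0, true), None);
    }
}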
551#[async_trait]
552impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
553    async fn fetch_service_metrics(
554        &self,
555        id: &str,
556    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
557        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
558            *info
559        } else {
560            // This should have been set in `ensure_service`.
561            tracing::error!("Failed to get info for {id}");
562            anyhow::bail!("Failed to get info for {id}");
563        };
564
565        let (result_tx, result_rx) = oneshot::channel();
566        self.send_command(WorkerCommand::FetchServiceMetrics {
567            name: self.service_name(id),
568            info,
569            result_tx,
570        });
571
572        let metrics = result_rx.await.expect("worker task not dropped");
573        Ok(metrics)
574    }
575
576    fn ensure_service(
577        &self,
578        id: &str,
579        ServiceConfig {
580            image,
581            init_container_image,
582            args,
583            ports: ports_in,
584            memory_limit,
585            cpu_limit,
586            scale,
587            labels: labels_in,
588            availability_zones,
589            other_replicas_selector,
590            replicas_selector,
591            disk: disk_in,
592            disk_limit,
593            node_selector,
594        }: ServiceConfig,
595    ) -> Result<Box<dyn Service>, anyhow::Error> {
596        // This is extremely cheap to clone, so just look into the lock once.
597        let scheduling_config: ServiceSchedulingConfig =
598            self.scheduling_config.read().expect("poisoned").clone();
599
600        // Determining whether to enable disk is subtle because we need to
601        // support historical sizes in the managed service and custom sizes in
602        // self hosted deployments.
603        let disk = {
604            // Whether the user specified `DISK = TRUE` when creating the
605            // replica OR whether the feature flag to force disk is enabled.
606            let user_requested_disk = disk_in || scheduling_config.always_use_disk;
607            // Whether the cluster replica size map provided by the
608            // administrator explicitly indicates that the size does not support
609            // disk.
610            let size_disables_disk = disk_limit == Some(DiskLimit::ZERO);
611            // Enable disk if the user requested it and the size does not
612            // disable it.
613            //
614            // Arguably we should not allow the user to request disk with sizes
615            // that have a zero disk limit, but configuring disk on a replica by
616            // replica basis is a legacy option that we hope to remove someday.
617            user_requested_disk && !size_disables_disk
618        };
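        // For example, a replica created with `DISK = TRUE` on a size whose entry in the
        // size map sets a zero disk limit still ends up with `disk = false` here, while
        // `always_use_disk` turns disk on for any size that does not explicitly disable it.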
619
620        let name = self.service_name(id);
621        // The match labels should be the minimal set of labels that uniquely
622        // identify the pods in the stateful set. Changing these after the
623        // `StatefulSet` is created is not permitted by Kubernetes, and we're
624        // not yet smart enough to handle deleting and recreating the
625        // `StatefulSet`.
626        let match_labels = btreemap! {
627            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
628            "environmentd.materialize.cloud/service-id".into() => id.into(),
629        };
630        let mut labels = match_labels.clone();
631        for (key, value) in labels_in {
632            labels.insert(self.make_label_key(&key), value);
633        }
634
635        labels.insert(self.make_label_key("scale"), scale.to_string());
636
637        for port in &ports_in {
638            labels.insert(
639                format!("environmentd.materialize.cloud/port-{}", port.name),
640                "true".into(),
641            );
642        }
643        for (key, value) in &self.config.service_labels {
644            labels.insert(key.clone(), value.clone());
645        }
646        let mut limits = BTreeMap::new();
647        if let Some(memory_limit) = memory_limit {
648            limits.insert(
649                "memory".into(),
650                Quantity(memory_limit.0.as_u64().to_string()),
651            );
652        }
653        if let Some(cpu_limit) = cpu_limit {
654            limits.insert(
655                "cpu".into(),
656                Quantity(format!("{}m", cpu_limit.as_millicpus())),
657            );
658        }
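        // For example (hypothetical size), a 4 GiB / 2 vCPU replica yields
        // `limits = {"memory": "4294967296", "cpu": "2000m"}`; these values are applied
        // below as both the resource limits and the requests of each container.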
659        let service = K8sService {
660            metadata: ObjectMeta {
661                name: Some(name.clone()),
662                ..Default::default()
663            },
664            spec: Some(ServiceSpec {
665                ports: Some(
666                    ports_in
667                        .iter()
668                        .map(|port| ServicePort {
669                            port: port.port_hint.into(),
670                            name: Some(port.name.clone()),
671                            ..Default::default()
672                        })
673                        .collect(),
674                ),
675                cluster_ip: Some("None".to_string()),
676                selector: Some(match_labels.clone()),
677                ..Default::default()
678            }),
679            status: None,
680        };
681
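        // Each process gets a stable DNS name through the headless service created above,
        // e.g. (hypothetical) "mz-storage-u1-0.mz-storage-u1.materialize-environment.svc.cluster.local"
        // for process 0 in the Kubernetes namespace "materialize-environment".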
682        let hosts = (0..scale)
683            .map(|i| {
684                format!(
685                    "{name}-{i}.{name}.{}.svc.cluster.local",
686                    self.kubernetes_namespace
687                )
688            })
689            .collect::<Vec<_>>();
690        let ports = ports_in
691            .iter()
692            .map(|p| (p.name.clone(), p.port_hint))
693            .collect::<BTreeMap<_, _>>();
694        let listen_addrs = ports_in
695            .iter()
696            .map(|p| (p.name.clone(), format!("0.0.0.0:{}", p.port_hint)))
697            .collect::<BTreeMap<_, _>>();
698        let mut args = args(&listen_addrs);
699
700        // This constrains the orchestrator (for those orchestrators that support
701        // anti-affinity, today just k8s) to never schedule pods for different replicas
702        // of the same cluster on the same node. Pods from the _same_ replica are fine;
703        // pods from different clusters are also fine.
704        //
705        // The point is that if pods of two replicas are on the same node, that node
706        // going down would kill both replicas, and so the replication factor of the
707        // cluster in question is illusory.
708        let anti_affinity = Some({
709            let label_selector_requirements = other_replicas_selector
710                .clone()
711                .into_iter()
712                .map(|ls| self.label_selector_to_k8s(ls))
713                .collect::<Result<Vec<_>, _>>()?;
714            let ls = LabelSelector {
715                match_expressions: Some(label_selector_requirements),
716                ..Default::default()
717            };
718            let pat = PodAffinityTerm {
719                label_selector: Some(ls),
720                topology_key: "kubernetes.io/hostname".to_string(),
721                ..Default::default()
722            };
723
724            if !scheduling_config.soften_replication_anti_affinity {
725                PodAntiAffinity {
726                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
727                    ..Default::default()
728                }
729            } else {
730                PodAntiAffinity {
731                    preferred_during_scheduling_ignored_during_execution: Some(vec![
732                        WeightedPodAffinityTerm {
733                            weight: scheduling_config.soften_replication_anti_affinity_weight,
734                            pod_affinity_term: pat,
735                        },
736                    ]),
737                    ..Default::default()
738                }
739            }
740        });
741
742        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
743            // `match_labels` sufficiently selects pods in the same replica.
744            let ls = LabelSelector {
745                match_labels: Some(match_labels.clone()),
746                ..Default::default()
747            };
748            let pat = PodAffinityTerm {
749                label_selector: Some(ls),
750                topology_key: "topology.kubernetes.io/zone".to_string(),
751                ..Default::default()
752            };
753
754            Some(PodAffinity {
755                preferred_during_scheduling_ignored_during_execution: Some(vec![
756                    WeightedPodAffinityTerm {
757                        weight,
758                        pod_affinity_term: pat,
759                    },
760                ]),
761                ..Default::default()
762            })
763        } else {
764            None
765        };
766
767        let topology_spread = if scheduling_config.topology_spread.enabled {
768            let config = &scheduling_config.topology_spread;
769
770            if !config.ignore_non_singular_scale || scale <= 1 {
771                let label_selector_requirements = (if config.ignore_non_singular_scale {
772                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();
773
774                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
775                        label_name: "scale".into(),
776                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
777                            value: "1".to_string(),
778                        },
779                    });
780
781                    replicas_selector_ignoring_scale
782                } else {
783                    replicas_selector
784                })
785                .into_iter()
786                .map(|ls| self.label_selector_to_k8s(ls))
787                .collect::<Result<Vec<_>, _>>()?;
788                let ls = LabelSelector {
789                    match_expressions: Some(label_selector_requirements),
790                    ..Default::default()
791                };
792
793                let constraint = TopologySpreadConstraint {
794                    label_selector: Some(ls),
795                    min_domains: None,
796                    max_skew: config.max_skew,
797                    topology_key: "topology.kubernetes.io/zone".to_string(),
798                    when_unsatisfiable: if config.soft {
799                        "ScheduleAnyway".to_string()
800                    } else {
801                        "DoNotSchedule".to_string()
802                    },
803                    // TODO(guswynn): restore these once they are supported.
804                    // Consider node affinities when calculating topology spread. This is the
805                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_affinity_policy>,
806                    // made explicit.
807                    // node_affinity_policy: Some("Honor".to_string()),
808                    // Do not consider node taints when calculating topology spread. This is the
809                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_taints_policy>,
810                    // made explicit.
811                    // node_taints_policy: Some("Ignore".to_string()),
812                    match_label_keys: None,
813                    // Once the above are restored, we shouldn't have `..Default::default()` here, because the specifics of these fields are
814                    // subtle enough that we want compilation failures when we upgrade.
815                    ..Default::default()
816                };
817                Some(vec![constraint])
818            } else {
819                None
820            }
821        } else {
822            None
823        };
824
825        let mut pod_annotations = btreemap! {
826            // Prevent the cluster-autoscaler (or karpenter) from evicting these pods in attempts to scale down
827            // and terminate nodes.
828            // This will cost us more money, but should give us better uptime.
829            // This does not prevent all evictions by Kubernetes, only the ones initiated by the
830            // cluster-autoscaler (or karpenter). Notably, eviction of pods for resource overuse is still enabled.
831            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
832            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
833
834            // It's called do-not-disrupt in newer versions of karpenter, so we add it for forward/backward compatibility.
835            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
836        };
837        if self.config.enable_prometheus_scrape_annotations {
838            if let Some(internal_http_port) = ports_in
839                .iter()
840                .find(|port| port.name == "internal-http")
841                .map(|port| port.port_hint.to_string())
842            {
843                // Enable prometheus scrape discovery
844                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
845                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
846                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
847                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
848            }
849        }
850
851        let default_node_selector = if disk {
852            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
853        } else {
854            // If the cluster doesn't require disk, we can omit the selector,
855            // allowing it to be scheduled onto nodes both with and without the
856            // `materialize.cloud/disk` label.
857            vec![]
858        };
859
860        let node_selector: BTreeMap<String, String> = default_node_selector
861            .into_iter()
862            .chain(self.config.service_node_selector.clone())
863            .chain(node_selector)
864            .collect();
865
866        let node_affinity = if let Some(availability_zones) = availability_zones {
867            let selector = NodeSelectorTerm {
868                match_expressions: Some(vec![NodeSelectorRequirement {
869                    key: "materialize.cloud/availability-zone".to_string(),
870                    operator: "In".to_string(),
871                    values: Some(availability_zones),
872                }]),
873                match_fields: None,
874            };
875
876            if scheduling_config.soften_az_affinity {
877                Some(NodeAffinity {
878                    preferred_during_scheduling_ignored_during_execution: Some(vec![
879                        PreferredSchedulingTerm {
880                            preference: selector,
881                            weight: scheduling_config.soften_az_affinity_weight,
882                        },
883                    ]),
884                    required_during_scheduling_ignored_during_execution: None,
885                })
886            } else {
887                Some(NodeAffinity {
888                    preferred_during_scheduling_ignored_during_execution: None,
889                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
890                        node_selector_terms: vec![selector],
891                    }),
892                })
893            }
894        } else {
895            None
896        };
897
898        let container_name = image
899            .rsplit_once('/')
900            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
901            .context("`image` is not ORG/NAME:VERSION")?
902            .0
903            .to_string();
904
905        let container_security_context = if scheduling_config.security_context_enabled {
906            Some(SecurityContext {
907                privileged: Some(false),
908                run_as_non_root: Some(true),
909                allow_privilege_escalation: Some(false),
910                seccomp_profile: Some(SeccompProfile {
911                    type_: "RuntimeDefault".to_string(),
912                    ..Default::default()
913                }),
914                capabilities: Some(Capabilities {
915                    drop: Some(vec!["ALL".to_string()]),
916                    ..Default::default()
917                }),
918                ..Default::default()
919            })
920        } else {
921            None
922        };
923
924        let init_containers = init_container_image.map(|image| {
925            vec![Container {
926                name: "init".to_string(),
927                image: Some(image),
928                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
929                resources: Some(ResourceRequirements {
930                    claims: None,
931                    // Set both limits and requests to the same values, to ensure a
932                    // `Guaranteed` QoS class for the pod.
933                    limits: Some(limits.clone()),
934                    requests: Some(limits.clone()),
935                }),
936                security_context: container_security_context.clone(),
937                env: Some(vec![
938                    EnvVar {
939                        name: "MZ_NAMESPACE".to_string(),
940                        value_from: Some(EnvVarSource {
941                            field_ref: Some(ObjectFieldSelector {
942                                field_path: "metadata.namespace".to_string(),
943                                ..Default::default()
944                            }),
945                            ..Default::default()
946                        }),
947                        ..Default::default()
948                    },
949                    EnvVar {
950                        name: "MZ_POD_NAME".to_string(),
951                        value_from: Some(EnvVarSource {
952                            field_ref: Some(ObjectFieldSelector {
953                                field_path: "metadata.name".to_string(),
954                                ..Default::default()
955                            }),
956                            ..Default::default()
957                        }),
958                        ..Default::default()
959                    },
960                    EnvVar {
961                        name: "MZ_NODE_NAME".to_string(),
962                        value_from: Some(EnvVarSource {
963                            field_ref: Some(ObjectFieldSelector {
964                                field_path: "spec.nodeName".to_string(),
965                                ..Default::default()
966                            }),
967                            ..Default::default()
968                        }),
969                        ..Default::default()
970                    },
971                ]),
972                ..Default::default()
973            }]
974        });
975
976        let env = if self.config.coverage {
977            Some(vec![EnvVar {
978                name: "LLVM_PROFILE_FILE".to_string(),
979                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
980                ..Default::default()
981            }])
982        } else {
983            None
984        };
985
986        let mut volume_mounts = vec![];
987
988        if self.config.coverage {
989            volume_mounts.push(VolumeMount {
990                name: "coverage".to_string(),
991                mount_path: "/coverage".to_string(),
992                ..Default::default()
993            })
994        }
995
996        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
997            (true, Some(ephemeral_volume_storage_class)) => {
998                volume_mounts.push(VolumeMount {
999                    name: "scratch".to_string(),
1000                    mount_path: "/scratch".to_string(),
1001                    ..Default::default()
1002                });
1003                args.push("--scratch-directory=/scratch".into());
1004
1005                Some(vec![Volume {
1006                    name: "scratch".to_string(),
1007                    ephemeral: Some(EphemeralVolumeSource {
1008                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
1009                            spec: PersistentVolumeClaimSpec {
1010                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1011                                storage_class_name: Some(
1012                                    ephemeral_volume_storage_class.to_string(),
1013                                ),
1014                                resources: Some(VolumeResourceRequirements {
1015                                    requests: Some(BTreeMap::from([(
1016                                        "storage".to_string(),
1017                                        Quantity(
1018                                            disk_limit
1019                                                .unwrap_or(DiskLimit::ARBITRARY)
1020                                                .0
1021                                                .as_u64()
1022                                                .to_string(),
1023                                        ),
1024                                    )])),
1025                                    ..Default::default()
1026                                }),
1027                                ..Default::default()
1028                            },
1029                            ..Default::default()
1030                        }),
1031                        ..Default::default()
1032                    }),
1033                    ..Default::default()
1034                }])
1035            }
1036            (true, None) => {
1037                return Err(anyhow!(
1038                    "service requested disk but no ephemeral volume storage class was configured"
1039                ));
1040            }
1041            (false, _) => None,
1042        };
1043
1044        if let Some(name_prefix) = &self.config.name_prefix {
1045            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
1046        }
1047
1048        let volume_claim_templates = if self.config.coverage {
1049            Some(vec![PersistentVolumeClaim {
1050                metadata: ObjectMeta {
1051                    name: Some("coverage".to_string()),
1052                    ..Default::default()
1053                },
1054                spec: Some(PersistentVolumeClaimSpec {
1055                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1056                    resources: Some(VolumeResourceRequirements {
1057                        requests: Some(BTreeMap::from([(
1058                            "storage".to_string(),
1059                            Quantity("10Gi".to_string()),
1060                        )])),
1061                        ..Default::default()
1062                    }),
1063                    ..Default::default()
1064                }),
1065                ..Default::default()
1066            }])
1067        } else {
1068            None
1069        };
1070
1071        let security_context = if let Some(fs_group) = self.config.service_fs_group {
1072            Some(PodSecurityContext {
1073                fs_group: Some(fs_group),
1074                run_as_user: Some(fs_group),
1075                run_as_group: Some(fs_group),
1076                ..Default::default()
1077            })
1078        } else {
1079            None
1080        };
1081
1082        let tolerations = Some(vec![
1083            // When the node becomes `NotReady` it indicates there is a problem
1084            // with the node. By default Kubernetes waits 300s (5 minutes)
1085            // before descheduling the pod, but we tune this to 30s for faster
1086            // recovery in the case of node failure.
1087            Toleration {
1088                effect: Some("NoExecute".into()),
1089                key: Some("node.kubernetes.io/not-ready".into()),
1090                operator: Some("Exists".into()),
1091                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1092                value: None,
1093            },
1094            Toleration {
1095                effect: Some("NoExecute".into()),
1096                key: Some("node.kubernetes.io/unreachable".into()),
1097                operator: Some("Exists".into()),
1098                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1099                value: None,
1100            },
1101        ]);
1102
1103        let mut pod_template_spec = PodTemplateSpec {
1104            metadata: Some(ObjectMeta {
1105                labels: Some(labels.clone()),
1106                annotations: Some(pod_annotations), // Do not delete, we insert into it below.
1107                ..Default::default()
1108            }),
1109            spec: Some(PodSpec {
1110                init_containers,
1111                containers: vec![Container {
1112                    name: container_name,
1113                    image: Some(image),
1114                    args: Some(args),
1115                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
1116                    ports: Some(
1117                        ports_in
1118                            .iter()
1119                            .map(|port| ContainerPort {
1120                                container_port: port.port_hint.into(),
1121                                name: Some(port.name.clone()),
1122                                ..Default::default()
1123                            })
1124                            .collect(),
1125                    ),
1126                    security_context: container_security_context.clone(),
1127                    resources: Some(ResourceRequirements {
1128                        claims: None,
1129                        // Set both limits and requests to the same values, to ensure a
1130                        // `Guaranteed` QoS class for the pod.
1131                        limits: Some(limits.clone()),
1132                        requests: Some(limits),
1133                    }),
1134                    volume_mounts: if !volume_mounts.is_empty() {
1135                        Some(volume_mounts)
1136                    } else {
1137                        None
1138                    },
1139                    env,
1140                    ..Default::default()
1141                }],
1142                volumes,
1143                security_context,
1144                node_selector: Some(node_selector),
1145                scheduler_name: self.config.scheduler_name.clone(),
1146                service_account: self.config.service_account.clone(),
1147                affinity: Some(Affinity {
1148                    pod_anti_affinity: anti_affinity,
1149                    pod_affinity,
1150                    node_affinity,
1151                    ..Default::default()
1152                }),
1153                topology_spread_constraints: topology_spread,
1154                tolerations,
1155                // Setting a 0s termination grace period has the side effect of
1156                // automatically starting a new pod when the previous pod is
1157                // currently terminating. This enables recovery from a node
1158                // failure with no manual intervention. Without this setting,
1159                // the StatefulSet controller will refuse to start a new pod
1160                // until the failed node is manually removed from the Kubernetes
1161                // cluster.
1162                //
1163                // The Kubernetes documentation strongly advises against this
1164                // setting, as StatefulSets attempt to provide "at most once"
1165                // semantics [0]--that is, the guarantee that for a given pod in
1166                // a StatefulSet there is *at most* one pod with that identity
1167                // running in the cluster.
1168                //
1169                // Materialize services, however, are carefully designed to
1170                // *not* rely on this guarantee. In fact, we do not believe that
1171                // correct distributed systems can meaningfully rely on
1172                // Kubernetes's guarantee--network packets from a pod can be
1173                // arbitrarily delayed, long past that pod's termination.
1174                //
1175                // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1176                termination_grace_period_seconds: Some(0),
1177                ..Default::default()
1178            }),
1179        };
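        // Hash the full pod template and record the digest under
        // `POD_TEMPLATE_HASH_ANNOTATION`, so that any change to the template above is
        // reflected in the annotation; the hash is also handed to the worker as part of
        // the `ServiceDescription`.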
1180        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
1181        let mut hasher = Sha256::new();
1182        hasher.update(pod_template_json);
1183        let pod_template_hash = format!("{:x}", hasher.finalize());
1184        pod_template_spec
1185            .metadata
1186            .as_mut()
1187            .unwrap()
1188            .annotations
1189            .as_mut()
1190            .unwrap()
1191            .insert(
1192                POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
1193                pod_template_hash.clone(),
1194            );
1195
1196        let stateful_set = StatefulSet {
1197            metadata: ObjectMeta {
1198                name: Some(name.clone()),
1199                ..Default::default()
1200            },
1201            spec: Some(StatefulSetSpec {
1202                selector: LabelSelector {
1203                    match_labels: Some(match_labels),
1204                    ..Default::default()
1205                },
1206                service_name: name.clone(),
1207                replicas: Some(scale.into()),
1208                template: pod_template_spec,
1209                pod_management_policy: Some("Parallel".to_string()),
1210                volume_claim_templates,
1211                ..Default::default()
1212            }),
1213            status: None,
1214        };
1215
1216        self.send_command(WorkerCommand::EnsureService {
1217            desc: ServiceDescription {
1218                name,
1219                scale,
1220                service,
1221                stateful_set,
1222                pod_template_hash,
1223            },
1224        });
1225
1226        self.service_infos.lock().expect("poisoned lock").insert(
1227            id.to_string(),
1228            ServiceInfo {
1229                scale,
1230                disk,
1231                disk_limit,
1232            },
1233        );
1234
1235        Ok(Box::new(KubernetesService { hosts, ports }))
1236    }
1237
1238    /// Drops the identified service, if it exists.
1239    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1240        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1241        self.service_infos.lock().expect("poisoned lock").remove(id);
1242
1243        self.send_command(WorkerCommand::DropService {
1244            name: self.service_name(id),
1245        });
1246
1247        Ok(())
1248    }
1249
1250    /// Lists the identifiers of all known services.
1251    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1252        let (result_tx, result_rx) = oneshot::channel();
1253        self.send_command(WorkerCommand::ListServices {
1254            namespace: self.namespace.clone(),
1255            result_tx,
1256        });
1257
1258        let list = result_rx.await.expect("worker task not dropped");
1259        Ok(list)
1260    }
1261
1262    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
1263        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
1264            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
1265            let service_id_label = "environmentd.materialize.cloud/service-id";
1266            let service_id = pod
1267                .labels()
1268                .get(service_id_label)
1269                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
1270                .clone();
1271
1272            let oomed = pod
1273                .status
1274                .as_ref()
1275                .and_then(|status| status.container_statuses.as_ref())
1276                .map(|container_statuses| {
1277                    container_statuses.iter().any(|cs| {
1278                        // The container might have already transitioned from "terminated" to
1279                        // "waiting"/"running" state, in which case we need to check its previous
1280                        // state to find out why it terminated.
1281                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
1282                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
1283                        let termination_state = current_state.or(last_state);
1284
1285                        // The interesting exit codes are 135 (SIGBUS) and 137 (SIGKILL). SIGKILL
1286                        // occurs when the OOM killer terminates the container, SIGBUS occurs when
1287                        // the container runs out of disk. We treat the latter as an OOM condition
1288                        // too since we only use disk for spilling memory.
1289                        let exit_code = termination_state.map(|s| s.exit_code);
1290                        exit_code == Some(135) || exit_code == Some(137)
1291                    })
1292                })
1293                .unwrap_or(false);
1294
1295            let (pod_ready, last_probe_time) = pod
1296                .status
1297                .and_then(|status| status.conditions)
1298                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
1299                .map(|c| (c.status == "True", c.last_probe_time))
1300                .unwrap_or((false, None));
1301
1302            let status = if pod_ready {
1303                ServiceStatus::Online
1304            } else {
1305                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
1306            };
1307            let time = if let Some(time) = last_probe_time {
1308                time.0
1309            } else {
1310                Utc::now()
1311            };
1312
1313            Ok(ServiceEvent {
1314                service_id,
1315                process_id,
1316                status,
1317                time,
1318            })
1319        }
1320
1321        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
1322            .touched_objects()
1323            .filter_map(|object| async move {
1324                match object {
1325                    Ok(pod) => Some(into_service_event(pod)),
1326                    Err(error) => {
1327                        // We assume that errors returned by Kubernetes are usually transient, so we
1328                        // just log a warning and ignore them otherwise.
1329                        tracing::warn!("service watch error: {error}");
1330                        None
1331                    }
1332                }
1333            });
1334        Box::pin(stream)
1335    }
1336
1337    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1338        *self.scheduling_config.write().expect("poisoned") = config;
1339    }
1340}
1341
1342impl OrchestratorWorker {
1343    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1344        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1345    }
1346
1347    async fn run(mut self) {
1348        {
1349            info!("initializing Kubernetes orchestrator worker");
1350            let start = Instant::now();
1351
1352            // Fetch the owner reference for our own pod (usually a
1353            // StatefulSet), so that we can propagate it to the services we
1354            // create.
1355            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
1356            let orchestrator_pod = Retry::default()
1357                .clamp_backoff(Duration::from_secs(10))
1358                .retry_async(|_| self.pod_api.get(&hostname))
1359                .await
1360                .expect("always retries on error");
1361            self.owner_references
1362                .extend(orchestrator_pod.owner_references().into_iter().cloned());
1363
1364            info!(
1365                "Kubernetes orchestrator worker initialized in {:?}",
1366                start.elapsed()
1367            );
1368        }
1369
1370        while let Some(cmd) = self.command_rx.recv().await {
1371            self.handle_command(cmd).await;
1372        }
1373    }
1374
1375    /// Handle a worker command.
1376    ///
1377    /// If handling the command fails, it is automatically retried. All command handlers return
1378    /// [`K8sError`], so we can reasonably assume that failures are caused by issues communicating
1379    /// with the Kubernetes API server and that retrying will eventually resolve them.
1380    async fn handle_command(&self, cmd: WorkerCommand) {
1381        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1382        where
1383            F: Fn() -> U,
1384            U: Future<Output = Result<R, K8sError>>,
1385        {
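            // `retry_async` keeps retrying (with backoff capped at 10s) until the
            // future succeeds, so the `expect` below can never fire; errors are
            // reported via the log instead of being returned.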
1386            Retry::default()
1387                .clamp_backoff(Duration::from_secs(10))
1388                .retry_async(|_| {
1389                    f().map_err(
1390                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1391                    )
1392                })
1393                .await
1394                .expect("always retries on error")
1395        }
1396
1397        use WorkerCommand::*;
1398        match cmd {
1399            EnsureService { desc } => {
1400                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1401            }
1402            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1403            ListServices {
1404                namespace,
1405                result_tx,
1406            } => {
1407                let result = retry(|| self.list_services(&namespace), "ListServices").await;
1408                let _ = result_tx.send(result);
1409            }
1410            FetchServiceMetrics {
1411                name,
1412                info,
1413                result_tx,
1414            } => {
1415                let result = self.fetch_service_metrics(&name, &info).await;
1416                let _ = result_tx.send(result);
1417            }
1418        }
1419    }
1420
1421    async fn fetch_service_metrics(
1422        &self,
1423        name: &str,
1424        info: &ServiceInfo,
1425    ) -> Vec<ServiceProcessMetrics> {
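        // If pod metrics collection is disabled, report empty metrics for each
        // process rather than querying the metrics APIs.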
1426        if !self.collect_pod_metrics {
1427            return (0..info.scale)
1428                .map(|_| ServiceProcessMetrics::default())
1429                .collect();
1430        }
1431
1432        /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
1433        ///
1434        /// Note that we want to keep going even if a lookup fails for whatever reason,
1435        /// so this function is infallible. If we fail to get CPU or memory for a particular pod,
1436        /// we just log a warning and install `None` in the returned struct.
1437        async fn get_metrics(
1438            self_: &OrchestratorWorker,
1439            service_name: &str,
1440            i: usize,
1441            disk: bool,
1442            disk_limit: Option<DiskLimit>,
1443        ) -> ServiceProcessMetrics {
1444            let name = format!("{service_name}-{i}");
1445
1446            let disk_usage_fut = async {
1447                if disk {
1448                    Some(
1449                        self_
1450                            .custom_metrics_api
1451                            .get_subresource(
1452                                "kubelet_volume_stats_used_bytes",
1453                                // The CSI provider we use names each PersistentVolumeClaim
1454                                // `{pod name}-scratch`. It also provides the metrics, and
1455                                // does so under the `persistentvolumeclaims` resource type,
1456                                // instead of `pods`.
1457                                &format!("{name}-scratch"),
1458                            )
1459                            .await,
1460                    )
1461                } else {
1462                    None
1463                }
1464            };
1465            let disk_capacity_fut = async {
1466                if disk {
1467                    Some(
1468                        self_
1469                            .custom_metrics_api
1470                            .get_subresource(
1471                                "kubelet_volume_stats_capacity_bytes",
1472                                &format!("{name}-scratch"),
1473                            )
1474                            .await,
1475                    )
1476                } else {
1477                    None
1478                }
1479            };
1480            let (metrics, disk_usage, disk_capacity) = match futures::future::join3(
1481                self_.metrics_api.get(&name),
1482                disk_usage_fut,
1483                disk_capacity_fut,
1484            )
1485            .await
1486            {
1487                (Ok(metrics), disk_usage, disk_capacity) => {
1488                    // TODO(guswynn): don't tolerate errors here once we are more
1489                    // comfortable with `prometheus-adapter` in production
1490                    // (and we run it in CI).
1491
1492                    let disk_usage = match disk_usage {
1493                        Some(Ok(disk_usage)) => Some(disk_usage),
1494                        Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
1495                            warn!(
1496                                "Failed to fetch `kubelet_volume_stats_used_bytes` for {name}: {e}"
1497                            );
1498                            None
1499                        }
1500                        _ => None,
1501                    };
1502
1503                    let disk_capacity = match disk_capacity {
1504                        Some(Ok(disk_capacity)) => Some(disk_capacity),
1505                        Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
1506                            warn!(
1507                                "Failed to fetch `kubelet_volume_stats_capacity_bytes` for {name}: {e}"
1508                            );
1509                            None
1510                        }
1511                        _ => None,
1512                    };
1513
1514                    (metrics, disk_usage, disk_capacity)
1515                }
1516                (Err(e), _, _) => {
1517                    warn!("Failed to get metrics for {name}: {e}");
1518                    return ServiceProcessMetrics::default();
1519                }
1520            };
1521            let Some(PodMetricsContainer {
1522                usage:
1523                    PodMetricsContainerUsage {
1524                        cpu: Quantity(cpu_str),
1525                        memory: Quantity(mem_str),
1526                    },
1527                ..
1528            }) = metrics.containers.get(0)
1529            else {
1530                warn!("metrics result contained no containers for {name}");
1531                return ServiceProcessMetrics::default();
1532            };
1533
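            // Convert the reported CPU quantity to nanocores and the memory quantity
            // to bytes; anything unparseable or out of range is logged and reported
            // as `None`.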
1534            let cpu = match parse_k8s_quantity(cpu_str) {
1535                Ok(q) => match q.try_to_integer(-9, true) {
1536                    Some(i) => Some(i),
1537                    None => {
1538                        tracing::error!("CPU value {q:?} out of range");
1539                        None
1540                    }
1541                },
1542                Err(e) => {
1543                    tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
1544                    None
1545                }
1546            };
1547            let memory = match parse_k8s_quantity(mem_str) {
1548                Ok(q) => match q.try_to_integer(0, false) {
1549                    Some(i) => Some(i),
1550                    None => {
1551                        tracing::error!("Memory value {q:?} out of range");
1552                        None
1553                    }
1554                },
1555                Err(e) => {
1556                    tracing::error!("Failed to parse memory value {mem_str}: {e}");
1557                    None
1558                }
1559            };
1560
1561            let disk_usage = match (disk_usage, disk_capacity) {
1562                (Some(disk_usage_metrics), Some(disk_capacity_metrics)) => {
1563                    if !disk_usage_metrics.items.is_empty()
1564                        && !disk_capacity_metrics.items.is_empty()
1565                    {
1566                        let disk_usage_str = &*disk_usage_metrics.items[0].value.0;
1567                        let disk_usage = match parse_k8s_quantity(disk_usage_str) {
1568                            Ok(q) => match q.try_to_integer(0, true) {
1569                                Some(i) => Some(i),
1570                                None => {
1571                                    tracing::error!("Disk usage value {q:?} out of range");
1572                                    None
1573                                }
1574                            },
1575                            Err(e) => {
1576                                tracing::error!(
1577                                    "Failed to parse disk usage value {disk_usage_str}: {e}"
1578                                );
1579                                None
1580                            }
1581                        };
1582                        let disk_capacity_str = &*disk_capacity_metrics.items[0].value.0;
1583                        let disk_capacity = match parse_k8s_quantity(disk_capacity_str) {
1584                            Ok(q) => match q.try_to_integer(0, true) {
1585                                Some(i) => Some(i),
1586                                None => {
1587                                    tracing::error!("Disk capacity value {q:?} out of range");
1588                                    None
1589                                }
1590                            },
1591                            Err(e) => {
1592                                tracing::error!(
1593                                    "Failed to parse disk capacity value {disk_capacity_str}: {e}"
1594                                );
1595                                None
1596                            }
1597                        };
1598
1599                        // We only populate a `disk_usage` if we have all three of:
1600                        // - a disk limit (so it must be an actual managed cluster with a real limit)
1601                        // - a reported disk capacity
1602                        // - a reported disk usage
1603                        //
1604                        // There are three weird cases we have to handle here:
1605                        // - Some instance types report a larger disk capacity than the requested
1606                        // limit. We clamp those capacities to the limit, which means we can
1607                        // report 100% usage even if we have a bit of space left.
1608                        // - Other instances report a smaller disk capacity than the limit, due to
1609                        // overheads. In this case, we correct the usage by adding the overhead, so
1610                        // we report a more accurate usage number.
1611                        // - The disk limit can be more up-to-date (from `service_infos`) than the
1612                        // reported metric. In that case, we report the minimum of the usage
1613                        // and the limit, which means we can report 100% usage temporarily
1614                        // if a replica is sized down. (See the tests module for a worked example.)
1615                        match (disk_usage, disk_capacity, disk_limit) {
1616                            (
1617                                Some(disk_usage),
1618                                Some(disk_capacity),
1619                                Some(DiskLimit(disk_limit)),
1620                            ) => {
1621                                let disk_capacity = if disk_limit.0 < disk_capacity {
1622                                    // We issue a debug message instead
1623                                    // of a warning or error because all the
1624                                    // above cases are relatively common.
1625                                    tracing::debug!(
1626                                        "disk capacity {} is larger than the disk limit {}; \
1627                                        disk usage will indicate 100% full before the disk is truly full; \
1628                                        as long as the delta is small this is not a cause for concern",
1629                                        disk_capacity,
1630                                        disk_limit.0
1631                                    );
1632                                    // Clamp to the limit.
1633                                    disk_limit.0
1634                                } else {
1635                                    disk_capacity
1636                                };
1637
1638                                // If we overflow, just clamp to the disk limit
1639                                let disk_usage = (disk_limit.0 - disk_capacity)
1640                                    .checked_add(disk_usage)
1641                                    .unwrap_or(disk_limit.0);
1642
1643                                // Clamp to the limit. Note that this clamping can occur during
1644                                // replica resizes or if the disk usage is ABOVE the
1645                                // configured limit, as may occur on some instances.
1646                                Some(std::cmp::min(disk_usage, disk_limit.0))
1647                            }
1648                            _ => None,
1649                        }
1650                    } else {
1651                        None
1652                    }
1653                }
1654                _ => None,
1655            };
1656
1657            ServiceProcessMetrics {
1658                cpu_nano_cores: cpu,
1659                memory_bytes: memory,
1660                disk_usage_bytes: disk_usage,
1661            }
1662        }
1663        let ret = futures::future::join_all(
1664            (0..info.scale).map(|i| get_metrics(self, name, i.into(), info.disk, info.disk_limit)),
1665        );
1666
1667        ret.await
1668    }
1669
1670    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1671        // We inject our own pod's owner references into the Kubernetes objects
1672        // created for the service so that if the
1673        // Deployment/StatefulSet/whatever that owns the pod running the
1674        // orchestrator gets deleted, so do all services spawned by this
1675        // orchestrator.
1676        desc.service
1677            .metadata
1678            .owner_references
1679            .get_or_insert(vec![])
1680            .extend(self.owner_references.iter().cloned());
1681        desc.stateful_set
1682            .metadata
1683            .owner_references
1684            .get_or_insert(vec![])
1685            .extend(self.owner_references.iter().cloned());
1686
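        // Server-side apply both objects, forcing ownership of any fields that
        // conflict with another field manager.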
1687        self.service_api
1688            .patch(
1689                &desc.name,
1690                &PatchParams::apply(FIELD_MANAGER).force(),
1691                &Patch::Apply(desc.service),
1692            )
1693            .await?;
1694        self.stateful_set_api
1695            .patch(
1696                &desc.name,
1697                &PatchParams::apply(FIELD_MANAGER).force(),
1698                &Patch::Apply(desc.stateful_set),
1699            )
1700            .await?;
1701
1702        // Explicitly delete any pods in the stateful set that don't match the
1703        // template. In theory, Kubernetes would do this automatically, but
1704        // in practice we have observed that it does not.
1705        // See: https://github.com/kubernetes/kubernetes/issues/67250
1706        for pod_id in 0..desc.scale {
1707            let pod_name = format!("{}-{pod_id}", desc.name);
1708            let pod = match self.pod_api.get(&pod_name).await {
1709                Ok(pod) => pod,
1710                // Pod already doesn't exist.
1711                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1712                Err(e) => return Err(e),
1713            };
1714            if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION) != Some(&desc.pod_template_hash)
1715            {
1716                match self
1717                    .pod_api
1718                    .delete(&pod_name, &DeleteParams::default())
1719                    .await
1720                {
1721                    Ok(_) => (),
1722                    // Pod got deleted while we were looking at it.
1723                    Err(kube::Error::Api(e)) if e.code == 404 => (),
1724                    Err(e) => return Err(e),
1725                }
1726            }
1727        }
1728
1729        Ok(())
1730    }
1731
1732    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
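        // Delete the StatefulSet and Service behind the service, treating 404s as
        // success since the service may already have been dropped.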
1733        let res = self
1734            .stateful_set_api
1735            .delete(name, &DeleteParams::default())
1736            .await;
1737        match res {
1738            Ok(_) => (),
1739            Err(K8sError::Api(e)) if e.code == 404 => (),
1740            Err(e) => return Err(e),
1741        }
1742
1743        let res = self
1744            .service_api
1745            .delete(name, &DeleteParams::default())
1746            .await;
1747        match res {
1748            Ok(_) => Ok(()),
1749            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1750            Err(e) => Err(e),
1751        }
1752    }
1753
1754    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
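        // StatefulSets for services in this namespace are named
        // `{name_prefix}{namespace}-{id}`; recover the service IDs by stripping
        // that prefix and ignore any other StatefulSets.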
1755        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1756        let name_prefix = format!("{}{namespace}-", self.name_prefix);
1757        Ok(stateful_sets
1758            .into_iter()
1759            .filter_map(|ss| {
1760                ss.metadata
1761                    .name
1762                    .unwrap()
1763                    .strip_prefix(&name_prefix)
1764                    .map(Into::into)
1765            })
1766            .collect())
1767    }
1768}
1769
1770#[derive(Debug, Clone)]
1771struct KubernetesService {
1772    hosts: Vec<String>,
1773    ports: BTreeMap<String, u16>,
1774}
1775
1776impl Service for KubernetesService {
1777    fn addresses(&self, port: &str) -> Vec<String> {
1778        let port = self.ports[port];
1779        self.hosts
1780            .iter()
1781            .map(|host| format!("{host}:{port}"))
1782            .collect()
1783    }
1784}
1785
1786#[cfg(test)]
1787mod tests {
1788    use super::*;
1789
1790    #[mz_ore::test]
1791    fn k8s_quantity_base10_large() {
1792        let cases = &[
1793            ("42", 42),
1794            ("42k", 42000),
1795            ("42M", 42000000),
1796            ("42G", 42000000000),
1797            ("42T", 42000000000000),
1798            ("42P", 42000000000000000),
1799        ];
1800
1801        for (input, expected) in cases {
1802            let quantity = parse_k8s_quantity(input).unwrap();
1803            let number = quantity.try_to_integer(0, true).unwrap();
1804            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1805        }
1806    }
1807
1808    #[mz_ore::test]
1809    fn k8s_quantity_base10_small() {
1810        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];
1811
1812        for (input, expected) in cases {
1813            let quantity = parse_k8s_quantity(input).unwrap();
1814            let number = quantity.try_to_integer(-9, true).unwrap();
1815            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1816        }
1817    }
1818
1819    #[mz_ore::test]
1820    fn k8s_quantity_base2() {
1821        let cases = &[
1822            ("42Ki", 42 << 10),
1823            ("42Mi", 42 << 20),
1824            ("42Gi", 42 << 30),
1825            ("42Ti", 42 << 40),
1826            ("42Pi", 42 << 50),
1827        ];
1828
1829        for (input, expected) in cases {
1830            let quantity = parse_k8s_quantity(input).unwrap();
1831            let number = quantity.try_to_integer(0, false).unwrap();
1832            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1833        }
1834    }
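
    /// Illustrative only (not exercised by the orchestrator itself): shows the pod
    /// naming convention relied on when translating pod events into `ServiceEvent`s,
    /// where the process ID is the StatefulSet ordinal at the end of the pod name.
    /// The pod name below is made up.
    #[mz_ore::test]
    fn pod_ordinal_parsing_example() {
        let pod_name = "cluster-u1-replica-u1-gen-0-3";
        let process_id: i64 = pod_name.split('-').next_back().unwrap().parse().unwrap();
        assert_eq!(process_id, 3);
    }

    /// Illustrative only: a minimal sketch of the disk-usage clamping arithmetic in
    /// `fetch_service_metrics`, restated over plain `u64`s because the real logic is
    /// inlined there. The byte values below are made up for demonstration.
    #[mz_ore::test]
    fn disk_usage_clamping_example() {
        // Mirror the clamping rules: clamp the reported capacity to the limit, add
        // the capacity shortfall (overhead) to the usage, then clamp the result to
        // the limit.
        fn clamped_usage(usage: u64, capacity: u64, limit: u64) -> u64 {
            let capacity = if limit < capacity { limit } else { capacity };
            let usage = (limit - capacity).checked_add(usage).unwrap_or(limit);
            std::cmp::min(usage, limit)
        }

        // Capacity larger than the limit: capacity is clamped, usage passes through.
        assert_eq!(clamped_usage(10, 120, 100), 10);
        // Capacity smaller than the limit: the overhead (100 - 90) is added to usage.
        assert_eq!(clamped_usage(10, 90, 100), 20);
        // Usage above the limit (e.g. mid-resize): clamped to the limit.
        assert_eq!(clamped_usage(150, 100, 100), 100);
    }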
1835}