mz_orchestrator_kubernetes/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use chrono::Utc;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::DeepMerge;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
};
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

/// Configures a [`KubernetesOrchestrator`].
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use, if the Kubernetes configuration
    /// is loaded from the local kubeconfig.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use, if any.
    pub scheduler_name: Option<String>,
    /// Labels to install on every service created by the orchestrator.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector to install on every service created by the orchestrator.
    pub service_node_selector: BTreeMap<String, String>,
    /// Affinity to install on every service created by the orchestrator.
    pub service_affinity: Option<String>,
    /// Tolerations to install on every service created by the orchestrator.
    pub service_tolerations: Option<String>,
    /// The service account that each service should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for services created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix to use when performing AWS operations on
    /// behalf of the environment.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to use code coverage mode or not. Always false for production.
    pub coverage: bool,
    /// The Kubernetes StorageClass to use for the ephemeral volume attached to
    /// services that request disk.
    ///
    /// If unspecified, the orchestrator will refuse to create services that
    /// request disk.
    pub ephemeral_volume_storage_class: Option<String>,
    /// The optional fs group for the service's pods' `securityContext`.
    pub service_fs_group: Option<i64>,
    /// The prefix to prepend to all object names.
    pub name_prefix: Option<String>,
    /// Whether we should attempt to collect metrics from Kubernetes.
    pub collect_pod_metrics: bool,
    /// Whether to annotate pods for Prometheus service discovery.
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

/// Specifies whether Kubernetes should pull Docker images when creating pods.
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the Docker image from the registry.
    Always,
    /// Pull the Docker image only if the image is not present.
    IfNotPresent,
    /// Never pull the Docker image.
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

/// An orchestrator backed by Kubernetes.
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
    /// Creates a new Kubernetes orchestrator from the provided configuration.
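    ///
    /// # Example
    ///
    /// A minimal, illustrative configuration sketch; the field values below
    /// are placeholders chosen for this example, not recommendations:
    ///
    /// ```ignore
    /// let config = KubernetesOrchestratorConfig {
    ///     context: "minikube".into(),
    ///     scheduler_name: None,
    ///     service_labels: BTreeMap::new(),
    ///     service_node_selector: BTreeMap::new(),
    ///     service_affinity: None,
    ///     service_tolerations: None,
    ///     service_account: None,
    ///     image_pull_policy: KubernetesImagePullPolicy::IfNotPresent,
    ///     aws_external_id_prefix: None,
    ///     coverage: false,
    ///     ephemeral_volume_storage_class: None,
    ///     service_fs_group: None,
    ///     name_prefix: None,
    ///     collect_pod_metrics: true,
    ///     enable_prometheus_scrape_annotations: false,
    /// };
    /// let orchestrator = KubernetesOrchestrator::new(config).await?;
    /// ```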
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                // TODO(guswynn): make this configurable.
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: u16,
    disk: bool,
    disk_limit: Option<DiskLimit>,
}

struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result include a `result_tx` on which the
/// [`OrchestratorWorker`] will deliver the result.
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

/// A description of a service to be created by an [`OrchestratorWorker`].
#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: u16,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

/// A task executing blocking work for a [`NamespacedKubernetesOrchestrator`] in the background.
///
/// This type exists to enable making [`NamespacedKubernetesOrchestrator::ensure_service`] and
/// [`NamespacedKubernetesOrchestrator::drop_service`] non-blocking, allowing invocation of these
/// methods in latency-sensitive contexts.
///
/// Note that, apart from `ensure_service` and `drop_service`, this worker also handles blocking
/// orchestrator calls that query service state (such as `list_services`). These need to be
/// sequenced through the worker loop to ensure they linearize as expected. For example, we want to
/// ensure that a `list_services` result contains exactly those services that were previously
/// created with `ensure_service` and not yet dropped with `drop_service`.
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

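// An illustrative test sketch: the payload below mimics the shape of a
// `metrics.k8s.io/v1beta1` response that `PodMetrics` is meant to
// deserialize. The pod name, container name, and usage values are invented
// sample data, not output captured from a real metrics-server.
#[cfg(test)]
mod pod_metrics_deserialization_sketch {
    use super::*;

    #[test]
    fn deserializes_a_metrics_server_style_payload() {
        let sample = r#"{
            "metadata": {"name": "cluster-u1-replica-u1-0"},
            "timestamp": "2024-01-01T00:00:00Z",
            "window": "30s",
            "containers": [
                {"name": "clusterd", "usage": {"cpu": "250m", "memory": "256Mi"}}
            ]
        }"#;
        let metrics: PodMetrics = serde_json::from_str(sample).expect("sample is valid");
        assert_eq!(metrics.containers.len(), 1);
        assert_eq!(metrics.containers[0].usage.cpu.0, "250m");
        assert_eq!(metrics.containers[0].usage.memory.0, "256Mi");
    }
}
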
// Note that these types are very weird. We are `get`-ing a
// `List` object, and lying about it having an `ObjectMeta`
// (it deserializes as empty, but we don't need it). The custom
// metrics API is designed this way, which is very non-standard.
// A discussion in the `kube` channel in the `tokio` discord
// confirmed that this layout + using `get_subresource` is the
// best way to handle this.

#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
    // We skip `selector` for now, as we don't use it
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
    // We skip `windowSeconds`, as we don't need it
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

    /// Return a `watcher::Config` instance that limits results to the namespace
    /// assigned to this orchestrator.
    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().labels(&ns_selector)
    }

    /// Convert a higher-level label key to the actual one we
    /// will give to Kubernetes
    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

// Parse a k8s `Quantity` object into a numeric value.
//
// This is intended to support collecting CPU and memory data.
// Thus, there are a few things that Kubernetes attempts to do that we don't,
// because I've never observed metrics-server specifically sending them:
// (1) Handle negative numbers (because it's not useful for that use-case)
// (2) Handle non-integers (because I have never observed them actually being sent)
// (3) Handle scientific notation (e.g. 1.23e2)
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3), // yep, intentionally lowercase.
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

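// An illustrative test sketch of how `parse_k8s_quantity` and
// `ScaledQuantity::try_to_integer` compose for the CPU and memory strings
// typically reported by metrics-server; the specific quantities are sample
// values chosen for this example.
#[cfg(test)]
mod parse_k8s_quantity_sketch {
    use super::*;

    #[test]
    fn parses_cpu_and_memory_quantities() {
        // CPU: "250m" is 250 millicores ("m", base-10 exponent -3); scaling to
        // nanocores (exponent -9) multiplies by 10^6.
        let cpu = parse_k8s_quantity("250m").expect("valid quantity");
        assert_eq!(cpu.try_to_integer(-9, true), Some(250_000_000));

        // Memory: "256Mi" uses the binary suffix "Mi" (2^20); scaling to plain
        // bytes (exponent 0) multiplies by 2^20.
        let memory = parse_k8s_quantity("256Mi").expect("valid quantity");
        assert_eq!(memory.try_to_integer(0, false), Some(256 * 1024 * 1024));

        // Asking for a base-10 scale of a binary quantity yields `None`.
        assert_eq!(memory.try_to_integer(0, true), None);
    }
}
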
#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            // This should have been set in `ensure_service`.
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            scale,
            labels: labels_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk: disk_in,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        // This is extremely cheap to clone, so just look into the lock once.
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        // Determining whether to enable disk is subtle because we need to
        // support historical sizes in the managed service and custom sizes in
        // self-hosted deployments.
        let disk = {
            // Whether the user specified `DISK = TRUE` when creating the
            // replica OR whether the feature flag to force disk is enabled.
            let user_requested_disk = disk_in || scheduling_config.always_use_disk;
            // Whether the cluster replica size map provided by the
            // administrator explicitly indicates that the size does not support
            // disk.
            let size_disables_disk = disk_limit == Some(DiskLimit::ZERO);
            // Enable disk if the user requested it and the size does not
            // disable it.
            //
            // Arguably we should not allow the user to request disk with sizes
            // that have a zero disk limit, but configuring disk on a
            // replica-by-replica basis is a legacy option that we hope to
            // remove someday.
            user_requested_disk && !size_disables_disk
        };

        let name = self.service_name(id);
        // The match labels should be the minimal set of labels that uniquely
        // identify the pods in the stateful set. Changing these after the
        // `StatefulSet` is created is not permitted by Kubernetes, and we're
        // not yet smart enough to handle deleting and recreating the
        // `StatefulSet`.
        let match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        for (key, value) in &self.config.service_labels {
            labels.insert(key.clone(), value.clone());
        }
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        let hosts = (0..scale)
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

        // This constrains the orchestrator (for those orchestrators that support
        // anti-affinity, today just k8s) to never schedule pods for different replicas
        // of the same cluster on the same node. Pods from the _same_ replica are fine;
        // pods from different clusters are also fine.
        //
        // The point is that if pods of two replicas are on the same node, that node
        // going down would kill both replicas, and so the replication factor of the
        // cluster in question is illusory.
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            // `match_labels` sufficiently selects pods in the same replica.
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale <= 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    // TODO(guswynn): restore these once they are supported.
                    // Consider node affinities when calculating topology spread. This is the
                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_affinity_policy>,
                    // made explicit.
                    // node_affinity_policy: Some("Honor".to_string()),
                    // Do not consider node taints when calculating topology spread. This is the
                    // default: <https://docs.rs/k8s-openapi/latest/k8s_openapi/api/core/v1/struct.TopologySpreadConstraint.html#structfield.node_taints_policy>,
                    // made explicit.
                    // node_taints_policy: Some("Ignore".to_string()),
                    match_label_keys: None,
                    // Once the above are restored, we shouldn't have `..Default::default()` here,
                    // because these fields are subtle enough that we want compilation failures
                    // when we upgrade.
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

        let mut pod_annotations = btreemap! {
            // Prevent the cluster-autoscaler (or karpenter) from evicting these pods in attempts to scale down
            // and terminate nodes.
            // This will cost us more money, but should give us better uptime.
            // This does not prevent all evictions by Kubernetes, only the ones initiated by the
            // cluster-autoscaler (or karpenter). Notably, eviction of pods for resource overuse is still enabled.
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),

            // It's called do-not-disrupt in newer versions of karpenter, so adding for forward/backward compatibility
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                // Enable prometheus scrape discovery
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }

        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            // if the cluster doesn't require disk, we can omit the selector
            // allowing it to be scheduled onto nodes with and without the
            // selector
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        let mut tolerations = vec![
            // When the node becomes `NotReady` it indicates there is a problem
            // with the node. By default Kubernetes waits 300s (5 minutes)
            // before descheduling the pod, but we tune this to 30s for faster
            // recovery in the case of node failure.
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                annotations: Some(pod_annotations), // Do not delete, we insert into it below.
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                // Setting a 0s termination grace period has the side effect of
                // automatically starting a new pod when the previous pod is
                // currently terminating. This enables recovery from a node
                // failure with no manual intervention. Without this setting,
                // the StatefulSet controller will refuse to start a new pod
                // until the failed node is manually removed from the Kubernetes
                // cluster.
                //
                // The Kubernetes documentation strongly advises against this
                // setting, as StatefulSets attempt to provide "at most once"
                // semantics [0]--that is, the guarantee that for a given pod in
                // a StatefulSet there is *at most* one pod with that identity
                // running in the cluster.
                //
                // Materialize services, however, are carefully designed to
                // *not* rely on this guarantee. In fact, we do not believe that
                // correct distributed systems can meaningfully rely on
                // Kubernetes's guarantee--network packets from a pod can be
                // arbitrarily delayed, long past that pod's termination.
                //
                // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_template_spec
            .metadata
            .as_mut()
            .unwrap()
            .annotations
            .as_mut()
            .unwrap()
            .insert(
                POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
                pod_template_hash.clone(),
            );

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: name.clone(),
                replicas: Some(scale.into()),
                template: pod_template_spec,
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos.lock().expect("poisoned lock").insert(
            id.to_string(),
            ServiceInfo {
                scale,
                disk,
                disk_limit,
            },
        );

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    /// Drops the identified service, if it exists.
    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    /// Lists the identifiers of all known services.
    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        // The container might have already transitioned from "terminated" to
                        // "waiting"/"running" state, in which case we need to check its previous
                        // state to find out why it terminated.
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        // The interesting exit codes are:
                        //  * 135 (SIGBUS): occurs when lgalloc runs out of disk
                        //  * 137 (SIGKILL): occurs when the OOM killer terminates the container
                        //  * 167: occurs when the lgalloc limiter terminates the process
                        // We treat all of these as OOM conditions, since lgalloc uses disk only
                        // for spilling memory.
1295                        let exit_code = termination_state.map(|s| s.exit_code);
1296                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
1297                    })
1298                })
1299                .unwrap_or(false);
1300
1301            let (pod_ready, last_probe_time) = pod
1302                .status
1303                .and_then(|status| status.conditions)
1304                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
1305                .map(|c| (c.status == "True", c.last_probe_time))
1306                .unwrap_or((false, None));
1307
1308            let status = if pod_ready {
1309                ServiceStatus::Online
1310            } else {
1311                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
1312            };
1313            let time = if let Some(time) = last_probe_time {
1314                time.0
1315            } else {
1316                Utc::now()
1317            };
1318
1319            Ok(ServiceEvent {
1320                service_id,
1321                process_id,
1322                status,
1323                time,
1324            })
1325        }
1326
1327        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
1328            .touched_objects()
1329            .filter_map(|object| async move {
1330                match object {
1331                    Ok(pod) => Some(into_service_event(pod)),
1332                    Err(error) => {
1333                        // We assume that errors returned by Kubernetes are usually transient, so
1334                        // we just log a warning and otherwise ignore them.
1335                        tracing::warn!("service watch error: {error}");
1336                        None
1337                    }
1338                }
1339            });
1340        Box::pin(stream)
1341    }
1342
1343    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1344        *self.scheduling_config.write().expect("poisoned") = config;
1345    }
1346}
1347
1348impl OrchestratorWorker {
1349    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1350        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1351    }
1352
1353    async fn run(mut self) {
1354        {
1355            info!("initializing Kubernetes orchestrator worker");
1356            let start = Instant::now();
1357
1358            // Fetch the owner references of our own pod (usually a reference
1359            // to the StatefulSet managing it), so that we can propagate them
1360            // to the services we create.
1361            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
1362            let orchestrator_pod = Retry::default()
1363                .clamp_backoff(Duration::from_secs(10))
1364                .retry_async(|_| self.pod_api.get(&hostname))
1365                .await
1366                .expect("always retries on error");
1367            self.owner_references
1368                .extend(orchestrator_pod.owner_references().into_iter().cloned());
1369
1370            info!(
1371                "Kubernetes orchestrator worker initialized in {:?}",
1372                start.elapsed()
1373            );
1374        }
1375
1376        while let Some(cmd) = self.command_rx.recv().await {
1377            self.handle_command(cmd).await;
1378        }
1379    }
1380
1381    /// Handle a worker command.
1382    ///
1383    /// If handling the command fails, it is automatically retried. All command handlers return
1384    /// [`K8sError`], so we can reasonably assume that a failure is caused by issues communicating
1385    /// with the Kubernetes API server and that retrying will eventually resolve them.
1386    async fn handle_command(&self, cmd: WorkerCommand) {
1387        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1388        where
1389            F: Fn() -> U,
1390            U: Future<Output = Result<R, K8sError>>,
1391        {
1392            Retry::default()
1393                .clamp_backoff(Duration::from_secs(10))
1394                .retry_async(|_| {
1395                    f().map_err(
1396                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1397                    )
1398                })
1399                .await
1400                .expect("always retries on error")
1401        }
1402
1403        use WorkerCommand::*;
1404        match cmd {
1405            EnsureService { desc } => {
1406                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1407            }
1408            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1409            ListServices {
1410                namespace,
1411                result_tx,
1412            } => {
1413                let result = retry(|| self.list_services(&namespace), "ListServices").await;
1414                let _ = result_tx.send(result);
1415            }
1416            FetchServiceMetrics {
1417                name,
1418                info,
1419                result_tx,
1420            } => {
1421                let result = self.fetch_service_metrics(&name, &info).await;
1422                let _ = result_tx.send(result);
1423            }
1424        }
1425    }
1426
1427    async fn fetch_service_metrics(
1428        &self,
1429        name: &str,
1430        info: &ServiceInfo,
1431    ) -> Vec<ServiceProcessMetrics> {
1432        if !self.collect_pod_metrics {
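            // Pod metrics collection is disabled; report empty metrics for each
            // of the service's processes.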
1433            return (0..info.scale)
1434                .map(|_| ServiceProcessMetrics::default())
1435                .collect();
1436        }
1437
1438        /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
1439        ///
1440        /// Note that we want to keep going even if a lookup fails for whatever reason,
1441        /// so this function is infallible. If we fail to get cpu or memory for a particular pod,
1442        /// we just log a warning and install `None` in the returned struct.
1443        async fn get_metrics(
1444            self_: &OrchestratorWorker,
1445            service_name: &str,
1446            i: usize,
1447            disk: bool,
1448            disk_limit: Option<DiskLimit>,
1449        ) -> ServiceProcessMetrics {
1450            let name = format!("{service_name}-{i}");
1451
1452            let disk_usage_fut = async {
1453                if disk {
1454                    Some(get_disk_usage(self_, service_name, i).await)
1455                } else {
1456                    None
1457                }
1458            };
1459            let (metrics, disk_usage) =
1460                match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await {
1461                    (Ok(metrics), disk_usage) => {
1462                        let disk_usage = match disk_usage {
1463                            Some(Ok(disk_usage)) => Some(disk_usage),
1464                            Some(Err(e)) => {
1465                                warn!("Failed to fetch disk usage for {name}: {e}");
1466                                None
1467                            }
1468                            _ => None,
1469                        };
1470
1471                        (metrics, disk_usage)
1472                    }
1473                    (Err(e), _) => {
1474                        warn!("Failed to get metrics for {name}: {e}");
1475                        return ServiceProcessMetrics::default();
1476                    }
1477                };
1478            let Some(PodMetricsContainer {
1479                usage:
1480                    PodMetricsContainerUsage {
1481                        cpu: Quantity(cpu_str),
1482                        memory: Quantity(mem_str),
1483                    },
1484                ..
1485            }) = metrics.containers.get(0)
1486            else {
1487                warn!("metrics result contained no containers for {name}");
1488                return ServiceProcessMetrics::default();
1489            };
1490
1491            let cpu = match parse_k8s_quantity(cpu_str) {
1492                Ok(q) => match q.try_to_integer(-9, true) {
1493                    Some(i) => Some(i),
1494                    None => {
1495                        tracing::error!("CPU value {q:?} out of range");
1496                        None
1497                    }
1498                },
1499                Err(e) => {
1500                    tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
1501                    None
1502                }
1503            };
1504            let memory = match parse_k8s_quantity(mem_str) {
1505                Ok(q) => match q.try_to_integer(0, false) {
1506                    Some(i) => Some(i),
1507                    None => {
1508                        tracing::error!("Memory value {q:?} out of range");
1509                        None
1510                    }
1511                },
1512                Err(e) => {
1513                    tracing::error!("Failed to parse memory value {mem_str}: {e}");
1514                    None
1515                }
1516            };
1517
1518            // We only populate a `disk_usage` if we have both:
1519            // - a disk limit (so it must be an actual managed cluster with a real limit)
1520            // - a reported disk usage
1521            //
1522            // The disk limit can be more up-to-date (from `service_infos`) than the
1523            // reported metric. In that case, we report the minimum of the usage
1524            // and the limit, which means we can report 100% usage temporarily
1525            // if a replica is sized down.
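            // For example (hypothetical numbers): if a replica was just sized down so
            // that its limit is now 8 GiB while the last reported usage is still
            // 10 GiB, we report min(10 GiB, 8 GiB) = 8 GiB, i.e. 100% of the limit.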
1526            let disk_usage = match (disk_usage, disk_limit) {
1527                (Some(disk_usage), Some(DiskLimit(disk_limit))) => {
1528                    Some(std::cmp::min(disk_usage, disk_limit.0))
1529                }
1530                _ => None,
1531            };
1532
1533            ServiceProcessMetrics {
1534                cpu_nano_cores: cpu,
1535                memory_bytes: memory,
1536                disk_usage_bytes: disk_usage,
1537            }
1538        }
1539
1540        /// Get the current disk usage for a particular service and process.
1541        ///
1542        /// Disk usage is collected by connecting to a metrics endpoint exposed by the process. The
1543        /// endpoint is assumed to be reachable on the 'internal-http' port under the HTTP path
1544        /// `/api/usage-metrics`.
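        ///
        /// For illustration (hypothetical values), a successful response might look
        /// like `{"disk_bytes": 1073741824}`. If `disk_bytes` is `null`, this
        /// function returns an error.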
1545        async fn get_disk_usage(
1546            self_: &OrchestratorWorker,
1547            service_name: &str,
1548            i: usize,
1549        ) -> anyhow::Result<u64> {
1550            #[derive(Deserialize)]
1551            pub(crate) struct Usage {
1552                disk_bytes: Option<u64>,
1553            }
1554
1555            let service = self_
1556                .service_api
1557                .get(service_name)
1558                .await
1559                .with_context(|| format!("failed to get service {service_name}"))?;
1560            let namespace = service
1561                .metadata
1562                .namespace
1563                .context("missing service namespace")?;
1564            let internal_http_port = service
1565                .spec
1566                .and_then(|spec| spec.ports)
1567                .and_then(|ports| {
1568                    ports
1569                        .into_iter()
1570                        .find(|p| p.name == Some("internal-http".into()))
1571                })
1572                .map(|p| p.port);
1573            let Some(port) = internal_http_port else {
1574                bail!("internal-http port missing in service spec");
1575            };
1576            let metrics_url = format!(
1577                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
1578                 /api/usage-metrics"
1579            );
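            // The URL targets the per-pod DNS name
            // `<pod>.<service>.<namespace>.svc.cluster.local`; a hypothetical example:
            // http://storaged-0.storaged.materialize.svc.cluster.local:6878/api/usage-metrics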
1580
1581            let http_client = reqwest::Client::builder()
1582                .timeout(Duration::from_secs(10))
1583                .build()
1584                .context("error building HTTP client")?;
1585            let resp = http_client.get(metrics_url).send().await?;
1586            let usage: Usage = resp.json().await?;
1587
1588            usage
1589                .disk_bytes
1590                .ok_or_else(|| anyhow!("process did not provide disk usage"))
1591        }
1592
1593        let ret = futures::future::join_all(
1594            (0..info.scale).map(|i| get_metrics(self, name, i.into(), info.disk, info.disk_limit)),
1595        );
1596
1597        ret.await
1598    }
1599
1600    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1601        // We inject our own pod's owner references into the Kubernetes objects
1602        // created for the service so that if the
1603        // Deployment/StatefulSet/whatever that owns the pod running the
1604        // orchestrator gets deleted, so do all services spawned by this
1605        // orchestrator.
1606        desc.service
1607            .metadata
1608            .owner_references
1609            .get_or_insert(vec![])
1610            .extend(self.owner_references.iter().cloned());
1611        desc.stateful_set
1612            .metadata
1613            .owner_references
1614            .get_or_insert(vec![])
1615            .extend(self.owner_references.iter().cloned());
1616
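        // Apply the Service and StatefulSet via server-side apply, forcing
        // ownership of any conflicting fields under our field manager.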
1617        self.service_api
1618            .patch(
1619                &desc.name,
1620                &PatchParams::apply(FIELD_MANAGER).force(),
1621                &Patch::Apply(desc.service),
1622            )
1623            .await?;
1624        self.stateful_set_api
1625            .patch(
1626                &desc.name,
1627                &PatchParams::apply(FIELD_MANAGER).force(),
1628                &Patch::Apply(desc.stateful_set),
1629            )
1630            .await?;
1631
1632        // Explicitly delete any pods in the stateful set that don't match the
1633        // template. In theory, Kubernetes would do this automatically, but
1634        // in practice we have observed that it does not.
1635        // See: https://github.com/kubernetes/kubernetes/issues/67250
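        // Deleting a pod whose hash annotation differs from the desired pod
        // template causes the StatefulSet controller to recreate it from the
        // updated template.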
1636        for pod_id in 0..desc.scale {
1637            let pod_name = format!("{}-{pod_id}", desc.name);
1638            let pod = match self.pod_api.get(&pod_name).await {
1639                Ok(pod) => pod,
1640                // Pod already doesn't exist.
1641                Err(kube::Error::Api(e)) if e.code == 404 => continue,
1642                Err(e) => return Err(e),
1643            };
1644            if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION) != Some(&desc.pod_template_hash)
1645            {
1646                match self
1647                    .pod_api
1648                    .delete(&pod_name, &DeleteParams::default())
1649                    .await
1650                {
1651                    Ok(_) => (),
1652                    // Pod got deleted while we were looking at it.
1653                    Err(kube::Error::Api(e)) if e.code == 404 => (),
1654                    Err(e) => return Err(e),
1655                }
1656            }
1657        }
1658
1659        Ok(())
1660    }
1661
1662    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
1663        let res = self
1664            .stateful_set_api
1665            .delete(name, &DeleteParams::default())
1666            .await;
1667        match res {
1668            Ok(_) => (),
1669            Err(K8sError::Api(e)) if e.code == 404 => (),
1670            Err(e) => return Err(e),
1671        }
1672
1673        let res = self
1674            .service_api
1675            .delete(name, &DeleteParams::default())
1676            .await;
1677        match res {
1678            Ok(_) => Ok(()),
1679            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1680            Err(e) => Err(e),
1681        }
1682    }
1683
1684    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
1685        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1686        let name_prefix = format!("{}{namespace}-", self.name_prefix);
1687        Ok(stateful_sets
1688            .into_iter()
1689            .filter_map(|ss| {
1690                ss.metadata
1691                    .name
1692                    .unwrap()
1693                    .strip_prefix(&name_prefix)
1694                    .map(Into::into)
1695            })
1696            .collect())
1697    }
1698}
1699
1700#[derive(Debug, Clone)]
1701struct KubernetesService {
1702    hosts: Vec<String>,
1703    ports: BTreeMap<String, u16>,
1704}
1705
1706impl Service for KubernetesService {
1707    fn addresses(&self, port: &str) -> Vec<String> {
1708        let port = self.ports[port];
1709        self.hosts
1710            .iter()
1711            .map(|host| format!("{host}:{port}"))
1712            .collect()
1713    }
1714}
1715
1716#[cfg(test)]
1717mod tests {
1718    use super::*;
1719
1720    #[mz_ore::test]
1721    fn k8s_quantity_base10_large() {
1722        let cases = &[
1723            ("42", 42),
1724            ("42k", 42000),
1725            ("42M", 42000000),
1726            ("42G", 42000000000),
1727            ("42T", 42000000000000),
1728            ("42P", 42000000000000000),
1729        ];
1730
1731        for (input, expected) in cases {
1732            let quantity = parse_k8s_quantity(input).unwrap();
1733            let number = quantity.try_to_integer(0, true).unwrap();
1734            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1735        }
1736    }
1737
1738    #[mz_ore::test]
1739    fn k8s_quantity_base10_small() {
1740        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];
1741
1742        for (input, expected) in cases {
1743            let quantity = parse_k8s_quantity(input).unwrap();
1744            let number = quantity.try_to_integer(-9, true).unwrap();
1745            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1746        }
1747    }
1748
1749    #[mz_ore::test]
1750    fn k8s_quantity_base2() {
1751        let cases = &[
1752            ("42Ki", 42 << 10),
1753            ("42Mi", 42 << 20),
1754            ("42Gi", 42 << 30),
1755            ("42Ti", 42 << 40),
1756            ("42Pi", 42 << 50),
1757        ];
1758
1759        for (input, expected) in cases {
1760            let quantity = parse_k8s_quantity(input).unwrap();
1761            let number = quantity.try_to_integer(0, false).unwrap();
1762            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1763        }
1764    }
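
    // A minimal sketch exercising `KubernetesService::addresses`; the host and
    // port names below are purely illustrative.
    #[mz_ore::test]
    fn kubernetes_service_addresses() {
        let mut ports = BTreeMap::new();
        ports.insert("internal-http".to_string(), 6878);

        let service = KubernetesService {
            hosts: vec![
                "svc-0.svc.ns.svc.cluster.local".into(),
                "svc-1.svc.ns.svc.cluster.local".into(),
            ],
            ports,
        };

        assert_eq!(
            service.addresses("internal-http"),
            vec![
                "svc-0.svc.ns.svc.cluster.local:6878",
                "svc-1.svc.ns.svc.cluster.local:6878",
            ],
        );
    }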
1765}