use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use chrono::Utc;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceConfig, ServiceEvent, ServiceProcessMetrics,
    ServiceStatus, scheduling_config::*,
};
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

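/// Configuration for a [`KubernetesOrchestrator`].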
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use, if the Kubernetes
    /// configuration is loaded from the local kubeconfig.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use, if any.
    pub scheduler_name: Option<String>,
    /// Labels to install on every service created by the orchestrator.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector to install on every service created by the orchestrator.
    pub service_node_selector: BTreeMap<String, String>,
    /// The service account that each service should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for services created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix to use when making AWS operations on behalf
    /// of the environment, if any.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to run services in code coverage mode.
    pub coverage: bool,
    /// The storage class to use for the ephemeral volume attached to services
    /// that request disk, if any.
    pub ephemeral_volume_storage_class: Option<String>,
    /// The fs group to apply to the `securityContext` of services' pods, if
    /// any.
    pub service_fs_group: Option<i64>,
    /// A prefix to prepend to all service names.
    pub name_prefix: Option<String>,
    /// Whether to collect metrics for the pods of created services.
    pub collect_pod_metrics: bool,
    /// Whether to annotate pods for Prometheus service discovery.
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

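/// Specifies whether Kubernetes should pull Docker images when creating pods.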
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    Always,
    IfNotPresent,
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

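/// An orchestrator backed by Kubernetes.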
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                custom_metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: u16,
    disk: bool,
    disk_limit: Option<DiskLimit>,
}

struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

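/// Commands sent from a `NamespacedKubernetesOrchestrator` to its
/// `OrchestratorWorker`, which applies them against the Kubernetes API.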
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: u16,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

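/// A background task that applies `WorkerCommand`s against the Kubernetes
/// API, retrying failed API calls indefinitely so that commands eventually
/// take effect.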
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    custom_metrics_api: Api<MetricValueList>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

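// Hand-written bindings for the `metrics.k8s.io` and `custom.metrics.k8s.io`
// APIs, which `k8s-openapi` does not provide.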
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValueList {
    pub metadata: ObjectMeta,
    pub items: Vec<MetricValue>,
}

impl k8s_openapi::Resource for MetricValueList {
    const GROUP: &'static str = "custom.metrics.k8s.io";
    const KIND: &'static str = "MetricValueList";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "custom.metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "persistentvolumeclaims";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for MetricValueList {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

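    /// Returns a watcher configuration that restricts the watch to pods
    /// belonging to this namespaced orchestrator.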
    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().labels(&ns_selector)
    }

    /// Converts a higher-level label key to the actual one we give to
    /// Kubernetes, namespaced under this orchestrator.
    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

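/// A Kubernetes quantity, represented as `integral_part` scaled by
/// `base^exponent`, where the base is 10 for decimal suffixes (n, u, m, k,
/// M, ...) and 2 for binary suffixes (Ki, Mi, ...).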
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

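/// Parses a Kubernetes quantity string, such as `"42"`, `"100m"`, or `"2Gi"`,
/// into a `ScaledQuantity`. Fractional and negative quantities are not
/// supported.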
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3),
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            cpu_limit,
            scale,
            labels: labels_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk: disk_in,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

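        // Determine whether to attach a scratch disk: either the service
        // requested one or the scheduling config forces it, but an explicit
        // disk limit of zero disables disk entirely.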
        let disk = {
            let user_requested_disk = disk_in || scheduling_config.always_use_disk;
            let size_disables_disk = disk_limit == Some(DiskLimit::ZERO);
            user_requested_disk && !size_disables_disk
        };

        let name = self.service_name(id);
        let match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        for (key, value) in &self.config.service_labels {
            labels.insert(key.clone(), value.clone());
        }
        let mut limits = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        let hosts = (0..scale)
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();
        let listen_addrs = ports_in
            .iter()
            .map(|p| (p.name.clone(), format!("0.0.0.0:{}", p.port_hint)))
            .collect::<BTreeMap<_, _>>();
        let mut args = args(&listen_addrs);

        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale <= 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: None,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

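        // These annotations ask the cluster autoscaler and karpenter not to
        // evict the pod out from under a running replica.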
        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }

        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(limits.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        let tolerations = Some(vec![
            // When a node becomes `NotReady` or `Unreachable`, give the pod
            // `NODE_FAILURE_THRESHOLD_SECONDS` to recover before it is
            // evicted.
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ]);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                annotations: Some(pod_annotations),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits.clone()),
                        requests: Some(limits),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(Affinity {
                    pod_anti_affinity: anti_affinity,
                    pod_affinity,
                    node_affinity,
                    ..Default::default()
                }),
                topology_spread_constraints: topology_spread,
                tolerations,
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        // Hash the pod template and record it in an annotation, so that the
        // worker's `ensure_service` can later detect pods running a stale
        // template.
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_template_spec
            .metadata
            .as_mut()
            .unwrap()
            .annotations
            .as_mut()
            .unwrap()
            .insert(
                POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
                pod_template_hash.clone(),
            );

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: name.clone(),
                replicas: Some(scale.into()),
                template: pod_template_spec,
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos.lock().expect("poisoned lock").insert(
            id.to_string(),
            ServiceInfo {
                scale,
                disk,
                disk_limit,
            },
        );

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        // Exit codes 135 (128 + SIGBUS) and 137 (128 + SIGKILL)
                        // indicate the container was killed by a signal, which
                        // we treat as evidence of an OOM kill.
                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code == Some(135) || exit_code == Some(137)
                    })
                })
                .unwrap_or(false);

            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Utc::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time,
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

impl OrchestratorWorker {
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        {
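            // Discover the owner references of the pod this worker runs in
            // (via $HOSTNAME) and inherit them for every object the worker
            // creates, so those objects are garbage collected along with the
            // environment.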
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!(
                    "HOSTNAME environment variable missing or invalid; \
                     required for Kubernetes orchestrator"
                )
            });
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

    async fn handle_command(&self, cmd: WorkerCommand) {
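        // Retry Kubernetes API calls forever, with backoff, so that transient
        // API errors cannot drop a command; the sender relies on commands
        // eventually succeeding.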
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale)
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

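        // Fetch CPU and memory metrics for process `i` of the service from
        // the metrics API, plus volume stats for its scratch disk if one is
        // attached.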
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
            disk: bool,
            disk_limit: Option<DiskLimit>,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let disk_usage_fut = async {
                if disk {
                    Some(
                        self_
                            .custom_metrics_api
                            .get_subresource(
                                "kubelet_volume_stats_used_bytes",
                                &format!("{name}-scratch"),
                            )
                            .await,
                    )
                } else {
                    None
                }
            };
            let disk_capacity_fut = async {
                if disk {
                    Some(
                        self_
                            .custom_metrics_api
                            .get_subresource(
                                "kubelet_volume_stats_capacity_bytes",
                                &format!("{name}-scratch"),
                            )
                            .await,
                    )
                } else {
                    None
                }
            };
            let (metrics, disk_usage, disk_capacity) = match futures::future::join3(
                self_.metrics_api.get(&name),
                disk_usage_fut,
                disk_capacity_fut,
            )
            .await
            {
                (Ok(metrics), disk_usage, disk_capacity) => {
                    // A 404 is expected while the volume stats are not yet
                    // available, so only log other errors.
                    let disk_usage = match disk_usage {
                        Some(Ok(disk_usage)) => Some(disk_usage),
                        Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
                            warn!(
                                "Failed to fetch `kubelet_volume_stats_used_bytes` for {name}: {e}"
                            );
                            None
                        }
                        _ => None,
                    };

                    let disk_capacity = match disk_capacity {
                        Some(Ok(disk_capacity)) => Some(disk_capacity),
                        Some(Err(e)) if !matches!(&e, K8sError::Api(e) if e.code == 404) => {
                            warn!(
                                "Failed to fetch `kubelet_volume_stats_capacity_bytes` for {name}: {e}"
                            );
                            None
                        }
                        _ => None,
                    };

                    (metrics, disk_usage, disk_capacity)
                }
                (Err(e), _, _) => {
                    warn!("Failed to get metrics for {name}: {e}");
                    return ServiceProcessMetrics::default();
                }
            };
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let cpu = match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(i) => Some(i),
                    None => {
                        tracing::error!("CPU value {q:?} out of range");
                        None
                    }
                },
                Err(e) => {
                    tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
                    None
                }
            };
            let memory = match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(i) => Some(i),
                    None => {
                        tracing::error!("Memory value {q:?} out of range");
                        None
                    }
                },
                Err(e) => {
                    tracing::error!("Failed to parse memory value {mem_str}: {e}");
                    None
                }
            };

            let disk_usage = match (disk_usage, disk_capacity) {
                (Some(disk_usage_metrics), Some(disk_capacity_metrics)) => {
                    if !disk_usage_metrics.items.is_empty()
                        && !disk_capacity_metrics.items.is_empty()
                    {
                        let disk_usage_str = &*disk_usage_metrics.items[0].value.0;
                        let disk_usage = match parse_k8s_quantity(disk_usage_str) {
                            Ok(q) => match q.try_to_integer(0, true) {
                                Some(i) => Some(i),
                                None => {
                                    tracing::error!("Disk usage value {q:?} out of range");
                                    None
                                }
                            },
                            Err(e) => {
                                tracing::error!(
                                    "Failed to parse disk usage value {disk_usage_str}: {e}"
                                );
                                None
                            }
                        };
                        let disk_capacity_str = &*disk_capacity_metrics.items[0].value.0;
                        let disk_capacity = match parse_k8s_quantity(disk_capacity_str) {
                            Ok(q) => match q.try_to_integer(0, true) {
                                Some(i) => Some(i),
                                None => {
                                    tracing::error!("Disk capacity value {q:?} out of range");
                                    None
                                }
                            },
                            Err(e) => {
                                tracing::error!(
                                    "Failed to parse disk capacity value {disk_capacity_str}: {e}"
                                );
                                None
                            }
                        };

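                        // Report usage relative to the disk limit rather than
                        // the raw volume capacity: shift usage up by the
                        // difference between the limit and the (possibly
                        // smaller) reported capacity, and clamp to the limit.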
                        match (disk_usage, disk_capacity, disk_limit) {
                            (
                                Some(disk_usage),
                                Some(disk_capacity),
                                Some(DiskLimit(disk_limit)),
                            ) => {
                                let disk_capacity = if disk_limit.0 < disk_capacity {
                                    tracing::debug!(
                                        "disk capacity {} is larger than the disk limit {}; \
                                        disk usage will indicate 100% full before the disk is truly full; \
                                        as long as the delta is small this is not a cause for concern",
                                        disk_capacity,
                                        disk_limit.0
                                    );
                                    disk_limit.0
                                } else {
                                    disk_capacity
                                };

                                let disk_usage = (disk_limit.0 - disk_capacity)
                                    .checked_add(disk_usage)
                                    .unwrap_or(disk_limit.0);

                                Some(std::cmp::min(disk_usage, disk_limit.0))
                            }
                            _ => None,
                        }
                    } else {
                        None
                    }
                }
                _ => None,
            };

            ServiceProcessMetrics {
                cpu_nano_cores: cpu,
                memory_bytes: memory,
                disk_usage_bytes: disk_usage,
            }
        }
        let ret = futures::future::join_all(
            (0..info.scale).map(|i| get_metrics(self, name, i.into(), info.disk, info.disk_limit)),
        );

        ret.await
    }

    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        // Inherit the worker's owner references, so that the created objects
        // are deleted when the owning environment is deleted.
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

        // Explicitly delete any pods whose template hash does not match the
        // current one, so that the stateful set controller recreates them
        // from the updated template.
        for pod_id in 0..desc.scale {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                // The pod already doesn't exist; nothing to delete.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };
            if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION) != Some(&desc.pod_template_hash)
            {
                match self
                    .pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                {
                    Ok(_) => (),
                    // The pod was deleted concurrently; nothing to do.
                    Err(kube::Error::Api(e)) if e.code == 404 => (),
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }

    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}

#[derive(Debug, Clone)]
struct KubernetesService {
    hosts: Vec<String>,
    ports: BTreeMap<String, u16>,
}

impl Service for KubernetesService {
    fn addresses(&self, port: &str) -> Vec<String> {
        let port = self.ports[port];
        self.hosts
            .iter()
            .map(|host| format!("{host}:{port}"))
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = &[
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }
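
    // A small additional check (not part of the original suite) covering the
    // error paths of `parse_k8s_quantity` and the base mismatch behavior of
    // `try_to_integer`.
    #[mz_ore::test]
    fn k8s_quantity_invalid() {
        // Negative quantities, unknown suffixes, and empty strings are
        // rejected.
        assert!(parse_k8s_quantity("-42").is_err());
        assert!(parse_k8s_quantity("42Qi").is_err());
        assert!(parse_k8s_quantity("").is_err());

        // Converting a binary quantity as base 10 (or vice versa) yields
        // `None`.
        let quantity = parse_k8s_quantity("42Ki").unwrap();
        assert_eq!(quantity.try_to_integer(0, true), None);
    }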
}