use std::collections::BTreeMap;
use std::future::Future;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use chrono::DateTime;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::DeepMerge;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use k8s_openapi::jiff::Timestamp;
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
};
use mz_ore::cast::CastInto;
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

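/// Annotation in which the orchestrator records the hash of a service's pod
/// template, so existing pods can be compared against the desired template.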
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

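/// Configuration for a [`KubernetesOrchestrator`].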
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    pub context: String,
    pub scheduler_name: Option<String>,
    pub service_annotations: BTreeMap<String, String>,
    pub service_labels: BTreeMap<String, String>,
    pub service_node_selector: BTreeMap<String, String>,
    pub service_affinity: Option<String>,
    pub service_tolerations: Option<String>,
    pub service_account: Option<String>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    pub coverage: bool,
    pub ephemeral_volume_storage_class: Option<String>,
    pub service_fs_group: Option<i64>,
    pub name_prefix: Option<String>,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    Always,
    IfNotPresent,
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

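/// An orchestrator backed by Kubernetes.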
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

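/// Per-service state retained after `ensure_service`; currently just the
/// service's scale, which later metrics fetches need.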
#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: NonZero<u16>,
}

struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

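/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`]. Commands that produce a result carry a oneshot
/// sender over which the worker reports back.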
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

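/// A description of a service to be created, passed to the worker via
/// [`WorkerCommand::EnsureService`].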
#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: NonZero<u16>,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

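/// A task that performs the Kubernetes API calls on behalf of a
/// [`NamespacedKubernetesOrchestrator`], receiving commands over `command_rx`
/// and retrying failed calls until they succeed.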
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

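// Hand-rolled deserialization types for the `metrics.k8s.io/v1beta1`
// `PodMetrics` resource, fetched from the Kubernetes metrics server.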
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().timeout(59).labels(&ns_selector)
    }

    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

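/// A Kubernetes quantity decomposed into an integral mantissa and a base-10 or
/// base-2 exponent, as produced by [`parse_k8s_quantity`].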
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

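/// Parses a Kubernetes quantity string, such as `"250m"`, `"64Mi"`, or `"2G"`,
/// into a [`ScaledQuantity`]. Negative quantities are rejected.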
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3),
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

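        // The service above is headless (`cluster_ip: None`), so each pod in
        // the StatefulSet is addressable at the stable DNS name
        // `<name>-<i>.<name>.<namespace>.svc.cluster.local`.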
        let hosts = (0..scale.get())
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

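        // Build the pod anti-affinity term: pods matching
        // `other_replicas_selector` must not be co-scheduled on the same node,
        // unless the scheduling config softens this into a weighted preference.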
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale.get() == 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        let mut tolerations = vec![
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_annotations.insert(
            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
            pod_template_hash.clone(),
        );

        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: Some(name.clone()),
                replicas: Some(scale.cast_into()),
                template: pod_template_spec,
                update_strategy: Some(StatefulSetUpdateStrategy {
                    type_: Some("OnDelete".to_owned()),
                    ..Default::default()
                }),
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Timestamp::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time: DateTime::from_timestamp_nanos(
                    time.as_nanosecond().try_into().expect("must fit"),
                ),
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

impl OrchestratorWorker {
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator")
            });
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

    async fn handle_command(&self, cmd: WorkerCommand) {
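        // Retries a Kubernetes API call with capped backoff until it
        // succeeds, logging each failed attempt.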
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }

    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
        let pod_annotations = pod_metadata.annotations.clone();

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

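        // The StatefulSet uses an `OnDelete` update strategy, so a changed pod
        // template does not restart running pods on its own. Compare each
        // pod's recorded template hash with the desired one: delete pods whose
        // template is stale (the StatefulSet controller recreates them) and
        // merely refresh the annotations of pods that already match.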
        for pod_id in 0..desc.scale.get() {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };

            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
                != Some(&desc.pod_template_hash)
            {
                self.pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                    .map(|_| ())
            } else {
                let metadata = ObjectMeta {
                    annotations: pod_annotations.clone(),
                    ..Default::default()
                }
                .into_request_partial::<Pod>();
                self.pod_api
                    .patch_metadata(
                        &pod_name,
                        &PatchParams::apply(FIELD_MANAGER).force(),
                        &Patch::Apply(&metadata),
                    )
                    .await
                    .map(|_| ())
            };

            match result {
                Ok(()) => (),
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}

#[derive(Debug, Clone)]
struct KubernetesService {
    hosts: Vec<String>,
    ports: BTreeMap<String, u16>,
}

impl Service for KubernetesService {
    fn addresses(&self, port: &str) -> Vec<String> {
        let port = self.ports[port];
        self.hosts
            .iter()
            .map(|host| format!("{host}:{port}"))
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = &[
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }
}