1use std::collections::BTreeMap;
11use std::future::Future;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14use std::{env, fmt};
15
16use anyhow::{Context, anyhow, bail};
17use async_trait::async_trait;
18use chrono::Utc;
19use clap::ValueEnum;
20use cloud_resource_controller::KubernetesResourceReader;
21use futures::TryFutureExt;
22use futures::stream::{BoxStream, StreamExt};
23use k8s_openapi::DeepMerge;
24use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
25use k8s_openapi::api::core::v1::{
26 Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
27 NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
28 ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
29 PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
30 PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
31 SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
32 Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
33 WeightedPodAffinityTerm,
34};
35use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
36use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
37 LabelSelector, LabelSelectorRequirement, OwnerReference,
38};
39use kube::ResourceExt;
40use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
41use kube::client::Client;
42use kube::error::Error as K8sError;
43use kube::runtime::{WatchStreamExt, watcher};
44use maplit::btreemap;
45use mz_cloud_resources::AwsExternalIdPrefix;
46use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
47use mz_orchestrator::{
48 DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
49 OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
50 ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
51};
52use mz_ore::retry::Retry;
53use mz_ore::task::AbortOnDropHandle;
54use serde::Deserialize;
55use sha2::{Digest, Sha256};
56use tokio::sync::{mpsc, oneshot};
57use tracing::{info, warn};
58
59pub mod cloud_resource_controller;
60pub mod secrets;
61pub mod util;
62
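/// The field manager name used for server-side apply patches issued by this
/// orchestrator.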
63const FIELD_MANAGER: &str = "environmentd";
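/// How long (in seconds) a pod tolerates its node being not-ready or
/// unreachable before Kubernetes evicts it (see the tolerations configured in
/// `ensure_service`).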
64const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;
65
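/// Annotation storing a hash of the pod template, used to detect pods that are
/// still running an outdated template after an update.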
66const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
67
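/// Configuration for a [`KubernetesOrchestrator`].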
68#[derive(Debug, Clone)]
70pub struct KubernetesOrchestratorConfig {
71 pub context: String,
74 pub scheduler_name: Option<String>,
76 pub service_labels: BTreeMap<String, String>,
78 pub service_node_selector: BTreeMap<String, String>,
80 pub service_affinity: Option<String>,
82 pub service_tolerations: Option<String>,
84 pub service_account: Option<String>,
86 pub image_pull_policy: KubernetesImagePullPolicy,
88 pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
91 pub coverage: bool,
93 pub ephemeral_volume_storage_class: Option<String>,
99 pub service_fs_group: Option<i64>,
101 pub name_prefix: Option<String>,
103 pub collect_pod_metrics: bool,
105 pub enable_prometheus_scrape_annotations: bool,
107}
108
109impl KubernetesOrchestratorConfig {
110 pub fn name_prefix(&self) -> String {
111 self.name_prefix.clone().unwrap_or_default()
112 }
113}
114
115#[derive(ValueEnum, Debug, Clone, Copy)]
117pub enum KubernetesImagePullPolicy {
118 Always,
120 IfNotPresent,
122 Never,
124}
125
126impl fmt::Display for KubernetesImagePullPolicy {
127 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128 match self {
129 KubernetesImagePullPolicy::Always => f.write_str("Always"),
130 KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
131 KubernetesImagePullPolicy::Never => f.write_str("Never"),
132 }
133 }
134}
135
136impl KubernetesImagePullPolicy {
137 pub fn as_kebab_case_str(&self) -> &'static str {
138 match self {
139 Self::Always => "always",
140 Self::IfNotPresent => "if-not-present",
141 Self::Never => "never",
142 }
143 }
144}
145
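/// An orchestrator backed by Kubernetes. Each service is run as a StatefulSet
/// exposed through a headless Service in the configured Kubernetes namespace.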
146pub struct KubernetesOrchestrator {
148 client: Client,
149 kubernetes_namespace: String,
150 config: KubernetesOrchestratorConfig,
151 secret_api: Api<Secret>,
152 vpc_endpoint_api: Api<VpcEndpoint>,
153 namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
154 resource_reader: Arc<KubernetesResourceReader>,
155}
156
157impl fmt::Debug for KubernetesOrchestrator {
158 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
159 f.debug_struct("KubernetesOrchestrator").finish()
160 }
161}
162
163impl KubernetesOrchestrator {
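    /// Creates a new Kubernetes orchestrator from the given configuration,
    /// connecting to the cluster selected by `config.context`.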
164 pub async fn new(
166 config: KubernetesOrchestratorConfig,
167 ) -> Result<KubernetesOrchestrator, anyhow::Error> {
168 let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
169 let resource_reader =
170 Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
171 Ok(KubernetesOrchestrator {
172 client: client.clone(),
173 kubernetes_namespace,
174 config,
175 secret_api: Api::default_namespaced(client.clone()),
176 vpc_endpoint_api: Api::default_namespaced(client),
177 namespaces: Mutex::new(BTreeMap::new()),
178 resource_reader,
179 })
180 }
181}
182
183impl Orchestrator for KubernetesOrchestrator {
184 fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
185 let mut namespaces = self.namespaces.lock().expect("lock poisoned");
186 Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
187 let (command_tx, command_rx) = mpsc::unbounded_channel();
188 let worker = OrchestratorWorker {
189 metrics_api: Api::default_namespaced(self.client.clone()),
190 service_api: Api::default_namespaced(self.client.clone()),
191 stateful_set_api: Api::default_namespaced(self.client.clone()),
192 pod_api: Api::default_namespaced(self.client.clone()),
193 owner_references: vec![],
194 command_rx,
195 name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
196 collect_pod_metrics: self.config.collect_pod_metrics,
197 }
198 .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));
199
200 Arc::new(NamespacedKubernetesOrchestrator {
201 pod_api: Api::default_namespaced(self.client.clone()),
202 kubernetes_namespace: self.kubernetes_namespace.clone(),
203 namespace: namespace.into(),
204 config: self.config.clone(),
205 scheduling_config: Default::default(),
207 service_infos: std::sync::Mutex::new(BTreeMap::new()),
208 command_tx,
209 _worker: worker,
210 })
211 }))
212 }
213}
214
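/// Per-service bookkeeping kept by the namespaced orchestrator, used when
/// fetching service metrics.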
215#[derive(Clone, Copy)]
216struct ServiceInfo {
217 scale: u16,
218 disk: bool,
219 disk_limit: Option<DiskLimit>,
220}
221
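/// A [`KubernetesOrchestrator`] scoped to a single orchestrator namespace.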
222struct NamespacedKubernetesOrchestrator {
223 pod_api: Api<Pod>,
224 kubernetes_namespace: String,
225 namespace: String,
226 config: KubernetesOrchestratorConfig,
227 scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
228 service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
229 command_tx: mpsc::UnboundedSender<WorkerCommand>,
230 _worker: AbortOnDropHandle<()>,
231}
232
233impl fmt::Debug for NamespacedKubernetesOrchestrator {
234 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235 f.debug_struct("NamespacedKubernetesOrchestrator")
236 .field("kubernetes_namespace", &self.kubernetes_namespace)
237 .field("namespace", &self.namespace)
238 .field("config", &self.config)
239 .finish()
240 }
241}
242
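/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its background
/// [`OrchestratorWorker`] task, which performs the actual Kubernetes API calls.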
243enum WorkerCommand {
249 EnsureService {
250 desc: ServiceDescription,
251 },
252 DropService {
253 name: String,
254 },
255 ListServices {
256 namespace: String,
257 result_tx: oneshot::Sender<Vec<String>>,
258 },
259 FetchServiceMetrics {
260 name: String,
261 info: ServiceInfo,
262 result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
263 },
264}
265
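/// A description of a service to be created or updated by an
/// [`OrchestratorWorker`].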
266#[derive(Debug, Clone)]
268struct ServiceDescription {
269 name: String,
270 scale: u16,
271 service: K8sService,
272 stateful_set: StatefulSet,
273 pod_template_hash: String,
274}
275
276struct OrchestratorWorker {
288 metrics_api: Api<PodMetrics>,
289 service_api: Api<K8sService>,
290 stateful_set_api: Api<StatefulSet>,
291 pod_api: Api<Pod>,
292 owner_references: Vec<OwnerReference>,
293 command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
294 name_prefix: String,
295 collect_pod_metrics: bool,
296}
297
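// The following types mirror the `metrics.k8s.io/v1beta1` PodMetrics API served
// by the Kubernetes metrics server, which the `k8s_openapi` bindings do not
// cover, so they are defined by hand here.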
298#[derive(Deserialize, Clone, Debug)]
299pub struct PodMetricsContainer {
300 pub name: String,
301 pub usage: PodMetricsContainerUsage,
302}
303
304#[derive(Deserialize, Clone, Debug)]
305pub struct PodMetricsContainerUsage {
306 pub cpu: Quantity,
307 pub memory: Quantity,
308}
309
310#[derive(Deserialize, Clone, Debug)]
311pub struct PodMetrics {
312 pub metadata: ObjectMeta,
313 pub timestamp: String,
314 pub window: String,
315 pub containers: Vec<PodMetricsContainer>,
316}
317
318impl k8s_openapi::Resource for PodMetrics {
319 const GROUP: &'static str = "metrics.k8s.io";
320 const KIND: &'static str = "PodMetrics";
321 const VERSION: &'static str = "v1beta1";
322 const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
323 const URL_PATH_SEGMENT: &'static str = "pods";
324
325 type Scope = k8s_openapi::NamespaceResourceScope;
326}
327
328impl k8s_openapi::Metadata for PodMetrics {
329 type Ty = ObjectMeta;
330
331 fn metadata(&self) -> &Self::Ty {
332 &self.metadata
333 }
334
335 fn metadata_mut(&mut self) -> &mut Self::Ty {
336 &mut self.metadata
337 }
338}
339
340#[derive(Deserialize, Clone, Debug)]
349pub struct MetricIdentifier {
350 #[serde(rename = "metricName")]
351 pub name: String,
352 }
354
355#[derive(Deserialize, Clone, Debug)]
356pub struct MetricValue {
357 #[serde(rename = "describedObject")]
358 pub described_object: ObjectReference,
359 #[serde(flatten)]
360 pub metric_identifier: MetricIdentifier,
361 pub timestamp: String,
362 pub value: Quantity,
363 }
365
366impl NamespacedKubernetesOrchestrator {
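    /// Returns the name used for the Kubernetes objects backing the service
    /// with ID `id`, i.e. `{name_prefix}{namespace}-{id}`.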
367 fn service_name(&self, id: &str) -> String {
368 format!(
369 "{}{}-{id}",
370 self.config.name_prefix.as_deref().unwrap_or(""),
371 self.namespace
372 )
373 }
374
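    /// Returns a watcher configuration that selects only the pods belonging to
    /// this orchestrator namespace, via the
    /// `environmentd.materialize.cloud/namespace` label.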
375 fn watch_pod_params(&self) -> watcher::Config {
378 let ns_selector = format!(
379 "environmentd.materialize.cloud/namespace={}",
380 self.namespace
381 );
382 watcher::Config::default().labels(&ns_selector)
383 }
384
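    /// Qualifies a user-supplied label key as
    /// `{namespace}.environmentd.materialize.cloud/{key}`.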
385 fn make_label_key(&self, key: &str) -> String {
388 format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
389 }
390
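    /// Converts an orchestrator label selector into a Kubernetes
    /// `LabelSelectorRequirement`, rejecting empty `in`/`notin` value sets.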
391 fn label_selector_to_k8s(
392 &self,
393 MzLabelSelector { label_name, logic }: MzLabelSelector,
394 ) -> Result<LabelSelectorRequirement, anyhow::Error> {
395 let (operator, values) = match logic {
396 LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
397 LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
398 LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
399 LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
400 LabelSelectionLogic::InSet { values } => {
401 if values.is_empty() {
402 Err(anyhow!(
403 "Invalid selector logic for {label_name}: empty `in` set"
404 ))
405 } else {
406 Ok(("In", values))
407 }
408 }
409 LabelSelectionLogic::NotInSet { values } => {
410 if values.is_empty() {
411 Err(anyhow!(
412 "Invalid selector logic for {label_name}: empty `notin` set"
413 ))
414 } else {
415 Ok(("NotIn", values))
416 }
417 }
418 }?;
419 let lsr = LabelSelectorRequirement {
420 key: self.make_label_key(&label_name),
421 operator: operator.to_string(),
422 values: Some(values),
423 };
424 Ok(lsr)
425 }
426
427 fn send_command(&self, cmd: WorkerCommand) {
428 self.command_tx.send(cmd).expect("worker task not dropped");
429 }
430}
431
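/// A quantity parsed from a Kubernetes resource string, representing the value
/// `integral_part * base^exponent`, where `base` is 10 if `base10` is true and
/// 2 otherwise.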
432#[derive(Debug)]
433struct ScaledQuantity {
434 integral_part: u64,
435 exponent: i8,
436 base10: bool,
437}
438
439impl ScaledQuantity {
440 pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
441 if base10 != self.base10 {
442 return None;
443 }
444 let exponent = self.exponent - scale;
445 let mut result = self.integral_part;
446 let base = if self.base10 { 10 } else { 2 };
447 if exponent < 0 {
448 for _ in exponent..0 {
449 result /= base;
450 }
451 } else {
452 for _ in 0..exponent {
453 result = result.checked_mul(base)?;
454 }
455 }
456 Some(result)
457 }
458}
459
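/// Parses a Kubernetes quantity string (as used for resource requests and
/// limits, e.g. `"500m"` CPU or `"4Gi"` memory) into a [`ScaledQuantity`].
/// Only non-negative integers with an optional decimal (`n`, `u`, `m`, `k`,
/// `M`, `G`, `T`, `P`, `E`) or binary (`Ki`, `Mi`, `Gi`, `Ti`, `Pi`, `Ei`)
/// suffix are supported.
///
/// A usage sketch (illustrative only; the function is crate-private):
///
/// ```ignore
/// // "250m" CPU is 250 millicores, i.e. 250_000_000 nanocores.
/// let q = parse_k8s_quantity("250m").unwrap();
/// assert_eq!(q.try_to_integer(-9, true), Some(250_000_000));
/// ```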
460fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
470 const DEC_SUFFIXES: &[(&str, i8)] = &[
471 ("n", -9),
472 ("u", -6),
473 ("m", -3),
474 ("", 0),
475 ("k", 3), ("M", 6),
477 ("G", 9),
478 ("T", 12),
479 ("P", 15),
480 ("E", 18),
481 ];
482 const BIN_SUFFIXES: &[(&str, i8)] = &[
483 ("", 0),
484 ("Ki", 10),
485 ("Mi", 20),
486 ("Gi", 30),
487 ("Ti", 40),
488 ("Pi", 50),
489 ("Ei", 60),
490 ];
491
492 let (positive, s) = match s.chars().next() {
493 Some('+') => (true, &s[1..]),
494 Some('-') => (false, &s[1..]),
495 _ => (true, s),
496 };
497
498 if !positive {
499 anyhow::bail!("Negative numbers not supported")
500 }
501
502 fn is_suffix_char(ch: char) -> bool {
503 "numkMGTPEKi".contains(ch)
504 }
505 let (num, suffix) = match s.find(is_suffix_char) {
506 None => (s, ""),
507 Some(idx) => s.split_at(idx),
508 };
509 let num: u64 = num.parse()?;
510 let (exponent, base10) = if let Some((_, exponent)) =
511 DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
512 {
513 (exponent, true)
514 } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
515 (exponent, false)
516 } else {
517 anyhow::bail!("Unrecognized suffix: {suffix}");
518 };
519 Ok(ScaledQuantity {
520 integral_part: num,
521 exponent: *exponent,
522 base10,
523 })
524}
525
526#[async_trait]
527impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
528 async fn fetch_service_metrics(
529 &self,
530 id: &str,
531 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
532 let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
533 *info
534 } else {
535 tracing::error!("Failed to get info for {id}");
537 anyhow::bail!("Failed to get info for {id}");
538 };
539
540 let (result_tx, result_rx) = oneshot::channel();
541 self.send_command(WorkerCommand::FetchServiceMetrics {
542 name: self.service_name(id),
543 info,
544 result_tx,
545 });
546
547 let metrics = result_rx.await.expect("worker task not dropped");
548 Ok(metrics)
549 }
550
551 fn ensure_service(
552 &self,
553 id: &str,
554 ServiceConfig {
555 image,
556 init_container_image,
557 args,
558 ports: ports_in,
559 memory_limit,
560 memory_request,
561 cpu_limit,
562 scale,
563 labels: labels_in,
564 availability_zones,
565 other_replicas_selector,
566 replicas_selector,
567 disk: disk_in,
568 disk_limit,
569 node_selector,
570 }: ServiceConfig,
571 ) -> Result<Box<dyn Service>, anyhow::Error> {
572 let scheduling_config: ServiceSchedulingConfig =
574 self.scheduling_config.read().expect("poisoned").clone();
575
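        // Whether to attach a scratch disk: the replica must have requested one
        // (or `always_use_disk` is set), and a disk limit of zero explicitly
        // disables it.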
576 let disk = {
580 let user_requested_disk = disk_in || scheduling_config.always_use_disk;
583 let size_disables_disk = disk_limit == Some(DiskLimit::ZERO);
587 user_requested_disk && !size_disables_disk
594 };
595
596 let name = self.service_name(id);
597 let match_labels = btreemap! {
603 "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
604 "environmentd.materialize.cloud/service-id".into() => id.into(),
605 };
606 let mut labels = match_labels.clone();
607 for (key, value) in labels_in {
608 labels.insert(self.make_label_key(&key), value);
609 }
610
611 labels.insert(self.make_label_key("scale"), scale.to_string());
612
613 for port in &ports_in {
614 labels.insert(
615 format!("environmentd.materialize.cloud/port-{}", port.name),
616 "true".into(),
617 );
618 }
619 for (key, value) in &self.config.service_labels {
620 labels.insert(key.clone(), value.clone());
621 }
622 let mut limits = BTreeMap::new();
623 let mut requests = BTreeMap::new();
624 if let Some(memory_limit) = memory_limit {
625 limits.insert(
626 "memory".into(),
627 Quantity(memory_limit.0.as_u64().to_string()),
628 );
629 requests.insert(
630 "memory".into(),
631 Quantity(memory_limit.0.as_u64().to_string()),
632 );
633 }
634 if let Some(memory_request) = memory_request {
635 requests.insert(
636 "memory".into(),
637 Quantity(memory_request.0.as_u64().to_string()),
638 );
639 }
640 if let Some(cpu_limit) = cpu_limit {
641 limits.insert(
642 "cpu".into(),
643 Quantity(format!("{}m", cpu_limit.as_millicpus())),
644 );
645 requests.insert(
646 "cpu".into(),
647 Quantity(format!("{}m", cpu_limit.as_millicpus())),
648 );
649 }
650 let service = K8sService {
651 metadata: ObjectMeta {
652 name: Some(name.clone()),
653 ..Default::default()
654 },
655 spec: Some(ServiceSpec {
656 ports: Some(
657 ports_in
658 .iter()
659 .map(|port| ServicePort {
660 port: port.port_hint.into(),
661 name: Some(port.name.clone()),
662 ..Default::default()
663 })
664 .collect(),
665 ),
666 cluster_ip: Some("None".to_string()),
667 selector: Some(match_labels.clone()),
668 ..Default::default()
669 }),
670 status: None,
671 };
672
673 let hosts = (0..scale)
674 .map(|i| {
675 format!(
676 "{name}-{i}.{name}.{}.svc.cluster.local",
677 self.kubernetes_namespace
678 )
679 })
680 .collect::<Vec<_>>();
681 let ports = ports_in
682 .iter()
683 .map(|p| (p.name.clone(), p.port_hint))
684 .collect::<BTreeMap<_, _>>();
685
686 let mut listen_addrs = BTreeMap::new();
687 let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
688 for (name, port) in &ports {
689 listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
690 for (i, host) in hosts.iter().enumerate() {
691 peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
692 }
693 }
694 let mut args = args(ServiceAssignments {
695 listen_addrs: &listen_addrs,
696 peer_addrs: &peer_addrs,
697 });
698
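        // Pod anti-affinity: avoid scheduling processes of this replica on the
        // same node as processes of other replicas. Depending on the scheduling
        // config this is either a hard requirement or a weighted preference.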
699 let anti_affinity = Some({
708 let label_selector_requirements = other_replicas_selector
709 .clone()
710 .into_iter()
711 .map(|ls| self.label_selector_to_k8s(ls))
712 .collect::<Result<Vec<_>, _>>()?;
713 let ls = LabelSelector {
714 match_expressions: Some(label_selector_requirements),
715 ..Default::default()
716 };
717 let pat = PodAffinityTerm {
718 label_selector: Some(ls),
719 topology_key: "kubernetes.io/hostname".to_string(),
720 ..Default::default()
721 };
722
723 if !scheduling_config.soften_replication_anti_affinity {
724 PodAntiAffinity {
725 required_during_scheduling_ignored_during_execution: Some(vec![pat]),
726 ..Default::default()
727 }
728 } else {
729 PodAntiAffinity {
730 preferred_during_scheduling_ignored_during_execution: Some(vec![
731 WeightedPodAffinityTerm {
732 weight: scheduling_config.soften_replication_anti_affinity_weight,
733 pod_affinity_term: pat,
734 },
735 ]),
736 ..Default::default()
737 }
738 }
739 });
740
741 let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
742 let ls = LabelSelector {
744 match_labels: Some(match_labels.clone()),
745 ..Default::default()
746 };
747 let pat = PodAffinityTerm {
748 label_selector: Some(ls),
749 topology_key: "topology.kubernetes.io/zone".to_string(),
750 ..Default::default()
751 };
752
753 Some(PodAffinity {
754 preferred_during_scheduling_ignored_during_execution: Some(vec![
755 WeightedPodAffinityTerm {
756 weight,
757 pod_affinity_term: pat,
758 },
759 ]),
760 ..Default::default()
761 })
762 } else {
763 None
764 };
765
766 let topology_spread = if scheduling_config.topology_spread.enabled {
767 let config = &scheduling_config.topology_spread;
768
769 if !config.ignore_non_singular_scale || scale <= 1 {
770 let label_selector_requirements = (if config.ignore_non_singular_scale {
771 let mut replicas_selector_ignoring_scale = replicas_selector.clone();
772
773 replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
774 label_name: "scale".into(),
775 logic: mz_orchestrator::LabelSelectionLogic::Eq {
776 value: "1".to_string(),
777 },
778 });
779
780 replicas_selector_ignoring_scale
781 } else {
782 replicas_selector
783 })
784 .into_iter()
785 .map(|ls| self.label_selector_to_k8s(ls))
786 .collect::<Result<Vec<_>, _>>()?;
787 let ls = LabelSelector {
788 match_expressions: Some(label_selector_requirements),
789 ..Default::default()
790 };
791
792 let constraint = TopologySpreadConstraint {
793 label_selector: Some(ls),
794 min_domains: config.min_domains,
795 max_skew: config.max_skew,
796 topology_key: "topology.kubernetes.io/zone".to_string(),
797 when_unsatisfiable: if config.soft {
798 "ScheduleAnyway".to_string()
799 } else {
800 "DoNotSchedule".to_string()
801 },
802 match_label_keys: None,
812 ..Default::default()
815 };
816 Some(vec![constraint])
817 } else {
818 None
819 }
820 } else {
821 None
822 };
823
        let mut pod_annotations = btreemap! {
            // Prevent the cluster autoscaler from evicting these pods in an
            // attempt to scale down and re-pack the cluster.
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            // Newer Karpenter versions use `do-not-disrupt` in place of
            // `do-not-evict`, so set both.
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
836 if self.config.enable_prometheus_scrape_annotations {
837 if let Some(internal_http_port) = ports_in
838 .iter()
839 .find(|port| port.name == "internal-http")
840 .map(|port| port.port_hint.to_string())
841 {
842 pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
844 pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
845 pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
846 pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
847 }
848 }
849
850 let default_node_selector = if disk {
851 vec![("materialize.cloud/disk".to_string(), disk.to_string())]
852 } else {
853 vec![]
857 };
858
859 let node_selector: BTreeMap<String, String> = default_node_selector
860 .into_iter()
861 .chain(self.config.service_node_selector.clone())
862 .chain(node_selector)
863 .collect();
864
865 let node_affinity = if let Some(availability_zones) = availability_zones {
866 let selector = NodeSelectorTerm {
867 match_expressions: Some(vec![NodeSelectorRequirement {
868 key: "materialize.cloud/availability-zone".to_string(),
869 operator: "In".to_string(),
870 values: Some(availability_zones),
871 }]),
872 match_fields: None,
873 };
874
875 if scheduling_config.soften_az_affinity {
876 Some(NodeAffinity {
877 preferred_during_scheduling_ignored_during_execution: Some(vec![
878 PreferredSchedulingTerm {
879 preference: selector,
880 weight: scheduling_config.soften_az_affinity_weight,
881 },
882 ]),
883 required_during_scheduling_ignored_during_execution: None,
884 })
885 } else {
886 Some(NodeAffinity {
887 preferred_during_scheduling_ignored_during_execution: None,
888 required_during_scheduling_ignored_during_execution: Some(NodeSelector {
889 node_selector_terms: vec![selector],
890 }),
891 })
892 }
893 } else {
894 None
895 };
896
897 let mut affinity = Affinity {
898 pod_anti_affinity: anti_affinity,
899 pod_affinity,
900 node_affinity,
901 ..Default::default()
902 };
903 if let Some(service_affinity) = &self.config.service_affinity {
904 affinity.merge_from(serde_json::from_str(service_affinity)?);
905 }
906
907 let container_name = image
908 .rsplit_once('/')
909 .and_then(|(_, name_version)| name_version.rsplit_once(':'))
910 .context("`image` is not ORG/NAME:VERSION")?
911 .0
912 .to_string();
913
914 let container_security_context = if scheduling_config.security_context_enabled {
915 Some(SecurityContext {
916 privileged: Some(false),
917 run_as_non_root: Some(true),
918 allow_privilege_escalation: Some(false),
919 seccomp_profile: Some(SeccompProfile {
920 type_: "RuntimeDefault".to_string(),
921 ..Default::default()
922 }),
923 capabilities: Some(Capabilities {
924 drop: Some(vec!["ALL".to_string()]),
925 ..Default::default()
926 }),
927 ..Default::default()
928 })
929 } else {
930 None
931 };
932
933 let init_containers = init_container_image.map(|image| {
934 vec![Container {
935 name: "init".to_string(),
936 image: Some(image),
937 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
938 resources: Some(ResourceRequirements {
939 claims: None,
940 limits: Some(limits.clone()),
941 requests: Some(requests.clone()),
942 }),
943 security_context: container_security_context.clone(),
944 env: Some(vec![
945 EnvVar {
946 name: "MZ_NAMESPACE".to_string(),
947 value_from: Some(EnvVarSource {
948 field_ref: Some(ObjectFieldSelector {
949 field_path: "metadata.namespace".to_string(),
950 ..Default::default()
951 }),
952 ..Default::default()
953 }),
954 ..Default::default()
955 },
956 EnvVar {
957 name: "MZ_POD_NAME".to_string(),
958 value_from: Some(EnvVarSource {
959 field_ref: Some(ObjectFieldSelector {
960 field_path: "metadata.name".to_string(),
961 ..Default::default()
962 }),
963 ..Default::default()
964 }),
965 ..Default::default()
966 },
967 EnvVar {
968 name: "MZ_NODE_NAME".to_string(),
969 value_from: Some(EnvVarSource {
970 field_ref: Some(ObjectFieldSelector {
971 field_path: "spec.nodeName".to_string(),
972 ..Default::default()
973 }),
974 ..Default::default()
975 }),
976 ..Default::default()
977 },
978 ]),
979 ..Default::default()
980 }]
981 });
982
983 let env = if self.config.coverage {
984 Some(vec![EnvVar {
985 name: "LLVM_PROFILE_FILE".to_string(),
986 value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
987 ..Default::default()
988 }])
989 } else {
990 None
991 };
992
993 let mut volume_mounts = vec![];
994
995 if self.config.coverage {
996 volume_mounts.push(VolumeMount {
997 name: "coverage".to_string(),
998 mount_path: "/coverage".to_string(),
999 ..Default::default()
1000 })
1001 }
1002
1003 let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
1004 (true, Some(ephemeral_volume_storage_class)) => {
1005 volume_mounts.push(VolumeMount {
1006 name: "scratch".to_string(),
1007 mount_path: "/scratch".to_string(),
1008 ..Default::default()
1009 });
1010 args.push("--scratch-directory=/scratch".into());
1011
1012 Some(vec![Volume {
1013 name: "scratch".to_string(),
1014 ephemeral: Some(EphemeralVolumeSource {
1015 volume_claim_template: Some(PersistentVolumeClaimTemplate {
1016 spec: PersistentVolumeClaimSpec {
1017 access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1018 storage_class_name: Some(
1019 ephemeral_volume_storage_class.to_string(),
1020 ),
1021 resources: Some(VolumeResourceRequirements {
1022 requests: Some(BTreeMap::from([(
1023 "storage".to_string(),
1024 Quantity(
1025 disk_limit
1026 .unwrap_or(DiskLimit::ARBITRARY)
1027 .0
1028 .as_u64()
1029 .to_string(),
1030 ),
1031 )])),
1032 ..Default::default()
1033 }),
1034 ..Default::default()
1035 },
1036 ..Default::default()
1037 }),
1038 ..Default::default()
1039 }),
1040 ..Default::default()
1041 }])
1042 }
1043 (true, None) => {
1044 return Err(anyhow!(
1045 "service requested disk but no ephemeral volume storage class was configured"
1046 ));
1047 }
1048 (false, _) => None,
1049 };
1050
1051 if let Some(name_prefix) = &self.config.name_prefix {
1052 args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
1053 }
1054
1055 let volume_claim_templates = if self.config.coverage {
1056 Some(vec![PersistentVolumeClaim {
1057 metadata: ObjectMeta {
1058 name: Some("coverage".to_string()),
1059 ..Default::default()
1060 },
1061 spec: Some(PersistentVolumeClaimSpec {
1062 access_modes: Some(vec!["ReadWriteOnce".to_string()]),
1063 resources: Some(VolumeResourceRequirements {
1064 requests: Some(BTreeMap::from([(
1065 "storage".to_string(),
1066 Quantity("10Gi".to_string()),
1067 )])),
1068 ..Default::default()
1069 }),
1070 ..Default::default()
1071 }),
1072 ..Default::default()
1073 }])
1074 } else {
1075 None
1076 };
1077
1078 let security_context = if let Some(fs_group) = self.config.service_fs_group {
1079 Some(PodSecurityContext {
1080 fs_group: Some(fs_group),
1081 run_as_user: Some(fs_group),
1082 run_as_group: Some(fs_group),
1083 ..Default::default()
1084 })
1085 } else {
1086 None
1087 };
1088
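        // Tolerate not-ready and unreachable nodes only briefly, so that pods
        // are evicted from failed nodes after `NODE_FAILURE_THRESHOLD_SECONDS`
        // rather than the Kubernetes default of five minutes.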
1089 let mut tolerations = vec![
1090 Toleration {
1095 effect: Some("NoExecute".into()),
1096 key: Some("node.kubernetes.io/not-ready".into()),
1097 operator: Some("Exists".into()),
1098 toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1099 value: None,
1100 },
1101 Toleration {
1102 effect: Some("NoExecute".into()),
1103 key: Some("node.kubernetes.io/unreachable".into()),
1104 operator: Some("Exists".into()),
1105 toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
1106 value: None,
1107 },
1108 ];
1109 if let Some(service_tolerations) = &self.config.service_tolerations {
1110 tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
1111 }
1112 let tolerations = Some(tolerations);
1113
1114 let mut pod_template_spec = PodTemplateSpec {
1115 metadata: Some(ObjectMeta {
1116 labels: Some(labels.clone()),
                annotations: Some(pod_annotations),
                ..Default::default()
1119 }),
1120 spec: Some(PodSpec {
1121 init_containers,
1122 containers: vec![Container {
1123 name: container_name,
1124 image: Some(image),
1125 args: Some(args),
1126 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
1127 ports: Some(
1128 ports_in
1129 .iter()
1130 .map(|port| ContainerPort {
1131 container_port: port.port_hint.into(),
1132 name: Some(port.name.clone()),
1133 ..Default::default()
1134 })
1135 .collect(),
1136 ),
1137 security_context: container_security_context.clone(),
1138 resources: Some(ResourceRequirements {
1139 claims: None,
1140 limits: Some(limits),
1141 requests: Some(requests),
1142 }),
1143 volume_mounts: if !volume_mounts.is_empty() {
1144 Some(volume_mounts)
1145 } else {
1146 None
1147 },
1148 env,
1149 ..Default::default()
1150 }],
1151 volumes,
1152 security_context,
1153 node_selector: Some(node_selector),
1154 scheduler_name: self.config.scheduler_name.clone(),
1155 service_account: self.config.service_account.clone(),
1156 affinity: Some(affinity),
1157 topology_spread_constraints: topology_spread,
1158 tolerations,
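                // Terminate pods immediately on shutdown instead of waiting for
                // a graceful termination period.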
1159 termination_grace_period_seconds: Some(0),
1181 ..Default::default()
1182 }),
1183 };
1184 let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
1185 let mut hasher = Sha256::new();
1186 hasher.update(pod_template_json);
1187 let pod_template_hash = format!("{:x}", hasher.finalize());
1188 pod_template_spec
1189 .metadata
1190 .as_mut()
1191 .unwrap()
1192 .annotations
1193 .as_mut()
1194 .unwrap()
1195 .insert(
1196 POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
1197 pod_template_hash.clone(),
1198 );
1199
1200 let stateful_set = StatefulSet {
1201 metadata: ObjectMeta {
1202 name: Some(name.clone()),
1203 ..Default::default()
1204 },
1205 spec: Some(StatefulSetSpec {
1206 selector: LabelSelector {
1207 match_labels: Some(match_labels),
1208 ..Default::default()
1209 },
1210 service_name: name.clone(),
1211 replicas: Some(scale.into()),
1212 template: pod_template_spec,
1213 pod_management_policy: Some("Parallel".to_string()),
1214 volume_claim_templates,
1215 ..Default::default()
1216 }),
1217 status: None,
1218 };
1219
1220 self.send_command(WorkerCommand::EnsureService {
1221 desc: ServiceDescription {
1222 name,
1223 scale,
1224 service,
1225 stateful_set,
1226 pod_template_hash,
1227 },
1228 });
1229
1230 self.service_infos.lock().expect("poisoned lock").insert(
1231 id.to_string(),
1232 ServiceInfo {
1233 scale,
1234 disk,
1235 disk_limit,
1236 },
1237 );
1238
1239 Ok(Box::new(KubernetesService { hosts, ports }))
1240 }
1241
1242 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1244 fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1245 self.service_infos.lock().expect("poisoned lock").remove(id);
1246
1247 self.send_command(WorkerCommand::DropService {
1248 name: self.service_name(id),
1249 });
1250
1251 Ok(())
1252 }
1253
1254 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1256 let (result_tx, result_rx) = oneshot::channel();
1257 self.send_command(WorkerCommand::ListServices {
1258 namespace: self.namespace.clone(),
1259 result_tx,
1260 });
1261
1262 let list = result_rx.await.expect("worker task not dropped");
1263 Ok(list)
1264 }
1265
1266 fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
1267 fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
1268 let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
1269 let service_id_label = "environmentd.materialize.cloud/service-id";
1270 let service_id = pod
1271 .labels()
1272 .get(service_id_label)
1273 .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
1274 .clone();
1275
1276 let oomed = pod
1277 .status
1278 .as_ref()
1279 .and_then(|status| status.container_statuses.as_ref())
1280 .map(|container_statuses| {
1281 container_statuses.iter().any(|cs| {
1282 let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
1286 let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
1287 let termination_state = current_state.or(last_state);
1288
1289 let exit_code = termination_state.map(|s| s.exit_code);
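                        // Exit codes treated as OOM kills: 137 is 128 + SIGKILL,
                        // which is how the kernel OOM killer terminates a
                        // process; 135 (128 + SIGBUS) and 167 are also mapped to
                        // the OOM-killed offline reason here.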
1296 exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
1297 })
1298 })
1299 .unwrap_or(false);
1300
1301 let (pod_ready, last_probe_time) = pod
1302 .status
1303 .and_then(|status| status.conditions)
1304 .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
1305 .map(|c| (c.status == "True", c.last_probe_time))
1306 .unwrap_or((false, None));
1307
1308 let status = if pod_ready {
1309 ServiceStatus::Online
1310 } else {
1311 ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
1312 };
1313 let time = if let Some(time) = last_probe_time {
1314 time.0
1315 } else {
1316 Utc::now()
1317 };
1318
1319 Ok(ServiceEvent {
1320 service_id,
1321 process_id,
1322 status,
1323 time,
1324 })
1325 }
1326
1327 let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
1328 .touched_objects()
1329 .filter_map(|object| async move {
1330 match object {
1331 Ok(pod) => Some(into_service_event(pod)),
1332 Err(error) => {
1333 tracing::warn!("service watch error: {error}");
1336 None
1337 }
1338 }
1339 });
1340 Box::pin(stream)
1341 }
1342
1343 fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1344 *self.scheduling_config.write().expect("poisoned") = config;
1345 }
1346}
1347
1348impl OrchestratorWorker {
1349 fn spawn(self, name: String) -> AbortOnDropHandle<()> {
1350 mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
1351 }
1352
1353 async fn run(mut self) {
1354 {
1355 info!("initializing Kubernetes orchestrator worker");
1356 let start = Instant::now();
1357
            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!(
                    "HOSTNAME environment variable missing or invalid; \
                     required for Kubernetes orchestrator"
                )
            });
1362 let orchestrator_pod = Retry::default()
1363 .clamp_backoff(Duration::from_secs(10))
1364 .retry_async(|_| self.pod_api.get(&hostname))
1365 .await
1366 .expect("always retries on error");
1367 self.owner_references
1368 .extend(orchestrator_pod.owner_references().into_iter().cloned());
1369
1370 info!(
1371 "Kubernetes orchestrator worker initialized in {:?}",
1372 start.elapsed()
1373 );
1374 }
1375
1376 while let Some(cmd) = self.command_rx.recv().await {
1377 self.handle_command(cmd).await;
1378 }
1379 }
1380
1381 async fn handle_command(&self, cmd: WorkerCommand) {
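        // Helper that retries a Kubernetes API call indefinitely with capped
        // backoff, logging each failure; commands are not allowed to fail, so
        // the worker keeps trying until the call succeeds.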
1387 async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
1388 where
1389 F: Fn() -> U,
1390 U: Future<Output = Result<R, K8sError>>,
1391 {
1392 Retry::default()
1393 .clamp_backoff(Duration::from_secs(10))
1394 .retry_async(|_| {
1395 f().map_err(
1396 |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
1397 )
1398 })
1399 .await
1400 .expect("always retries on error")
1401 }
1402
1403 use WorkerCommand::*;
1404 match cmd {
1405 EnsureService { desc } => {
1406 retry(|| self.ensure_service(desc.clone()), "EnsureService").await
1407 }
1408 DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
1409 ListServices {
1410 namespace,
1411 result_tx,
1412 } => {
1413 let result = retry(|| self.list_services(&namespace), "ListServices").await;
1414 let _ = result_tx.send(result);
1415 }
1416 FetchServiceMetrics {
1417 name,
1418 info,
1419 result_tx,
1420 } => {
1421 let result = self.fetch_service_metrics(&name, &info).await;
1422 let _ = result_tx.send(result);
1423 }
1424 }
1425 }
1426
1427 async fn fetch_service_metrics(
1428 &self,
1429 name: &str,
1430 info: &ServiceInfo,
1431 ) -> Vec<ServiceProcessMetrics> {
1432 if !self.collect_pod_metrics {
1433 return (0..info.scale)
1434 .map(|_| ServiceProcessMetrics::default())
1435 .collect();
1436 }
1437
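        // Fetches CPU and memory usage for a single process from the metrics
        // API and, if the service uses a scratch disk, its disk usage from the
        // process itself. Failures degrade to default (empty) metrics rather
        // than errors.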
1438 async fn get_metrics(
1444 self_: &OrchestratorWorker,
1445 service_name: &str,
1446 i: usize,
1447 disk: bool,
1448 disk_limit: Option<DiskLimit>,
1449 ) -> ServiceProcessMetrics {
1450 let name = format!("{service_name}-{i}");
1451
1452 let disk_usage_fut = async {
1453 if disk {
1454 Some(get_disk_usage(self_, service_name, i).await)
1455 } else {
1456 None
1457 }
1458 };
1459 let (metrics, disk_usage) =
1460 match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await {
1461 (Ok(metrics), disk_usage) => {
1462 let disk_usage = match disk_usage {
1463 Some(Ok(disk_usage)) => Some(disk_usage),
1464 Some(Err(e)) => {
1465 warn!("Failed to fetch disk usage for {name}: {e}");
1466 None
1467 }
1468 _ => None,
1469 };
1470
1471 (metrics, disk_usage)
1472 }
1473 (Err(e), _) => {
1474 warn!("Failed to get metrics for {name}: {e}");
1475 return ServiceProcessMetrics::default();
1476 }
1477 };
1478 let Some(PodMetricsContainer {
1479 usage:
1480 PodMetricsContainerUsage {
1481 cpu: Quantity(cpu_str),
1482 memory: Quantity(mem_str),
1483 },
1484 ..
1485 }) = metrics.containers.get(0)
1486 else {
1487 warn!("metrics result contained no containers for {name}");
1488 return ServiceProcessMetrics::default();
1489 };
1490
1491 let cpu = match parse_k8s_quantity(cpu_str) {
1492 Ok(q) => match q.try_to_integer(-9, true) {
1493 Some(i) => Some(i),
1494 None => {
                        tracing::error!("CPU value {q:?} out of range");
1496 None
1497 }
1498 },
1499 Err(e) => {
1500 tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
1501 None
1502 }
1503 };
1504 let memory = match parse_k8s_quantity(mem_str) {
1505 Ok(q) => match q.try_to_integer(0, false) {
1506 Some(i) => Some(i),
1507 None => {
1508 tracing::error!("Memory value {q:?} out of range");
1509 None
1510 }
1511 },
1512 Err(e) => {
1513 tracing::error!("Failed to parse memory value {mem_str}: {e}");
1514 None
1515 }
1516 };
1517
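            // Report at most the configured disk limit, and only report disk
            // usage at all when a limit is known.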
1518 let disk_usage = match (disk_usage, disk_limit) {
1527 (Some(disk_usage), Some(DiskLimit(disk_limit))) => {
1528 Some(std::cmp::min(disk_usage, disk_limit.0))
1529 }
1530 _ => None,
1531 };
1532
1533 ServiceProcessMetrics {
1534 cpu_nano_cores: cpu,
1535 memory_bytes: memory,
1536 disk_usage_bytes: disk_usage,
1537 }
1538 }
1539
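        // Fetches the disk usage of a single process by querying the
        // `/api/usage-metrics` endpoint exposed on its `internal-http` port.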
1540 async fn get_disk_usage(
1546 self_: &OrchestratorWorker,
1547 service_name: &str,
1548 i: usize,
1549 ) -> anyhow::Result<u64> {
1550 #[derive(Deserialize)]
1551 pub(crate) struct Usage {
1552 disk_bytes: Option<u64>,
1553 }
1554
1555 let service = self_
1556 .service_api
1557 .get(service_name)
1558 .await
1559 .with_context(|| format!("failed to get service {service_name}"))?;
1560 let namespace = service
1561 .metadata
1562 .namespace
1563 .context("missing service namespace")?;
1564 let internal_http_port = service
1565 .spec
1566 .and_then(|spec| spec.ports)
1567 .and_then(|ports| {
1568 ports
1569 .into_iter()
1570 .find(|p| p.name == Some("internal-http".into()))
1571 })
1572 .map(|p| p.port);
1573 let Some(port) = internal_http_port else {
1574 bail!("internal-http port missing in service spec");
1575 };
1576 let metrics_url = format!(
1577 "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
1578 /api/usage-metrics"
1579 );
1580
1581 let http_client = reqwest::Client::builder()
1582 .timeout(Duration::from_secs(10))
1583 .build()
1584 .context("error building HTTP client")?;
1585 let resp = http_client.get(metrics_url).send().await?;
1586 let usage: Usage = resp.json().await?;
1587
1588 usage
1589 .disk_bytes
1590 .ok_or_else(|| anyhow!("process did not provide disk usage"))
1591 }
1592
1593 let ret = futures::future::join_all(
1594 (0..info.scale).map(|i| get_metrics(self, name, i.into(), info.disk, info.disk_limit)),
1595 );
1596
1597 ret.await
1598 }
1599
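    /// Applies the service's Kubernetes `Service` and `StatefulSet` via
    /// server-side apply, then deletes any pods whose recorded pod template
    /// hash no longer matches, so they are recreated with the new template.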
1600 async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
1601 desc.service
1607 .metadata
1608 .owner_references
1609 .get_or_insert(vec![])
1610 .extend(self.owner_references.iter().cloned());
1611 desc.stateful_set
1612 .metadata
1613 .owner_references
1614 .get_or_insert(vec![])
1615 .extend(self.owner_references.iter().cloned());
1616
1617 self.service_api
1618 .patch(
1619 &desc.name,
1620 &PatchParams::apply(FIELD_MANAGER).force(),
1621 &Patch::Apply(desc.service),
1622 )
1623 .await?;
1624 self.stateful_set_api
1625 .patch(
1626 &desc.name,
1627 &PatchParams::apply(FIELD_MANAGER).force(),
1628 &Patch::Apply(desc.stateful_set),
1629 )
1630 .await?;
1631
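        // Delete any pod whose pod template hash annotation doesn't match the
        // template just applied; the StatefulSet controller will recreate it
        // from the updated template. Pods that don't exist yet (404) are
        // skipped.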
1632 for pod_id in 0..desc.scale {
1637 let pod_name = format!("{}-{pod_id}", desc.name);
1638 let pod = match self.pod_api.get(&pod_name).await {
1639 Ok(pod) => pod,
1640 Err(kube::Error::Api(e)) if e.code == 404 => continue,
1642 Err(e) => return Err(e),
1643 };
1644 if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION) != Some(&desc.pod_template_hash)
1645 {
1646 match self
1647 .pod_api
1648 .delete(&pod_name, &DeleteParams::default())
1649 .await
1650 {
1651 Ok(_) => (),
1652 Err(kube::Error::Api(e)) if e.code == 404 => (),
1654 Err(e) => return Err(e),
1655 }
1656 }
1657 }
1658
1659 Ok(())
1660 }
1661
1662 async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
1663 let res = self
1664 .stateful_set_api
1665 .delete(name, &DeleteParams::default())
1666 .await;
1667 match res {
1668 Ok(_) => (),
1669 Err(K8sError::Api(e)) if e.code == 404 => (),
1670 Err(e) => return Err(e),
1671 }
1672
1673 let res = self
1674 .service_api
1675 .delete(name, &DeleteParams::default())
1676 .await;
1677 match res {
1678 Ok(_) => Ok(()),
1679 Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
1680 Err(e) => Err(e),
1681 }
1682 }
1683
1684 async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
1685 let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
1686 let name_prefix = format!("{}{namespace}-", self.name_prefix);
1687 Ok(stateful_sets
1688 .into_iter()
1689 .filter_map(|ss| {
1690 ss.metadata
1691 .name
1692 .unwrap()
1693 .strip_prefix(&name_prefix)
1694 .map(Into::into)
1695 })
1696 .collect())
1697 }
1698}
1699
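/// A handle to a service started by the Kubernetes orchestrator, exposing the
/// per-process hostnames and named ports.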
1700#[derive(Debug, Clone)]
1701struct KubernetesService {
1702 hosts: Vec<String>,
1703 ports: BTreeMap<String, u16>,
1704}
1705
1706impl Service for KubernetesService {
1707 fn addresses(&self, port: &str) -> Vec<String> {
1708 let port = self.ports[port];
1709 self.hosts
1710 .iter()
1711 .map(|host| format!("{host}:{port}"))
1712 .collect()
1713 }
1714}
1715
1716#[cfg(test)]
1717mod tests {
1718 use super::*;
1719
1720 #[mz_ore::test]
1721 fn k8s_quantity_base10_large() {
1722 let cases = &[
1723 ("42", 42),
1724 ("42k", 42000),
1725 ("42M", 42000000),
1726 ("42G", 42000000000),
1727 ("42T", 42000000000000),
1728 ("42P", 42000000000000000),
1729 ];
1730
1731 for (input, expected) in cases {
1732 let quantity = parse_k8s_quantity(input).unwrap();
1733 let number = quantity.try_to_integer(0, true).unwrap();
1734 assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1735 }
1736 }
1737
1738 #[mz_ore::test]
1739 fn k8s_quantity_base10_small() {
1740 let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];
1741
1742 for (input, expected) in cases {
1743 let quantity = parse_k8s_quantity(input).unwrap();
1744 let number = quantity.try_to_integer(-9, true).unwrap();
1745 assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1746 }
1747 }
1748
1749 #[mz_ore::test]
1750 fn k8s_quantity_base2() {
1751 let cases = &[
1752 ("42Ki", 42 << 10),
1753 ("42Mi", 42 << 20),
1754 ("42Gi", 42 << 30),
1755 ("42Ti", 42 << 40),
1756 ("42Pi", 42 << 50),
1757 ];
1758
1759 for (input, expected) in cases {
1760 let quantity = parse_k8s_quantity(input).unwrap();
1761 let number = quantity.try_to_integer(0, false).unwrap();
1762 assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
1763 }
1764 }
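
    // A hedged sketch of the error paths, derived from the parsing logic in
    // `parse_k8s_quantity` and `ScaledQuantity::try_to_integer`: negative
    // values and a bare decimal "K" suffix are rejected, and mixing base-10
    // and base-2 in `try_to_integer` yields `None`.
    #[mz_ore::test]
    fn k8s_quantity_invalid() {
        assert!(parse_k8s_quantity("-42").is_err());
        assert!(parse_k8s_quantity("42K").is_err());
        assert_eq!(
            parse_k8s_quantity("42Ki").unwrap().try_to_integer(0, true),
            None
        );
    }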
1765}