use std::collections::BTreeMap;
use std::future::Future;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use chrono::Utc;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::DeepMerge;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
};
use mz_ore::cast::CastInto;
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

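/// Configuration for a [`KubernetesOrchestrator`].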
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use, if the Kubernetes
    /// configuration is loaded from the local kubeconfig.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use, if any.
    pub scheduler_name: Option<String>,
    /// Annotations to install on every pod created by the orchestrator.
    pub service_annotations: BTreeMap<String, String>,
    /// Labels to install on every service created by the orchestrator.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector to install on every service created by the orchestrator.
    pub service_node_selector: BTreeMap<String, String>,
    /// Affinity to merge into every service's pods, as a JSON-encoded
    /// `Affinity`.
    pub service_affinity: Option<String>,
    /// Tolerations to install on every service's pods, as a JSON-encoded list
    /// of `Toleration`s.
    pub service_tolerations: Option<String>,
    /// The service account that each service should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for services created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix to use when making AWS operations on behalf
    /// of the environment.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to run services with code coverage collection enabled.
    pub coverage: bool,
    /// The storage class to use for services' ephemeral volumes, if any.
    pub ephemeral_volume_storage_class: Option<String>,
    /// The optional fs group for services' pods' `securityContext`.
    pub service_fs_group: Option<i64>,
    /// The prefix to prepend to all service names.
    pub name_prefix: Option<String>,
    /// Whether to collect metrics for services' pods.
    pub collect_pod_metrics: bool,
    /// Whether to annotate pods for Prometheus scrape discovery.
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

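/// Specifies whether Kubernetes should pull Docker images when creating pods.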
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the image.
    Always,
    /// Pull the image only if it is not already present.
    IfNotPresent,
    /// Never pull the image.
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

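/// An orchestrator backed by Kubernetes.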
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
    /// Creates a new Kubernetes orchestrator from the provided configuration.
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: NonZero<u16>,
}

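/// A [`NamespacedOrchestrator`] backed by Kubernetes, which launches services
/// as Kubernetes stateful sets via a background [`OrchestratorWorker`].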
struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

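/// A command sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].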
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

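/// A description of a service to be created by an [`OrchestratorWorker`].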
#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: NonZero<u16>,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

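/// A background task handling the Kubernetes API calls issued by a
/// [`NamespacedKubernetesOrchestrator`].
///
/// Performing the API calls on a separate task keeps the orchestrator's
/// methods non-blocking and allows failed calls to be retried transparently.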
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

    /// Returns a watcher configuration that selects only the pods belonging
    /// to this orchestrator's namespace.
    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().timeout(59).labels(&ns_selector)
    }

    /// Convert a higher-level label key to the kind we use in Kubernetes,
    /// prefixed with the namespace to avoid collisions with labels from
    /// other sources.
    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

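/// A Kubernetes-style quantity, represented as `integral_part * base^exponent`,
/// where `base` is 10 when `base10` is true and 2 otherwise.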
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

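/// Parses a Kubernetes quantity string, such as `500m` or `100Mi`, into a
/// [`ScaledQuantity`]. Both decimal (SI) and binary suffixes are supported;
/// negative quantities are rejected.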
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3),
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        // Take a snapshot of the current scheduling config.
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        // A service requests disk unless its disk limit is exactly zero.
        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        // The match labels should be the minimal set of labels that uniquely
        // identify the pods in the stateful set. Changing them after the
        // stateful set is created is not permitted by Kubernetes.
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        let hosts = (0..scale.get())
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

        // Add an anti-affinity rule that discourages (or, by default,
        // forbids) scheduling replicas of the same cluster onto the same
        // node, since co-located replicas fail together and defeat the
        // fault tolerance that replication is meant to provide.
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            // `match_labels` sufficiently selects pods of the same service.
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale.get() == 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

        let mut pod_annotations = btreemap! {
            // Prevent the cluster autoscaler (or karpenter) from evicting
            // these pods in attempts to scale down and re-pack the cluster.
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            // Newer versions of karpenter use `do-not-disrupt` in place of
            // the deprecated `do-not-evict` annotation.
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                // Enable Prometheus scrape discovery on the internal HTTP port.
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        // Pods that request disk must land on nodes with local disk; pods
        // that don't are left unconstrained.
        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        let mut tolerations = vec![
            // When a node becomes not-ready or unreachable, Kubernetes by
            // default waits 300 seconds (5 minutes) before evicting pods.
            // Shorten that to `NODE_FAILURE_THRESHOLD_SECONDS` so replacement
            // pods are scheduled promptly after a node failure.
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                // Annotations are attached below, after computing the pod
                // template hash, so that annotation changes don't change the
                // hash and force pods to be recreated.
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                // Kubernetes sends a SIGTERM and waits this many seconds
                // before sending a SIGKILL. Our services can't do anything
                // useful with a SIGTERM, so there is no reason to wait.
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_annotations.insert(
            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
            pod_template_hash.clone(),
        );

        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: Some(name.clone()),
                replicas: Some(scale.cast_into()),
                template: pod_template_spec,
                update_strategy: Some(StatefulSetUpdateStrategy {
                    type_: Some("OnDelete".to_owned()),
                    ..Default::default()
                }),
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        // The container may have already restarted, in which
                        // case the termination information lives in
                        // `last_state` rather than `state`.
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        // Exit codes treated as indicating an out-of-memory
                        // kill: 135 (SIGBUS), 137 (SIGKILL, as delivered by
                        // the OOM killer), and 167, a Materialize-specific
                        // out-of-memory exit code.
                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Utc::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time,
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        // Watch errors are typically transient; log them and
                        // keep watching.
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

impl OrchestratorWorker {
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            // Fetch the pod we are running in, so we can propagate its owner
            // references to the Kubernetes objects we create.
            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!(
                    "HOSTNAME environment variable missing or invalid; \
                     required for Kubernetes orchestrator"
                )
            });
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

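    /// Handles a command sent by the orchestrator.
    ///
    /// Kubernetes API calls made on behalf of `EnsureService`, `DropService`,
    /// and `ListServices` are retried with backoff until they succeed.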
    async fn handle_command(&self, cmd: WorkerCommand) {
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        /// Usage metrics that clusterd reports about itself.
        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        // Fetch the Kubernetes pod metrics and clusterd's self-reported usage
        // for process `i` of the service, combining them into a single
        // `ServiceProcessMetrics` value.
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                // Report disk usage inclusive of swap, since swapped pages
                // also occupy disk space.
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                // Memory plus swap approximates the total heap size of the
                // process.
                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        // Query clusterd's internal HTTP server for the usage metrics it
        // reports about itself.
        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }

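    /// Applies the `Service` and `StatefulSet` for a service via server-side
    /// apply, then recycles any pods whose pod template hash is out of date.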
    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        // Propagate our own pod's owner references to the objects we create,
        // so they are garbage collected along with the environment.
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
        let pod_annotations = pod_metadata.annotations.clone();

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

        // The stateful set uses the `OnDelete` update strategy, so the
        // controller won't restart pods when the template changes. Instead,
        // delete any pod whose template hash is out of date ourselves; pods
        // whose hash still matches only need their annotations refreshed.
        for pod_id in 0..desc.scale.get() {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                // The pod doesn't exist; nothing for us to update.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };

            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
                != Some(&desc.pod_template_hash)
            {
                self.pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                    .map(|_| ())
            } else {
                let metadata = ObjectMeta {
                    annotations: pod_annotations.clone(),
                    ..Default::default()
                }
                .into_request_partial::<Pod>();
                self.pod_api
                    .patch_metadata(
                        &pod_name,
                        &PatchParams::apply(FIELD_MANAGER).force(),
                        &Patch::Apply(&metadata),
                    )
                    .await
                    .map(|_| ())
            };

            match result {
                Ok(()) => (),
                // The pod was deleted concurrently; that's fine.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            // Already deleted; nothing to do.
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            // Already deleted; nothing to do.
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}

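/// A handle to a service running in Kubernetes, exposing the addresses of its
/// processes.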
#[derive(Debug, Clone)]
struct KubernetesService {
    hosts: Vec<String>,
    ports: BTreeMap<String, u16>,
}

impl Service for KubernetesService {
    fn addresses(&self, port: &str) -> Vec<String> {
        let port = self.ports[port];
        self.hosts
            .iter()
            .map(|host| format!("{host}:{port}"))
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = &[
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }
}