1use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZero;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use std::{env, fmt};
16
17use anyhow::{Context, anyhow, bail};
18use async_trait::async_trait;
19use chrono::DateTime;
20use clap::ValueEnum;
21use cloud_resource_controller::KubernetesResourceReader;
22use futures::TryFutureExt;
23use futures::stream::{BoxStream, StreamExt};
24use k8s_openapi::DeepMerge;
25use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
26use k8s_openapi::api::core::v1::{
27 Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
28 NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
29 ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
30 PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
31 PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
32 SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
33 Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
34 WeightedPodAffinityTerm,
35};
36use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
37use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
38 LabelSelector, LabelSelectorRequirement, OwnerReference,
39};
40use k8s_openapi::jiff::Timestamp;
41use kube::ResourceExt;
42use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
43use kube::client::Client;
44use kube::error::Error as K8sError;
45use kube::runtime::{WatchStreamExt, watcher};
46use maplit::btreemap;
47use mz_cloud_resources::AwsExternalIdPrefix;
48use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
49use mz_orchestrator::{
50 DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
51 OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
52 ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
53};
54use mz_ore::cast::CastInto;
55use mz_ore::retry::Retry;
56use mz_ore::task::AbortOnDropHandle;
57use serde::Deserialize;
58use sha2::{Digest, Sha256};
59use tokio::sync::{mpsc, oneshot};
60use tracing::{error, info, warn};
61
62pub mod cloud_resource_controller;
63pub mod secrets;
64pub mod util;
65
// Field manager name for Kubernetes server-side apply / patch operations.
// NOTE(review): not referenced in this chunk; presumably used with
// `PatchParams` further down the file — confirm.
const FIELD_MANAGER: &str = "environmentd";
// How long (seconds) pods tolerate a not-ready/unreachable node before the
// `NoExecute` tolerations installed in `ensure_service` let them be evicted.
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

// Pod annotation under which `ensure_service` records the SHA-256 hash of the
// pod template spec (computed before annotations are attached).
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
70
/// Configuration for a [`KubernetesOrchestrator`].
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of a Kubernetes context to use; passed to
    /// `util::create_client` when constructing the client.
    pub context: String,
    /// The name of a custom scheduler to set on service pods, if any.
    pub scheduler_name: Option<String>,
    /// Annotations to install on every service pod created by the orchestrator.
    pub service_annotations: BTreeMap<String, String>,
    /// Labels to install on every service created by the orchestrator.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector entries applied to every service pod.
    pub service_node_selector: BTreeMap<String, String>,
    /// A JSON-encoded `Affinity` that is deep-merged into each pod's computed
    /// affinity, if set.
    pub service_affinity: Option<String>,
    /// A JSON-encoded list of `Toleration`s appended to each pod's default
    /// tolerations, if set.
    pub service_tolerations: Option<String>,
    /// The Kubernetes service account that service pods run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set for containers created by the orchestrator.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// An AWS external ID prefix for operations made on behalf of the
    /// environment. NOTE(review): not used in this chunk — presumably consumed
    /// by the cloud resource controller; confirm.
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to run services with code coverage enabled (mounts a
    /// `/coverage` PVC and sets `LLVM_PROFILE_FILE`).
    pub coverage: bool,
    /// The storage class used for ephemeral `/scratch` volumes; services that
    /// request disk fail to be created if this is unset.
    pub ephemeral_volume_storage_class: Option<String>,
    /// If set, used as the pods' `fsGroup`, `runAsUser`, and `runAsGroup`.
    pub service_fs_group: Option<i64>,
    /// An optional prefix prepended to all service names.
    pub name_prefix: Option<String>,
    /// Whether the orchestrator worker should poll the metrics API for
    /// per-pod metrics.
    pub collect_pod_metrics: bool,
    /// Whether to add Prometheus scrape annotations targeting the
    /// `internal-http` port, when a service exposes one.
    pub enable_prometheus_scrape_annotations: bool,
}
113
114impl KubernetesOrchestratorConfig {
115 pub fn name_prefix(&self) -> String {
116 self.name_prefix.clone().unwrap_or_default()
117 }
118}
119
/// Specifies whether Kubernetes should pull Docker images when creating pods.
/// Mirrors the Kubernetes `imagePullPolicy` values.
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the Docker image from the registry.
    Always,
    /// Pull the Docker image only if the image is not present locally.
    IfNotPresent,
    /// Never pull the Docker image.
    Never,
}
130
131impl fmt::Display for KubernetesImagePullPolicy {
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 match self {
134 KubernetesImagePullPolicy::Always => f.write_str("Always"),
135 KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
136 KubernetesImagePullPolicy::Never => f.write_str("Never"),
137 }
138 }
139}
140
141impl KubernetesImagePullPolicy {
142 pub fn as_kebab_case_str(&self) -> &'static str {
143 match self {
144 Self::Always => "always",
145 Self::IfNotPresent => "if-not-present",
146 Self::Never => "never",
147 }
148 }
149}
150
/// An orchestrator backed by Kubernetes.
pub struct KubernetesOrchestrator {
    /// The Kubernetes client shared by all namespaced orchestrators and APIs.
    client: Client,
    /// The Kubernetes namespace the client operates in (as reported by
    /// `util::create_client`).
    kubernetes_namespace: String,
    /// The orchestrator's configuration.
    config: KubernetesOrchestratorConfig,
    /// API handle for `Secret` objects in the default namespace.
    secret_api: Api<Secret>,
    /// API handle for `VpcEndpoint` custom resources in the default namespace.
    vpc_endpoint_api: Api<VpcEndpoint>,
    /// Cache of `NamespacedOrchestrator`s, keyed by logical namespace.
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    /// Reader for Kubernetes cloud resources.
    resource_reader: Arc<KubernetesResourceReader>,
}
161
162impl fmt::Debug for KubernetesOrchestrator {
163 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164 f.debug_struct("KubernetesOrchestrator").finish()
165 }
166}
167
impl KubernetesOrchestrator {
    /// Creates a new Kubernetes orchestrator from the provided configuration.
    ///
    /// Connects to the cluster selected by `config.context` and derives the
    /// target Kubernetes namespace from the resulting client configuration.
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}
187
impl Orchestrator for KubernetesOrchestrator {
    /// Returns the orchestrator for `namespace`, creating (and caching) it on
    /// first use.
    ///
    /// Creating a namespaced orchestrator also spawns its dedicated
    /// `OrchestratorWorker` task, which owns all Kubernetes API traffic for
    /// that namespace; the two communicate over an unbounded channel.
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                // Dropping the namespaced orchestrator aborts its worker task.
                _worker: worker,
            })
        }))
    }
}
219
/// Per-service bookkeeping remembered by the namespaced orchestrator;
/// currently just the service's scale, needed when fetching metrics.
#[derive(Clone, Copy)]
struct ServiceInfo {
    // Number of processes (StatefulSet replicas) in the service.
    scale: NonZero<u16>,
}
224
/// A [`NamespacedOrchestrator`] backed by Kubernetes, scoped to one logical
/// namespace within a single Kubernetes namespace.
struct NamespacedKubernetesOrchestrator {
    /// API handle for pods, used to watch service pod status.
    pod_api: Api<Pod>,
    /// The Kubernetes namespace services are created in.
    kubernetes_namespace: String,
    /// The logical (orchestrator-level) namespace; used to prefix label keys
    /// and service names.
    namespace: String,
    /// The orchestrator's configuration.
    config: KubernetesOrchestratorConfig,
    /// Scheduling knobs (affinity softening, topology spread, ...) snapshotted
    /// by `ensure_service` on each call.
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    /// Info for each known service, keyed by service ID.
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    /// Channel for sending commands to the orchestrator worker.
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    /// Handle to the worker task; aborts the worker when dropped.
    _worker: AbortOnDropHandle<()>,
}
235
236impl fmt::Debug for NamespacedKubernetesOrchestrator {
237 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238 f.debug_struct("NamespacedKubernetesOrchestrator")
239 .field("kubernetes_namespace", &self.kubernetes_namespace)
240 .field("namespace", &self.namespace)
241 .field("config", &self.config)
242 .finish()
243 }
244}
245
/// Commands sent from a `NamespacedKubernetesOrchestrator` to its
/// `OrchestratorWorker`. Commands that produce a result carry a `result_tx`
/// over which the worker sends the reply.
enum WorkerCommand {
    /// Create or update the described service's Kubernetes objects.
    EnsureService {
        desc: ServiceDescription,
    },
    /// Tear down the service with the given (already prefixed) name.
    DropService {
        name: String,
    },
    /// List the IDs of services in the given logical namespace.
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    /// Fetch per-process metrics for the named service.
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}
268
/// A description of a service to be created by an `OrchestratorWorker`,
/// as assembled by `ensure_service`.
#[derive(Debug, Clone)]
struct ServiceDescription {
    // The (prefixed) service name, shared by the Service and StatefulSet.
    name: String,
    // Number of replicas.
    scale: NonZero<u16>,
    // The headless Kubernetes Service providing per-pod DNS.
    service: K8sService,
    // The StatefulSet running the service processes.
    stateful_set: StatefulSet,
    // SHA-256 hex digest of the pod template spec (sans annotations).
    pod_template_hash: String,
}
278
/// A task executing blocking work for a `NamespacedKubernetesOrchestrator` in
/// the background, driven by `WorkerCommand`s received on `command_rx`.
struct OrchestratorWorker {
    /// API handle for the `metrics.k8s.io` pod metrics resource.
    metrics_api: Api<PodMetrics>,
    /// API handle for Kubernetes `Service` objects.
    service_api: Api<K8sService>,
    /// API handle for `StatefulSet` objects.
    stateful_set_api: Api<StatefulSet>,
    /// API handle for `Pod` objects.
    pod_api: Api<Pod>,
    /// Owner references to attach to created resources.
    owner_references: Vec<OwnerReference>,
    /// Receiving end of the orchestrator's command channel.
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    /// Prefix applied to service names (empty if unconfigured).
    name_prefix: String,
    /// Whether to query the metrics API when fetching service metrics.
    collect_pod_metrics: bool,
}
300
/// Per-container entry in a `PodMetrics` object.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    /// The container name.
    pub name: String,
    /// The container's current resource usage.
    pub usage: PodMetricsContainerUsage,
}
306
/// Resource usage reported for one container by the metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    /// CPU usage as a Kubernetes quantity (e.g. `250m`).
    pub cpu: Quantity,
    /// Memory usage as a Kubernetes quantity (e.g. `128Mi`).
    pub memory: Quantity,
}
312
/// Hand-rolled deserialization target for the `metrics.k8s.io/v1beta1`
/// `PodMetrics` resource (not provided by `k8s_openapi`; see the `Resource`
/// impl below).
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    /// RFC 3339 timestamp of the measurement window's end (kept as a string).
    pub timestamp: String,
    /// Duration over which the metrics were gathered (kept as a string).
    pub window: String,
    /// One entry per container in the pod.
    pub containers: Vec<PodMetricsContainer>,
}
320
// Wires `PodMetrics` into the k8s_openapi/kube machinery so it can be fetched
// through an `Api<PodMetrics>` at `.../metrics.k8s.io/v1beta1/.../pods`.
impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    // Pod metrics are namespaced, like the pods they describe.
    type Scope = k8s_openapi::NamespaceResourceScope;
}
330
// Exposes the object metadata to the kube client machinery.
impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}
342
/// Identifies a metric by name.
/// NOTE(review): shape matches the Kubernetes custom-metrics API's metric
/// identifier (`metricName` field) — confirm against the consuming code.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}
357
/// A single metric observation for a Kubernetes object.
/// NOTE(review): shape matches the custom-metrics API's `MetricValue` —
/// confirm against the consuming code.
#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    /// The object the metric describes.
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    /// The metric's identifier, flattened into this object.
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    /// RFC 3339 timestamp of the observation (kept as a string).
    pub timestamp: String,
    /// The observed value.
    pub value: Quantity,
}
368
369impl NamespacedKubernetesOrchestrator {
370 fn service_name(&self, id: &str) -> String {
371 format!(
372 "{}{}-{id}",
373 self.config.name_prefix.as_deref().unwrap_or(""),
374 self.namespace
375 )
376 }
377
378 fn watch_pod_params(&self) -> watcher::Config {
381 let ns_selector = format!(
382 "environmentd.materialize.cloud/namespace={}",
383 self.namespace
384 );
385 watcher::Config::default().timeout(59).labels(&ns_selector)
387 }
388
389 fn make_label_key(&self, key: &str) -> String {
392 format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
393 }
394
395 fn label_selector_to_k8s(
396 &self,
397 MzLabelSelector { label_name, logic }: MzLabelSelector,
398 ) -> Result<LabelSelectorRequirement, anyhow::Error> {
399 let (operator, values) = match logic {
400 LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
401 LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
402 LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
403 LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
404 LabelSelectionLogic::InSet { values } => {
405 if values.is_empty() {
406 Err(anyhow!(
407 "Invalid selector logic for {label_name}: empty `in` set"
408 ))
409 } else {
410 Ok(("In", values))
411 }
412 }
413 LabelSelectionLogic::NotInSet { values } => {
414 if values.is_empty() {
415 Err(anyhow!(
416 "Invalid selector logic for {label_name}: empty `notin` set"
417 ))
418 } else {
419 Ok(("NotIn", values))
420 }
421 }
422 }?;
423 let lsr = LabelSelectorRequirement {
424 key: self.make_label_key(&label_name),
425 operator: operator.to_string(),
426 values: Some(values),
427 };
428 Ok(lsr)
429 }
430
431 fn send_command(&self, cmd: WorkerCommand) {
432 self.command_tx.send(cmd).expect("worker task not dropped");
433 }
434}
435
/// A Kubernetes quantity (e.g. `500m` or `2Gi`) decomposed into an integral
/// part and an exponent over a decimal or binary base.
#[derive(Debug)]
struct ScaledQuantity {
    // The digits of the quantity, with no scaling applied.
    integral_part: u64,
    // The power the base is raised to, e.g. -3 for `m` or 20 for `Mi`.
    exponent: i8,
    // Whether `exponent` is a power of 10 (decimal suffix) or 2 (binary).
    base10: bool,
}
442
443impl ScaledQuantity {
444 pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
445 if base10 != self.base10 {
446 return None;
447 }
448 let exponent = self.exponent - scale;
449 let mut result = self.integral_part;
450 let base = if self.base10 { 10 } else { 2 };
451 if exponent < 0 {
452 for _ in exponent..0 {
453 result /= base;
454 }
455 } else {
456 for _ in 0..exponent {
457 result = result.checked_mul(base)?;
458 }
459 }
460 Some(result)
461 }
462}
463
464fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
474 const DEC_SUFFIXES: &[(&str, i8)] = &[
475 ("n", -9),
476 ("u", -6),
477 ("m", -3),
478 ("", 0),
479 ("k", 3), ("M", 6),
481 ("G", 9),
482 ("T", 12),
483 ("P", 15),
484 ("E", 18),
485 ];
486 const BIN_SUFFIXES: &[(&str, i8)] = &[
487 ("", 0),
488 ("Ki", 10),
489 ("Mi", 20),
490 ("Gi", 30),
491 ("Ti", 40),
492 ("Pi", 50),
493 ("Ei", 60),
494 ];
495
496 let (positive, s) = match s.chars().next() {
497 Some('+') => (true, &s[1..]),
498 Some('-') => (false, &s[1..]),
499 _ => (true, s),
500 };
501
502 if !positive {
503 anyhow::bail!("Negative numbers not supported")
504 }
505
506 fn is_suffix_char(ch: char) -> bool {
507 "numkMGTPEKi".contains(ch)
508 }
509 let (num, suffix) = match s.find(is_suffix_char) {
510 None => (s, ""),
511 Some(idx) => s.split_at(idx),
512 };
513 let num: u64 = num.parse()?;
514 let (exponent, base10) = if let Some((_, exponent)) =
515 DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
516 {
517 (exponent, true)
518 } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
519 (exponent, false)
520 } else {
521 anyhow::bail!("Unrecognized suffix: {suffix}");
522 };
523 Ok(ScaledQuantity {
524 integral_part: num,
525 exponent: *exponent,
526 base10,
527 })
528}
529
530#[async_trait]
531impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
532 async fn fetch_service_metrics(
533 &self,
534 id: &str,
535 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
536 let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
537 *info
538 } else {
539 tracing::error!("Failed to get info for {id}");
541 anyhow::bail!("Failed to get info for {id}");
542 };
543
544 let (result_tx, result_rx) = oneshot::channel();
545 self.send_command(WorkerCommand::FetchServiceMetrics {
546 name: self.service_name(id),
547 info,
548 result_tx,
549 });
550
551 let metrics = result_rx.await.expect("worker task not dropped");
552 Ok(metrics)
553 }
554
    /// Builds the Kubernetes objects for service `id` — a headless `Service`
    /// (for stable per-pod DNS) and a `StatefulSet` — and hands them to the
    /// background worker to apply. Returns a handle carrying the computed
    /// hostnames and ports.
    ///
    /// The construction is order-sensitive in two places: `args` is extended
    /// while volumes and the name prefix are processed, and the pod template
    /// hash is computed *before* pod annotations are attached (see below).
    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            cpu_request,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        // Snapshot the scheduling config so a single consistent view is used
        // for the whole construction.
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        // A disk limit of exactly zero means "no disk"; any other value
        // (including `None`) requests scratch disk.
        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        // Labels used both to select pods and as the base of the pod labels.
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        // Full pod labels: the match labels plus caller-provided labels
        // (namespace-qualified), the scale, and one label per port.
        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        // Resource limits/requests. Note: a memory limit also sets the memory
        // request, which an explicit memory request then overrides; likewise
        // for CPU.
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        if let Some(cpu_request) = cpu_request {
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_request.as_millicpus())),
            );
        }
        // Headless service (`clusterIP: None`) giving each pod a stable DNS
        // name of the form `{name}-{i}.{name}.{k8s_namespace}.svc.cluster.local`.
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        // One hostname per process, derived from the StatefulSet naming
        // scheme and the headless service above.
        let hosts = (0..scale.get())
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        // Listen/peer address maps handed to the service's argument builder.
        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        // `args` here is the caller-supplied closure from `ServiceConfig`;
        // further flags are pushed below (scratch dir, secrets prefix).
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

        // Anti-affinity keeping this replica's pods off nodes that host other
        // replicas' pods; either hard (required) or soft (weighted preference)
        // depending on the scheduling config.
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        // Optional soft affinity pulling this service's own pods into the
        // same availability zone.
        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        // Optional topology spread constraint over availability zones. With
        // `ignore_non_singular_scale`, it only applies to scale-1 services
        // and only counts other scale-1 replicas (via the extra `scale=1`
        // selector term).
        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale.get() == 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

        // Pod annotations: tell the cluster autoscaler and Karpenter not to
        // evict these pods when scaling nodes down.
        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),

            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        // Advertise the internal HTTP port for Prometheus scraping, if
        // configured and such a port exists.
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        // Disk-requesting services are steered to nodes labeled as having
        // local disk.
        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        // Later entries win: caller-provided `node_selector` overrides the
        // config-level selector, which overrides the disk default.
        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        // Pin (or prefer, when softened) the pods to the requested
        // availability zones.
        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        // Deep-merge any operator-configured affinity (JSON) on top.
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        // Container name = the NAME component of `ORG/NAME:VERSION`.
        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        // Hardened (non-root, no capabilities, default seccomp) container
        // security context, when enabled.
        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        // Optional init container, given the same resources/security context
        // as the main container plus downward-API env vars identifying the
        // pod's namespace, name, and node.
        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        // In coverage builds, write LLVM profile data into the coverage PVC.
        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        // Scratch disk: a generic ephemeral volume mounted at `/scratch`,
        // sized by the disk limit. Requesting disk without a configured
        // storage class is an error.
        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        // Persistent 10Gi claim backing the `/coverage` mount, when enabled.
        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        // Run the whole pod as the configured fs group/user/group, if set.
        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        // Bound how long pods linger on failed nodes, then append any
        // operator-configured tolerations (JSON).
        let mut tolerations = vec![
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        // The pod template is first built WITHOUT annotations so it can be
        // hashed; the hash is itself recorded as an annotation below.
        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        // Hash the annotation-free template; attaching annotations after
        // hashing keeps the hash annotation from feeding back into itself.
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_annotations.insert(
            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
            pod_template_hash.clone(),
        );

        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);

        // StatefulSet with manual (`OnDelete`) rollout and parallel pod
        // management.
        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: Some(name.clone()),
                replicas: Some(scale.cast_into()),
                template: pod_template_spec,
                update_strategy: Some(StatefulSetUpdateStrategy {
                    type_: Some("OnDelete".to_owned()),
                    ..Default::default()
                }),
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        // Hand the built objects to the worker, which applies them.
        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        // Remember the scale for later metrics fetches.
        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }
1239
1240 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1242 fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1243 self.service_infos.lock().expect("poisoned lock").remove(id);
1244
1245 self.send_command(WorkerCommand::DropService {
1246 name: self.service_name(id),
1247 });
1248
1249 Ok(())
1250 }
1251
1252 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1254 let (result_tx, result_rx) = oneshot::channel();
1255 self.send_command(WorkerCommand::ListServices {
1256 namespace: self.namespace.clone(),
1257 result_tx,
1258 });
1259
1260 let list = result_rx.await.expect("worker task not dropped");
1261 Ok(list)
1262 }
1263
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        // Translates a watched pod into a `ServiceEvent` describing the
        // readiness of the corresponding service process.
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            // Stateful set pod names end in `-{ordinal}`; that ordinal is
            // the process ID within the service.
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            // A process counts as OOM-killed if any container terminated
            // (in its current state, or failing that its last state) with
            // one of these exit codes. 137 is 128+SIGKILL, the classic OOM
            // kill; 135 and 167 are treated the same here — NOTE(review):
            // presumably project-specific OOM-adjacent codes, confirm.
            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            // Readiness comes from the pod's `Ready` condition; a missing
            // status or condition counts as not ready.
            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            // Prefer the probe time reported by Kubernetes; fall back to
            // "now" when the condition carries no probe time.
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Timestamp::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time: DateTime::from_timestamp_nanos(
                    time.as_nanosecond().try_into().expect("must fit"),
                ),
            })
        }

        // Watch pod changes and convert each touched pod into a service
        // event. Watcher errors are logged and skipped rather than
        // terminating the stream.
        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }
1342
1343 fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1344 *self.scheduling_config.write().expect("poisoned") = config;
1345 }
1346}
1347
impl OrchestratorWorker {
    /// Spawns the worker as a named task, returning a handle that aborts
    /// the task when dropped.
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    /// Runs the worker: performs one-time initialization, then services
    /// commands from `command_rx` until the channel closes.
    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            // Look up the pod this process runs in (its name is the pod's
            // hostname) so our own owner references can be stamped onto
            // every resource we create, tying their lifecycle to ours.
            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

    /// Handles a single worker command, retrying failed Kubernetes calls
    /// with capped exponential backoff until they succeed.
    async fn handle_command(&self, cmd: WorkerCommand) {
        // Runs `f` until it succeeds, logging every failure. Never returns
        // an error: it retries forever (backoff capped at 10s).
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                // Metrics fetches are best-effort and not retried; failures
                // surface as default metrics inside `fetch_service_metrics`.
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    /// Fetches CPU, memory, and disk metrics for every process of the named
    /// service. Processes that cannot be measured yield default (empty)
    /// metrics rather than an error.
    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        // When pod metrics collection is disabled, report empty metrics for
        // every process.
        if !self.collect_pod_metrics {
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        // Usage payload returned by clusterd's `/api/usage-metrics`
        // endpoint; all fields are optional.
        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        // Collects metrics for process `i` of the service, combining the
        // Kubernetes metrics API with clusterd's own usage endpoint.
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            // Fetch both sources concurrently. A failed clusterd fetch
            // degrades to Kubernetes metrics only; a failed Kubernetes fetch
            // yields default metrics for this process.
            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            // Only the first container's usage is inspected — NOTE(review):
            // presumably each pod runs a single relevant container; confirm.
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            // CPU is converted to nanocores (decimal exponent -9).
            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            // Memory is reported in whole bytes.
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                // Swap is folded into disk usage when both are reported —
                // NOTE(review): presumably because swap is disk-backed;
                // confirm.
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                // Heap usage is memory plus swap, but only when memory is
                // known; swap alone is never reported as heap.
                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        // Queries clusterd process `i` directly for its usage metrics over
        // the pod's stable DNS name within the service.
        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            // The service spec tells us which port serves internal HTTP.
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        // Fetch all processes' metrics concurrently, preserving order.
        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }

    /// Creates or updates the Kubernetes resources for a service via
    /// server-side apply, then reconciles existing pods with the desired
    /// pod template.
    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        // Propagate the orchestrator's own owner references so the created
        // resources are garbage collected together with it.
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
        let pod_annotations = pod_metadata.annotations.clone();

        // Server-side apply (forced) is idempotent: it creates the
        // resources if absent and updates them otherwise.
        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

        // The stateful set uses the `OnDelete` update strategy, so rolling
        // out a changed pod template requires deleting outdated pods here.
        for pod_id in 0..desc.scale.get() {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                // Pod doesn't exist (yet); nothing to reconcile.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };

            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
                != Some(&desc.pod_template_hash)
            {
                // Stale template: delete the pod so the stateful set
                // controller recreates it from the new template.
                self.pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                    .map(|_| ())
            } else {
                // Template unchanged: refresh only the pod's annotations.
                let metadata = ObjectMeta {
                    annotations: pod_annotations.clone(),
                    ..Default::default()
                }
                .into_request_partial::<Pod>();
                self.pod_api
                    .patch_metadata(
                        &pod_name,
                        &PatchParams::apply(FIELD_MANAGER).force(),
                        &Patch::Apply(&metadata),
                    )
                    .await
                    .map(|_| ())
            };

            match result {
                Ok(()) => (),
                // The pod vanished concurrently; that's fine.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    /// Deletes a service's stateful set and service objects, treating
    /// "not found" (404) as success so the operation is idempotent.
    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    /// Lists the IDs of all services in the given namespace by stripping
    /// the orchestrator's name prefix from stateful set names; stateful
    /// sets without the prefix are ignored.
    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}
1707
/// A handle to a service running in Kubernetes, used to resolve the
/// addresses of its processes.
#[derive(Debug, Clone)]
struct KubernetesService {
    /// Hostnames of the service's processes.
    hosts: Vec<String>,
    /// Port numbers, keyed by port name.
    ports: BTreeMap<String, u16>,
}
1713
1714impl Service for KubernetesService {
1715 fn addresses(&self, port: &str) -> Vec<String> {
1716 let port = self.ports[port];
1717 self.hosts
1718 .iter()
1719 .map(|host| format!("{host}:{port}"))
1720 .collect()
1721 }
1722}
1723
#[cfg(test)]
mod tests {
    use super::*;

    /// Base-10 suffixes (k, M, G, T, P) scale by powers of 1000.
    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = [
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases.iter().copied() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    /// Sub-unit suffixes (n, u, m) resolve exactly at nano precision.
    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = [("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases.iter().copied() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    /// Binary suffixes (Ki, Mi, Gi, Ti, Pi) scale by powers of 1024.
    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = [
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases.iter().copied() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }
}