1use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZero;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use std::{env, fmt};
16
17use anyhow::{Context, anyhow, bail};
18use async_trait::async_trait;
19use chrono::DateTime;
20use clap::ValueEnum;
21use cloud_resource_controller::KubernetesResourceReader;
22use futures::TryFutureExt;
23use futures::stream::{BoxStream, StreamExt};
24use k8s_openapi::DeepMerge;
25use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
26use k8s_openapi::api::core::v1::{
27 Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
28 NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
29 ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
30 PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
31 PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
32 SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
33 Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
34 WeightedPodAffinityTerm,
35};
36use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
37use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
38 LabelSelector, LabelSelectorRequirement, OwnerReference,
39};
40use k8s_openapi::jiff::Timestamp;
41use kube::ResourceExt;
42use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
43use kube::client::Client;
44use kube::error::Error as K8sError;
45use kube::runtime::{WatchStreamExt, watcher};
46use maplit::btreemap;
47use mz_cloud_resources::AwsExternalIdPrefix;
48use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
49use mz_orchestrator::{
50 DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
51 OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
52 ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
53};
54use mz_ore::cast::CastInto;
55use mz_ore::retry::Retry;
56use mz_ore::task::AbortOnDropHandle;
57use serde::Deserialize;
58use sha2::{Digest, Sha256};
59use tokio::sync::{mpsc, oneshot};
60use tracing::{error, info, warn};
61
62pub mod cloud_resource_controller;
63pub mod secrets;
64pub mod util;
65
/// Field manager name reported to the Kubernetes API for this component
/// (presumably used with server-side apply `PatchParams` elsewhere in this
/// file — confirm at call sites).
const FIELD_MANAGER: &str = "environmentd";
/// Seconds a pod tolerates a `not-ready`/`unreachable` node taint before
/// Kubernetes evicts it (see the tolerations built in `ensure_service`).
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

/// Annotation that records the SHA-256 hash of the pod template which
/// produced a pod; used to detect stale pods after template changes.
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";
70
/// Configuration for a [`KubernetesOrchestrator`].
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    /// The name of the Kubernetes context to use when constructing clients.
    pub context: String,
    /// The name of a non-default Kubernetes scheduler to use for pods, if any.
    pub scheduler_name: Option<String>,
    /// Annotations to attach to every pod created by the orchestrator.
    pub service_annotations: BTreeMap<String, String>,
    /// Labels to attach to every service created by the orchestrator; these
    /// also become part of the pod selector.
    pub service_labels: BTreeMap<String, String>,
    /// Node selector entries applied to every service's pods.
    pub service_node_selector: BTreeMap<String, String>,
    /// Extra affinity, as a JSON-encoded `Affinity`, merged into the
    /// generated affinity (see `ensure_service`).
    pub service_affinity: Option<String>,
    /// Extra tolerations, as a JSON-encoded list of `Toleration`s, appended
    /// to the defaults (see `ensure_service`).
    pub service_tolerations: Option<String>,
    /// The Kubernetes service account pods should run as, if any.
    pub service_account: Option<String>,
    /// The image pull policy to set on all containers.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// AWS external ID prefix for operations made on behalf of the
    /// environment (not used in this file; consumed elsewhere — confirm).
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    /// Whether to collect coverage data: mounts a `/coverage` volume and sets
    /// `LLVM_PROFILE_FILE` on service containers.
    pub coverage: bool,
    /// Storage class for ephemeral scratch volumes; must be set for any
    /// service that requests disk, or `ensure_service` errors.
    pub ephemeral_volume_storage_class: Option<String>,
    /// If set, used as the pod's `fsGroup`, `runAsUser`, and `runAsGroup`.
    pub service_fs_group: Option<i64>,
    /// Optional prefix prepended to generated Kubernetes object names.
    pub name_prefix: Option<String>,
    /// Whether the worker should poll the metrics API for per-pod metrics.
    pub collect_pod_metrics: bool,
    /// Whether to add `prometheus.io/*` scrape annotations to pods that
    /// expose an `internal-http` port.
    pub enable_prometheus_scrape_annotations: bool,
}
113
114impl KubernetesOrchestratorConfig {
115 pub fn name_prefix(&self) -> String {
116 self.name_prefix.clone().unwrap_or_default()
117 }
118}
119
/// Specifies when Kubernetes should pull container images for pods created
/// by the orchestrator.
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    /// Always pull the image from the registry.
    Always,
    /// Pull the image only if it is not already present on the node.
    IfNotPresent,
    /// Never pull the image.
    Never,
}
130
131impl fmt::Display for KubernetesImagePullPolicy {
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 match self {
134 KubernetesImagePullPolicy::Always => f.write_str("Always"),
135 KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
136 KubernetesImagePullPolicy::Never => f.write_str("Never"),
137 }
138 }
139}
140
141impl KubernetesImagePullPolicy {
142 pub fn as_kebab_case_str(&self) -> &'static str {
143 match self {
144 Self::Always => "always",
145 Self::IfNotPresent => "if-not-present",
146 Self::Never => "never",
147 }
148 }
149}
150
/// An orchestrator backed by Kubernetes.
pub struct KubernetesOrchestrator {
    /// Client for talking to the Kubernetes API server.
    client: Client,
    /// The Kubernetes namespace the client operates in.
    kubernetes_namespace: String,
    /// The orchestrator's configuration.
    config: KubernetesOrchestratorConfig,
    /// API handle for `Secret` objects in the default namespace.
    secret_api: Api<Secret>,
    /// API handle for `VpcEndpoint` custom resources.
    vpc_endpoint_api: Api<VpcEndpoint>,
    /// Lazily-created per-namespace orchestrators, keyed by logical
    /// namespace name (see `Orchestrator::namespace`).
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    /// Shared reader for Kubernetes-managed cloud resources.
    resource_reader: Arc<KubernetesResourceReader>,
}
161
162impl fmt::Debug for KubernetesOrchestrator {
163 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
164 f.debug_struct("KubernetesOrchestrator").finish()
165 }
166}
167
impl KubernetesOrchestrator {
    /// Creates a new Kubernetes orchestrator from the provided configuration.
    ///
    /// Builds a Kubernetes client (and discovers the namespace) for
    /// `config.context`, plus a resource reader for the same context.
    /// Returns an error if either cannot be constructed.
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}
187
impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        // Memoize one namespaced orchestrator (and its background worker
        // task) per logical namespace.
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            // Commands flow from the namespaced orchestrator to the worker
            // over an unbounded channel; the worker owns the API handles and
            // performs the actual Kubernetes calls.
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                // Keeping the abort-on-drop handle here ties the worker's
                // lifetime to the namespaced orchestrator's.
                _worker: worker,
            })
        }))
    }
}
219
/// Cached metadata about a service created via `ensure_service`.
#[derive(Clone, Copy)]
struct ServiceInfo {
    // Number of processes (stateful set replicas) the service runs with.
    scale: NonZero<u16>,
}
224
/// A [`NamespacedOrchestrator`] backed by Kubernetes, scoped to one logical
/// namespace within a single Kubernetes namespace.
struct NamespacedKubernetesOrchestrator {
    /// API handle for pods, used for watching service status.
    pod_api: Api<Pod>,
    /// The Kubernetes namespace the orchestrator operates in.
    kubernetes_namespace: String,
    /// The logical (orchestrator-level) namespace; used to qualify names,
    /// labels, and watch selectors.
    namespace: String,
    /// The orchestrator's configuration.
    config: KubernetesOrchestratorConfig,
    /// Dynamically updatable scheduling configuration, snapshotted per
    /// `ensure_service` call.
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    /// Info about services created through this orchestrator, keyed by
    /// service ID.
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    /// Channel for sending commands to the background worker task.
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    /// Handle to the worker task; dropping it aborts the worker.
    _worker: AbortOnDropHandle<()>,
}
235
236impl fmt::Debug for NamespacedKubernetesOrchestrator {
237 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238 f.debug_struct("NamespacedKubernetesOrchestrator")
239 .field("kubernetes_namespace", &self.kubernetes_namespace)
240 .field("namespace", &self.namespace)
241 .field("config", &self.config)
242 .finish()
243 }
244}
245
/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// background [`OrchestratorWorker`] task.
enum WorkerCommand {
    /// Create or update the described service's Kubernetes objects.
    EnsureService {
        desc: ServiceDescription,
    },
    /// Delete the Kubernetes objects for the named service.
    DropService {
        name: String,
    },
    /// List the services in the given logical namespace, replying on
    /// `result_tx`.
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    /// Fetch per-process metrics for the named service, replying on
    /// `result_tx`.
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}
268
/// A description of a service to be created by an [`OrchestratorWorker`].
#[derive(Debug, Clone)]
struct ServiceDescription {
    /// The Kubernetes object name for the service.
    name: String,
    /// The number of replicas the stateful set should run.
    scale: NonZero<u16>,
    /// The headless Kubernetes service providing per-pod DNS.
    service: K8sService,
    /// The stateful set that runs the service's pods.
    stateful_set: StatefulSet,
    /// SHA-256 hash of the pod template, as stored in
    /// `POD_TEMPLATE_HASH_ANNOTATION`.
    pod_template_hash: String,
}
278
/// A background task that owns the Kubernetes API handles and executes
/// [`WorkerCommand`]s on behalf of a [`NamespacedKubernetesOrchestrator`].
struct OrchestratorWorker {
    /// API handle for the `metrics.k8s.io` pod metrics.
    metrics_api: Api<PodMetrics>,
    /// API handle for Kubernetes services.
    service_api: Api<K8sService>,
    /// API handle for stateful sets.
    stateful_set_api: Api<StatefulSet>,
    /// API handle for pods.
    pod_api: Api<Pod>,
    /// Owner references to attach to created objects.
    owner_references: Vec<OwnerReference>,
    /// Receiving end of the orchestrator's command channel.
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    /// Name prefix used when constructing object names.
    name_prefix: String,
    /// Whether to poll the metrics API for pod metrics.
    collect_pod_metrics: bool,
}
300
/// Per-container entry in a `metrics.k8s.io` pod metrics response.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    /// The container's name.
    pub name: String,
    /// The container's reported resource usage.
    pub usage: PodMetricsContainerUsage,
}
306
/// Resource usage reported for a single container by the metrics API.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    /// CPU usage, as a Kubernetes quantity (e.g. `250m`).
    pub cpu: Quantity,
    /// Memory usage, as a Kubernetes quantity (e.g. `128Mi`).
    pub memory: Quantity,
}
312
/// A pod metrics object from the `metrics.k8s.io/v1beta1` API.
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    /// Standard Kubernetes object metadata.
    pub metadata: ObjectMeta,
    /// Time at which the metrics were collected.
    pub timestamp: String,
    /// The window over which the metrics were gathered.
    pub window: String,
    /// Usage for each container in the pod.
    pub containers: Vec<PodMetricsContainer>,
}
320
// Hand-written `Resource` impl: `k8s_openapi` does not provide bindings for
// the `metrics.k8s.io` API group, so the group/version/kind constants are
// wired up manually to let `Api<PodMetrics>` address the right endpoint.
impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}
330
// Exposes the object metadata so `kube` can treat `PodMetrics` like any
// other namespaced resource.
impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}
342
/// Identifies a metric in a custom-metrics API response (presumably the
/// `custom.metrics.k8s.io` object schema — confirm against callers).
#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    /// The metric's name.
    #[serde(rename = "metricName")]
    pub name: String,
}
357
/// A single metric sample for a Kubernetes object (custom-metrics API
/// response shape — confirm against callers).
#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    /// The object the metric describes.
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    /// The metric's identifier, flattened into this object.
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    /// Time at which the sample was taken.
    pub timestamp: String,
    /// The sampled value, as a Kubernetes quantity.
    pub value: Quantity,
}
368
369impl NamespacedKubernetesOrchestrator {
370 fn service_name(&self, id: &str) -> String {
371 format!(
372 "{}{}-{id}",
373 self.config.name_prefix.as_deref().unwrap_or(""),
374 self.namespace
375 )
376 }
377
378 fn watch_pod_params(&self) -> watcher::Config {
381 let ns_selector = format!(
382 "environmentd.materialize.cloud/namespace={}",
383 self.namespace
384 );
385 watcher::Config::default().timeout(59).labels(&ns_selector)
387 }
388
389 fn make_label_key(&self, key: &str) -> String {
392 format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
393 }
394
395 fn label_selector_to_k8s(
396 &self,
397 MzLabelSelector { label_name, logic }: MzLabelSelector,
398 ) -> Result<LabelSelectorRequirement, anyhow::Error> {
399 let (operator, values) = match logic {
400 LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
401 LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
402 LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
403 LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
404 LabelSelectionLogic::InSet { values } => {
405 if values.is_empty() {
406 Err(anyhow!(
407 "Invalid selector logic for {label_name}: empty `in` set"
408 ))
409 } else {
410 Ok(("In", values))
411 }
412 }
413 LabelSelectionLogic::NotInSet { values } => {
414 if values.is_empty() {
415 Err(anyhow!(
416 "Invalid selector logic for {label_name}: empty `notin` set"
417 ))
418 } else {
419 Ok(("NotIn", values))
420 }
421 }
422 }?;
423 let lsr = LabelSelectorRequirement {
424 key: self.make_label_key(&label_name),
425 operator: operator.to_string(),
426 values: Some(values),
427 };
428 Ok(lsr)
429 }
430
431 fn send_command(&self, cmd: WorkerCommand) {
432 self.command_tx.send(cmd).expect("worker task not dropped");
433 }
434}
435
/// A decomposed Kubernetes-style quantity: `integral_part * base^exponent`,
/// where the base is 10 (`base10 == true`) or 2.
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    /// Converts the quantity to an integer at the given `scale` (target
    /// exponent), in the given base.
    ///
    /// Returns `None` when the bases differ or when scaling up overflows a
    /// `u64`. Scaling down truncates toward zero.
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if self.base10 != base10 {
            return None;
        }
        let base: u64 = if self.base10 { 10 } else { 2 };
        let shift = self.exponent - scale;
        let mut value = self.integral_part;
        if shift >= 0 {
            // Multiply step-by-step so overflow is detected exactly.
            for _ in 0..shift {
                value = value.checked_mul(base)?;
            }
        } else {
            // Divide step-by-step; truncation per step matches the
            // original's semantics.
            for _ in shift..0 {
                value /= base;
            }
        }
        Some(value)
    }
}
463
464fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
474 const DEC_SUFFIXES: &[(&str, i8)] = &[
475 ("n", -9),
476 ("u", -6),
477 ("m", -3),
478 ("", 0),
479 ("k", 3), ("M", 6),
481 ("G", 9),
482 ("T", 12),
483 ("P", 15),
484 ("E", 18),
485 ];
486 const BIN_SUFFIXES: &[(&str, i8)] = &[
487 ("", 0),
488 ("Ki", 10),
489 ("Mi", 20),
490 ("Gi", 30),
491 ("Ti", 40),
492 ("Pi", 50),
493 ("Ei", 60),
494 ];
495
496 let (positive, s) = match s.chars().next() {
497 Some('+') => (true, &s[1..]),
498 Some('-') => (false, &s[1..]),
499 _ => (true, s),
500 };
501
502 if !positive {
503 anyhow::bail!("Negative numbers not supported")
504 }
505
506 fn is_suffix_char(ch: char) -> bool {
507 "numkMGTPEKi".contains(ch)
508 }
509 let (num, suffix) = match s.find(is_suffix_char) {
510 None => (s, ""),
511 Some(idx) => s.split_at(idx),
512 };
513 let num: u64 = num.parse()?;
514 let (exponent, base10) = if let Some((_, exponent)) =
515 DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
516 {
517 (exponent, true)
518 } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
519 (exponent, false)
520 } else {
521 anyhow::bail!("Unrecognized suffix: {suffix}");
522 };
523 Ok(ScaledQuantity {
524 integral_part: num,
525 exponent: *exponent,
526 base10,
527 })
528}
529
530#[async_trait]
531impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
532 async fn fetch_service_metrics(
533 &self,
534 id: &str,
535 ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
536 let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
537 *info
538 } else {
539 tracing::error!("Failed to get info for {id}");
541 anyhow::bail!("Failed to get info for {id}");
542 };
543
544 let (result_tx, result_rx) = oneshot::channel();
545 self.send_command(WorkerCommand::FetchServiceMetrics {
546 name: self.service_name(id),
547 info,
548 result_tx,
549 });
550
551 let metrics = result_rx.await.expect("worker task not dropped");
552 Ok(metrics)
553 }
554
    /// Builds (or updates) the Kubernetes objects for a service: a headless
    /// `Service` for stable per-pod DNS and a `StatefulSet` running `scale`
    /// replicas. The actual API calls are performed asynchronously by the
    /// worker task; this method only computes the desired state.
    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            cpu_request,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        // Snapshot the scheduling config once so the whole computation sees
        // a consistent view.
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        // A limit of exactly zero means "no disk"; any other limit — or no
        // limit at all — enables scratch disk for the service.
        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        // Selector labels identifying this service's pods; these must stay
        // stable across updates since they are used as the service and
        // stateful set selectors.
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        // Full pod label set: selector labels, caller-provided labels
        // (namespace-qualified), the scale, and one marker label per port.
        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        // Resource limits/requests. A memory limit also sets the request
        // (possibly overwritten by an explicit request below); a CPU limit
        // likewise sets the request.
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        if let Some(cpu_request) = cpu_request {
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_request.as_millicpus())),
            );
        }
        // Headless service (`clusterIP: None`) so each stateful set pod gets
        // a stable DNS name of the form `{name}-{i}.{name}...`.
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        // Stable DNS hostnames for each process of the service.
        let hosts = (0..scale.get())
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        // Compute each process's listen addresses and the full peer address
        // matrix (per process, per port).
        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        // `args` is a caller-supplied callback that renders the command line
        // from the computed address assignments.
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

        // Anti-affinity: keep this service's pods off nodes hosting pods of
        // other replicas. A hard requirement by default; a weighted soft
        // preference when the scheduling config softens it.
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        // Optional soft affinity pulling this service's own pods into the
        // same availability zone.
        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        // Topology spread: balance replica pods across zones. When
        // `ignore_non_singular_scale` is set, only scale-1 services are
        // constrained, and the selector is narrowed to scale-1 pods.
        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale.get() == 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

        // Default pod annotations: opt out of autoscaler eviction. Both the
        // old (`do-not-evict`) and new (`do-not-disrupt`) Karpenter
        // annotations are set — presumably for compatibility across
        // Karpenter versions (NOTE(review): confirm).
        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),

            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        // Advertise the internal HTTP port for Prometheus scraping, if
        // configured and present.
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        // Disk-backed services are steered to nodes labeled as disk-capable.
        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        // Later entries win: defaults, then config-level, then caller-level
        // node selectors.
        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        // Pin (or prefer, when softened) the requested availability zones.
        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        // Merge any operator-supplied affinity (JSON) on top of the
        // generated affinity.
        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        // Derive the container name from the `ORG/NAME:VERSION` image
        // reference (the NAME component).
        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        // Optional hardened security context for all containers.
        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        // Optional init container, given the pod's identity via the
        // downward API (namespace, pod name, node name).
        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        // When collecting coverage, direct LLVM profile output into the
        // `/coverage` volume mounted below.
        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        // Scratch disk: an ephemeral PVC mounted at /scratch, sized by the
        // disk limit. Requesting disk without a configured storage class is
        // an error.
        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        // Coverage output needs a persistent volume so profiles survive pod
        // restarts.
        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        // The configured fs group doubles as run-as user and group.
        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        // Evict pods quickly (after NODE_FAILURE_THRESHOLD_SECONDS) when
        // their node becomes not-ready or unreachable, then append any
        // operator-supplied tolerations.
        let mut tolerations = vec![
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        // Note: annotations are intentionally left off the template here so
        // the hash computed below does not depend on them (the hash itself
        // becomes one of the annotations).
        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                // Kill pods immediately on deletion.
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        // Hash the (annotation-free) template and record the hash as a pod
        // annotation, so stale pods can be detected after template changes.
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_annotations.insert(
            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
            pod_template_hash.clone(),
        );

        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                // Tie the stateful set to the headless service above for
                // per-pod DNS.
                service_name: Some(name.clone()),
                replicas: Some(scale.cast_into()),
                template: pod_template_spec,
                // `OnDelete`: Kubernetes never rolls pods automatically;
                // pod replacement is presumably driven by the worker using
                // the pod template hash annotation — confirm in the worker.
                update_strategy: Some(StatefulSetUpdateStrategy {
                    type_: Some("OnDelete".to_owned()),
                    ..Default::default()
                }),
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        // Hand the desired state to the worker task to apply.
        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }
1239
1240 fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
1242 fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
1243 self.service_infos.lock().expect("poisoned lock").remove(id);
1244
1245 self.send_command(WorkerCommand::DropService {
1246 name: self.service_name(id),
1247 });
1248
1249 Ok(())
1250 }
1251
1252 async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
1254 let (result_tx, result_rx) = oneshot::channel();
1255 self.send_command(WorkerCommand::ListServices {
1256 namespace: self.namespace.clone(),
1257 result_tx,
1258 });
1259
1260 let list = result_rx.await.expect("worker task not dropped");
1261 Ok(list)
1262 }
1263
    /// Returns a stream of service events derived from a Kubernetes pod
    /// watch.
    ///
    /// Each observed pod is translated into a `ServiceEvent` carrying the
    /// service ID (read from the pod's service-id label), the process ID
    /// (the ordinal suffix of the pod name), and an online/offline status
    /// derived from the pod's `Ready` condition. Watch errors are logged
    /// and dropped from the stream rather than surfaced to the consumer.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            // Stateful set pods are named `<service>-<ordinal>`; the ordinal
            // after the last `-` is the process ID.
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            // Consider the pod OOM-killed if any container's current or most
            // recent termination exited with one of the recognized exit
            // codes (137 is 128+SIGKILL; 135 and 167 are treated the same
            // here).
            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            // A pod counts as online iff its `Ready` condition is `True`;
            // a missing condition is treated as not ready.
            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            // Fall back to the current time when the condition carries no
            // probe timestamp.
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Timestamp::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time: DateTime::from_timestamp_nanos(
                    time.as_nanosecond().try_into().expect("must fit"),
                ),
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        // Log and skip errored watch events instead of
                        // terminating the stream.
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }
1342
1343 fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
1344 *self.scheduling_config.write().expect("poisoned") = config;
1345 }
1346}
1347
impl OrchestratorWorker {
    /// Spawns the worker onto a named task whose handle aborts the task when
    /// dropped.
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    /// Worker main loop: perform one-time initialization, then process
    /// commands from `command_rx` until the sender side is dropped.
    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            // Fetch our own pod (named by the `HOSTNAME` env var) so we can
            // propagate its owner references onto every resource we create.
            // Retries forever with backoff clamped to 10s, so initialization
            // blocks until the API server is reachable.
            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator"));
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            if !self.collect_pod_metrics {
                info!(
                    "pod metrics collection is disabled; resource usage graphs in the console will not be available"
                );
            }

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

    /// Dispatches a single `WorkerCommand`.
    ///
    /// Kubernetes API calls for the service lifecycle commands are retried
    /// indefinitely (backoff clamped to 10s) until they succeed; metrics
    /// fetches are not retried.
    async fn handle_command(&self, cmd: WorkerCommand) {
        // Retry `f` forever, logging each failure tagged with the command
        // type it belongs to.
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                // The requester may have gone away; ignore send failures.
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                // The requester may have gone away; ignore send failures.
                let _ = result_tx.send(result);
            }
        }
    }

    /// Fetches per-process metrics for the service `name`, returning one
    /// entry per replica.
    ///
    /// Processes whose metrics cannot be fetched get default (empty) metrics
    /// rather than failing the whole call. When pod metrics collection is
    /// disabled, every process gets default metrics without touching the
    /// API.
    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        // Shape of the JSON served by clusterd's `/api/usage-metrics`
        // endpoint (see `get_clusterd_usage` below).
        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        // Fetch metrics for process `i` of the service, combining the
        // Kubernetes metrics API (CPU/memory) with clusterd's self-reported
        // usage (disk/swap/heap). Falls back to defaults if the metrics API
        // call fails, and to metrics-API-only data if the clusterd call
        // fails.
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        // Clusterd usage is best effort; keep the Kubernetes
                        // metrics alone.
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            // Only the first container's usage is inspected.
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            // Convert CPU usage to nanocores (scale 10^-9).
            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            // Convert memory usage to bytes (scale 10^0).
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                // Swap is folded into the disk total; if either component is
                // missing, the other is reported on its own.
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                // Heap is memory plus swap, but unknown when memory itself
                // is unknown.
                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        // Fetch clusterd's self-reported usage metrics for process `i`, by
        // resolving the service's `internal-http` port and querying the
        // process's per-pod DNS name directly.
        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                /api/usage-metrics"
            );

            // NOTE(review): a fresh HTTP client is built per request;
            // presumably cheap enough at this call rate — confirm before
            // hoisting.
            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        // Fetch all processes' metrics concurrently.
        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }

    /// Creates or updates the service's Kubernetes objects via server-side
    /// apply, then reconciles each existing pod against the new pod
    /// template.
    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        // Attach the orchestrator's owner references (collected at startup
        // in `run`) so these objects share the orchestrator's lifecycle.
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        // Keep a copy of the pod annotations before the stateful set is
        // moved into the patch below.
        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
        let pod_annotations = pod_metadata.annotations.clone();

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

        // The stateful set uses the `OnDelete` update strategy, so Kubernetes
        // does not restart pods on template changes. Walk each expected pod:
        // delete it if its recorded template hash is stale (the controller
        // then recreates it from the new template), otherwise refresh its
        // annotations in place.
        for pod_id in 0..desc.scale.get() {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                // Pod not created yet; nothing to reconcile.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };

            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
                != Some(&desc.pod_template_hash)
            {
                self.pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                    .map(|_| ())
            } else {
                // Template unchanged: only (re)apply the pod annotations.
                let metadata = ObjectMeta {
                    annotations: pod_annotations.clone(),
                    ..Default::default()
                }
                .into_request_partial::<Pod>();
                self.pod_api
                    .patch_metadata(
                        &pod_name,
                        &PatchParams::apply(FIELD_MANAGER).force(),
                        &Patch::Apply(&metadata),
                    )
                    .await
                    .map(|_| ())
            };

            match result {
                Ok(()) => (),
                // The pod disappeared concurrently; carry on.
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    /// Deletes the service's stateful set and then its headless service,
    /// treating already-deleted (404) objects as success.
    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    /// Lists the IDs of services in `namespace` by stripping the
    /// orchestrator's name prefix from each stateful set name; stateful sets
    /// without the prefix are skipped.
    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}
1713
/// Handle to a service created by the Kubernetes orchestrator.
#[derive(Debug, Clone)]
struct KubernetesService {
    // One hostname per process of the service.
    hosts: Vec<String>,
    // Map from port name to port number, shared by all processes.
    ports: BTreeMap<String, u16>,
}
1719
1720impl Service for KubernetesService {
1721 fn addresses(&self, port: &str) -> Vec<String> {
1722 let port = self.ports[port];
1723 self.hosts
1724 .iter()
1725 .map(|host| format!("{host}:{port}"))
1726 .collect()
1727 }
1728}
1729
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        // Decimal SI suffixes: each step multiplies by 1000.
        let cases = [
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for &(input, expected) in cases.iter() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        // Sub-unit decimal suffixes, checked at nano (10^-9) scale.
        let cases = [("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for &(input, expected) in cases.iter() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        // Binary suffixes: each step multiplies by 1024.
        let cases = [
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for &(input, expected) in cases.iter() {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, expected, "input={input}, quantity={quantity:?}");
        }
    }
}