use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use chrono::Utc;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::DeepMerge;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
};
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

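/// Configures a [`KubernetesOrchestrator`].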
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    pub context: String,
    pub scheduler_name: Option<String>,
    pub service_annotations: BTreeMap<String, String>,
    pub service_labels: BTreeMap<String, String>,
    pub service_node_selector: BTreeMap<String, String>,
    pub service_affinity: Option<String>,
    pub service_tolerations: Option<String>,
    pub service_account: Option<String>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    pub coverage: bool,
    pub ephemeral_volume_storage_class: Option<String>,
    pub service_fs_group: Option<i64>,
    pub name_prefix: Option<String>,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

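/// Specifies whether Kubernetes should pull container images when creating pods.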
#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    Always,
    IfNotPresent,
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

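/// An orchestrator backed by Kubernetes.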
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
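    /// Creates a new Kubernetes orchestrator from the provided configuration.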
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: u16,
}

struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

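/// Commands sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].
///
/// Commands for which the caller expects a result carry a `result_tx` on which the
/// worker delivers the result once it is available.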
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

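/// A description of a service to be created by an [`OrchestratorWorker`].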
#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: u16,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

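/// A background task that performs the Kubernetes API work for a
/// [`NamespacedKubernetesOrchestrator`].
///
/// Commands arrive over `command_rx`; API calls made on behalf of service commands are
/// retried until they succeed, so the orchestrator methods themselves never block on
/// Kubernetes.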
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

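    /// Return a `watcher::Config` that restricts the watch to pods belonging to this
    /// orchestrator namespace.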
    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().timeout(59).labels(&ns_selector)
    }

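    /// Convert a caller-supplied label key into the label key actually passed to
    /// Kubernetes, prefixed with this orchestrator's namespace.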
    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

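/// A Kubernetes quantity decomposed into an integral part and an exponent: the value is
/// `integral_part * base^exponent`, where `base` is 10 if `base10` is set and 2 otherwise.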
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

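/// Parse a Kubernetes quantity string, such as `"500Mi"` or `"250m"`, into a
/// [`ScaledQuantity`]. Negative and fractional values are not supported and return an
/// error.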
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3),
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

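        // Disk is enabled unless the caller explicitly requested a zero disk limit.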
        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        let hosts = (0..scale)
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

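        // Avoid scheduling this service's pods onto nodes that already host pods matched
        // by `other_replicas_selector`. Depending on the scheduling config this is either
        // a hard requirement or a weighted preference.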
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale <= 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: config.min_domains,
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

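        // Annotate the pod so that cluster autoscalers (cluster-autoscaler and karpenter)
        // do not voluntarily evict or disrupt it.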
        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                ..Default::default()
            })
        } else {
            None
        };

        let mut tolerations = vec![
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                annotations: Some(pod_annotations),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_template_spec
            .metadata
            .as_mut()
            .unwrap()
            .annotations
            .as_mut()
            .unwrap()
            .insert(
                POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
                pod_template_hash.clone(),
            );

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: Some(name.clone()),
                replicas: Some(scale.into()),
                template: pod_template_spec,
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Utc::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time,
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

impl OrchestratorWorker {
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!("HOSTNAME environment variable missing or invalid; required for Kubernetes orchestrator")
            });
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

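    /// Handle a worker command.
    ///
    /// Commands that create, drop, or list services retry the underlying Kubernetes API
    /// call indefinitely (with backoff) until it succeeds; metrics fetches instead fall
    /// back to default values on failure.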
    async fn handle_command(&self, cmd: WorkerCommand) {
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale)
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

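        // Fetch the CPU and memory usage of process `i` of the service from the
        // `metrics.k8s.io` API, together with its disk usage (see `get_disk_usage`).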
        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let disk_usage_fut = get_disk_usage(self_, service_name, i);
            let (metrics, disk_usage) =
                match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await {
                    (Ok(metrics), Ok(disk_usage)) => (metrics, disk_usage),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch disk usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let cpu = match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(i) => Some(i),
                    None => {
                        tracing::error!("CPU value {q:?} out of range");
                        None
                    }
                },
                Err(e) => {
                    tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
                    None
                }
            };
            let memory = match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(i) => Some(i),
                    None => {
                        tracing::error!("Memory value {q:?} out of range");
                        None
                    }
                },
                Err(e) => {
                    tracing::error!("Failed to parse memory value {mem_str}: {e}");
                    None
                }
            };

            ServiceProcessMetrics {
                cpu_nano_cores: cpu,
                memory_bytes: memory,
                disk_usage_bytes: disk_usage,
            }
        }

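        // Fetch the disk (and swap) usage of process `i` by querying the service's
        // `internal-http` port at `/api/usage-metrics`.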
        async fn get_disk_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<Option<u64>> {
            #[derive(Deserialize)]
            pub(crate) struct Usage {
                disk_bytes: Option<u64>,
                swap_bytes: Option<u64>,
            }

            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let Usage {
                disk_bytes,
                swap_bytes,
            } = resp.json().await?;

            let bytes = if let (Some(disk), Some(swap)) = (disk_bytes, swap_bytes) {
                Some(disk + swap)
            } else {
                disk_bytes.or(swap_bytes)
            };
            Ok(bytes)
        }

        let ret =
            futures::future::join_all((0..info.scale).map(|i| get_metrics(self, name, i.into())));

        ret.await
    }

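    /// Applies the service's `Service` and `StatefulSet` objects via server-side apply,
    /// then deletes any existing pod whose recorded pod template hash differs from the
    /// desired one so that it is recreated from the new template.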
    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

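        // Delete any pod whose recorded pod template hash no longer matches the hash of
        // the template we just applied; the StatefulSet controller then recreates it
        // with the new template.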
        for pod_id in 0..desc.scale {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };
            if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION) != Some(&desc.pod_template_hash)
            {
                match self
                    .pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                {
                    Ok(_) => (),
                    Err(kube::Error::Api(e)) if e.code == 404 => (),
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }

    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}

#[derive(Debug, Clone)]
struct KubernetesService {
    hosts: Vec<String>,
    ports: BTreeMap<String, u16>,
}

impl Service for KubernetesService {
    fn addresses(&self, port: &str) -> Vec<String> {
        let port = self.ports[port];
        self.hosts
            .iter()
            .map(|host| format!("{host}:{port}"))
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = &[
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }
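
    // `parse_k8s_quantity` only accepts non-negative integers with a known suffix;
    // negative, fractional, and unknown-suffix inputs are rejected.
    #[mz_ore::test]
    fn k8s_quantity_invalid() {
        for input in ["-42", "1.5", "42Q"] {
            assert!(parse_k8s_quantity(input).is_err(), "input={input}");
        }
    }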
}