use std::collections::BTreeMap;
use std::future::Future;
use std::num::NonZero;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{env, fmt};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use chrono::DateTime;
use clap::ValueEnum;
use cloud_resource_controller::KubernetesResourceReader;
use futures::TryFutureExt;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::DeepMerge;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy};
use k8s_openapi::api::core::v1::{
    Affinity, Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, EphemeralVolumeSource,
    NodeAffinity, NodeSelector, NodeSelectorRequirement, NodeSelectorTerm, ObjectFieldSelector,
    ObjectReference, PersistentVolumeClaim, PersistentVolumeClaimSpec,
    PersistentVolumeClaimTemplate, Pod, PodAffinity, PodAffinityTerm, PodAntiAffinity,
    PodSecurityContext, PodSpec, PodTemplateSpec, PreferredSchedulingTerm, ResourceRequirements,
    SeccompProfile, Secret, SecurityContext, Service as K8sService, ServicePort, ServiceSpec,
    Sysctl, Toleration, TopologySpreadConstraint, Volume, VolumeMount, VolumeResourceRequirements,
    WeightedPodAffinityTerm,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
    LabelSelector, LabelSelectorRequirement, OwnerReference,
};
use k8s_openapi::jiff::Timestamp;
use kube::ResourceExt;
use kube::api::{Api, DeleteParams, ObjectMeta, PartialObjectMetaExt, Patch, PatchParams};
use kube::client::Client;
use kube::error::Error as K8sError;
use kube::runtime::{WatchStreamExt, watcher};
use maplit::btreemap;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cloud_resources::crd::vpc_endpoint::v1::VpcEndpoint;
use mz_orchestrator::{
    DiskLimit, LabelSelectionLogic, LabelSelector as MzLabelSelector, NamespacedOrchestrator,
    OfflineReason, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
    ServiceProcessMetrics, ServiceStatus, scheduling_config::*,
};
use mz_ore::cast::CastInto;
use mz_ore::retry::Retry;
use mz_ore::task::AbortOnDropHandle;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, warn};

pub mod cloud_resource_controller;
pub mod secrets;
pub mod util;

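// The field manager under which this orchestrator issues server-side apply
// patches.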
const FIELD_MANAGER: &str = "environmentd";
const NODE_FAILURE_THRESHOLD_SECONDS: i64 = 30;

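// Annotation in which a hash of the pod template is recorded, so that
// `ensure_service` can detect pods running an outdated template and recreate
// them.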
const POD_TEMPLATE_HASH_ANNOTATION: &str = "environmentd.materialize.cloud/pod-template-hash";

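/// Configuration for a [`KubernetesOrchestrator`].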
#[derive(Debug, Clone)]
pub struct KubernetesOrchestratorConfig {
    pub context: String,
    pub scheduler_name: Option<String>,
    pub service_annotations: BTreeMap<String, String>,
    pub service_labels: BTreeMap<String, String>,
    pub service_node_selector: BTreeMap<String, String>,
    pub service_affinity: Option<String>,
    pub service_tolerations: Option<String>,
    pub service_account: Option<String>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub aws_external_id_prefix: Option<AwsExternalIdPrefix>,
    pub coverage: bool,
    pub ephemeral_volume_storage_class: Option<String>,
    pub service_fs_group: Option<i64>,
    pub name_prefix: Option<String>,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,
}

impl KubernetesOrchestratorConfig {
    pub fn name_prefix(&self) -> String {
        self.name_prefix.clone().unwrap_or_default()
    }
}

#[derive(ValueEnum, Debug, Clone, Copy)]
pub enum KubernetesImagePullPolicy {
    Always,
    IfNotPresent,
    Never,
}

impl fmt::Display for KubernetesImagePullPolicy {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            KubernetesImagePullPolicy::Always => f.write_str("Always"),
            KubernetesImagePullPolicy::IfNotPresent => f.write_str("IfNotPresent"),
            KubernetesImagePullPolicy::Never => f.write_str("Never"),
        }
    }
}

impl KubernetesImagePullPolicy {
    pub fn as_kebab_case_str(&self) -> &'static str {
        match self {
            Self::Always => "always",
            Self::IfNotPresent => "if-not-present",
            Self::Never => "never",
        }
    }
}

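/// An orchestrator backed by Kubernetes: each service is realized as a
/// `StatefulSet` plus a headless `Service` in the configured cluster.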
pub struct KubernetesOrchestrator {
    client: Client,
    kubernetes_namespace: String,
    config: KubernetesOrchestratorConfig,
    secret_api: Api<Secret>,
    vpc_endpoint_api: Api<VpcEndpoint>,
    namespaces: Mutex<BTreeMap<String, Arc<dyn NamespacedOrchestrator>>>,
    resource_reader: Arc<KubernetesResourceReader>,
}

impl fmt::Debug for KubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("KubernetesOrchestrator").finish()
    }
}

impl KubernetesOrchestrator {
    pub async fn new(
        config: KubernetesOrchestratorConfig,
    ) -> Result<KubernetesOrchestrator, anyhow::Error> {
        let (client, kubernetes_namespace) = util::create_client(config.context.clone()).await?;
        let resource_reader =
            Arc::new(KubernetesResourceReader::new(config.context.clone()).await?);
        Ok(KubernetesOrchestrator {
            client: client.clone(),
            kubernetes_namespace,
            config,
            secret_api: Api::default_namespaced(client.clone()),
            vpc_endpoint_api: Api::default_namespaced(client),
            namespaces: Mutex::new(BTreeMap::new()),
            resource_reader,
        })
    }
}

impl Orchestrator for KubernetesOrchestrator {
    fn namespace(&self, namespace: &str) -> Arc<dyn NamespacedOrchestrator> {
        let mut namespaces = self.namespaces.lock().expect("lock poisoned");
        Arc::clone(namespaces.entry(namespace.into()).or_insert_with(|| {
            let (command_tx, command_rx) = mpsc::unbounded_channel();
            let worker = OrchestratorWorker {
                metrics_api: Api::default_namespaced(self.client.clone()),
                service_api: Api::default_namespaced(self.client.clone()),
                stateful_set_api: Api::default_namespaced(self.client.clone()),
                pod_api: Api::default_namespaced(self.client.clone()),
                owner_references: vec![],
                command_rx,
                name_prefix: self.config.name_prefix.clone().unwrap_or_default(),
                collect_pod_metrics: self.config.collect_pod_metrics,
            }
            .spawn(format!("kubernetes-orchestrator-worker:{namespace}"));

            Arc::new(NamespacedKubernetesOrchestrator {
                pod_api: Api::default_namespaced(self.client.clone()),
                kubernetes_namespace: self.kubernetes_namespace.clone(),
                namespace: namespace.into(),
                config: self.config.clone(),
                scheduling_config: Default::default(),
                service_infos: std::sync::Mutex::new(BTreeMap::new()),
                command_tx,
                _worker: worker,
            })
        }))
    }
}

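/// Per-service bookkeeping retained by the orchestrator after
/// `ensure_service`.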
#[derive(Clone, Copy)]
struct ServiceInfo {
    scale: NonZero<u16>,
}

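/// A [`NamespacedOrchestrator`] that prefixes service names with a logical
/// namespace and delegates Kubernetes API calls to a background
/// [`OrchestratorWorker`].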
struct NamespacedKubernetesOrchestrator {
    pod_api: Api<Pod>,
    kubernetes_namespace: String,
    namespace: String,
    config: KubernetesOrchestratorConfig,
    scheduling_config: std::sync::RwLock<ServiceSchedulingConfig>,
    service_infos: std::sync::Mutex<BTreeMap<String, ServiceInfo>>,
    command_tx: mpsc::UnboundedSender<WorkerCommand>,
    _worker: AbortOnDropHandle<()>,
}

impl fmt::Debug for NamespacedKubernetesOrchestrator {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("NamespacedKubernetesOrchestrator")
            .field("kubernetes_namespace", &self.kubernetes_namespace)
            .field("namespace", &self.namespace)
            .field("config", &self.config)
            .finish()
    }
}

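/// A command sent from a [`NamespacedKubernetesOrchestrator`] to its
/// [`OrchestratorWorker`].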
enum WorkerCommand {
    EnsureService {
        desc: ServiceDescription,
    },
    DropService {
        name: String,
    },
    ListServices {
        namespace: String,
        result_tx: oneshot::Sender<Vec<String>>,
    },
    FetchServiceMetrics {
        name: String,
        info: ServiceInfo,
        result_tx: oneshot::Sender<Vec<ServiceProcessMetrics>>,
    },
}

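/// A description of a service to be created by an [`OrchestratorWorker`].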
#[derive(Debug, Clone)]
struct ServiceDescription {
    name: String,
    scale: NonZero<u16>,
    service: K8sService,
    stateful_set: StatefulSet,
    pod_template_hash: String,
}

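/// A task that executes commands against the Kubernetes API on behalf of a
/// [`NamespacedKubernetesOrchestrator`], serializing all API traffic through
/// a single command channel.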
struct OrchestratorWorker {
    metrics_api: Api<PodMetrics>,
    service_api: Api<K8sService>,
    stateful_set_api: Api<StatefulSet>,
    pod_api: Api<Pod>,
    owner_references: Vec<OwnerReference>,
    command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
    name_prefix: String,
    collect_pod_metrics: bool,
}

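// Hand-rolled bindings for the `metrics.k8s.io/v1beta1` `PodMetrics`
// resource, which `k8s_openapi` does not provide.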
#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainer {
    pub name: String,
    pub usage: PodMetricsContainerUsage,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetricsContainerUsage {
    pub cpu: Quantity,
    pub memory: Quantity,
}

#[derive(Deserialize, Clone, Debug)]
pub struct PodMetrics {
    pub metadata: ObjectMeta,
    pub timestamp: String,
    pub window: String,
    pub containers: Vec<PodMetricsContainer>,
}

impl k8s_openapi::Resource for PodMetrics {
    const GROUP: &'static str = "metrics.k8s.io";
    const KIND: &'static str = "PodMetrics";
    const VERSION: &'static str = "v1beta1";
    const API_VERSION: &'static str = "metrics.k8s.io/v1beta1";
    const URL_PATH_SEGMENT: &'static str = "pods";

    type Scope = k8s_openapi::NamespaceResourceScope;
}

impl k8s_openapi::Metadata for PodMetrics {
    type Ty = ObjectMeta;

    fn metadata(&self) -> &Self::Ty {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut Self::Ty {
        &mut self.metadata
    }
}

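// Plain-`serde` types for metric values returned by Kubernetes metrics APIs
// that `k8s_openapi` likewise does not cover.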
#[derive(Deserialize, Clone, Debug)]
pub struct MetricIdentifier {
    #[serde(rename = "metricName")]
    pub name: String,
}

#[derive(Deserialize, Clone, Debug)]
pub struct MetricValue {
    #[serde(rename = "describedObject")]
    pub described_object: ObjectReference,
    #[serde(flatten)]
    pub metric_identifier: MetricIdentifier,
    pub timestamp: String,
    pub value: Quantity,
}

impl NamespacedKubernetesOrchestrator {
    fn service_name(&self, id: &str) -> String {
        format!(
            "{}{}-{id}",
            self.config.name_prefix.as_deref().unwrap_or(""),
            self.namespace
        )
    }

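    /// Returns a watcher configuration that selects only pods belonging to
    /// this orchestrator's logical namespace.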
    fn watch_pod_params(&self) -> watcher::Config {
        let ns_selector = format!(
            "environmentd.materialize.cloud/namespace={}",
            self.namespace
        );
        watcher::Config::default().timeout(59).labels(&ns_selector)
    }

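    /// Namespaces a user-supplied label key so it cannot collide with the
    /// orchestrator's own `environmentd.materialize.cloud/*` labels.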
    fn make_label_key(&self, key: &str) -> String {
        format!("{}.environmentd.materialize.cloud/{}", self.namespace, key)
    }

    fn label_selector_to_k8s(
        &self,
        MzLabelSelector { label_name, logic }: MzLabelSelector,
    ) -> Result<LabelSelectorRequirement, anyhow::Error> {
        let (operator, values) = match logic {
            LabelSelectionLogic::Eq { value } => Ok(("In", vec![value])),
            LabelSelectionLogic::NotEq { value } => Ok(("NotIn", vec![value])),
            LabelSelectionLogic::Exists => Ok(("Exists", vec![])),
            LabelSelectionLogic::NotExists => Ok(("DoesNotExist", vec![])),
            LabelSelectionLogic::InSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `in` set"
                    ))
                } else {
                    Ok(("In", values))
                }
            }
            LabelSelectionLogic::NotInSet { values } => {
                if values.is_empty() {
                    Err(anyhow!(
                        "Invalid selector logic for {label_name}: empty `notin` set"
                    ))
                } else {
                    Ok(("NotIn", values))
                }
            }
        }?;
        let lsr = LabelSelectorRequirement {
            key: self.make_label_key(&label_name),
            operator: operator.to_string(),
            values: Some(values),
        };
        Ok(lsr)
    }

    fn send_command(&self, cmd: WorkerCommand) {
        self.command_tx.send(cmd).expect("worker task not dropped");
    }
}

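/// A Kubernetes quantity decomposed into an integral part and a power of
/// either 10 or 2, e.g. `500m` is `500 × 10^-3` and `42Ki` is `42 × 2^10`.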
#[derive(Debug)]
struct ScaledQuantity {
    integral_part: u64,
    exponent: i8,
    base10: bool,
}

impl ScaledQuantity {
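    /// Converts to an integer in units of `base^scale` (base 10 if `base10`,
    /// else base 2), truncating any fractional remainder. Returns `None` if
    /// the bases don't match or the result would overflow a `u64`.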
    pub fn try_to_integer(&self, scale: i8, base10: bool) -> Option<u64> {
        if base10 != self.base10 {
            return None;
        }
        let exponent = self.exponent - scale;
        let mut result = self.integral_part;
        let base = if self.base10 { 10 } else { 2 };
        if exponent < 0 {
            for _ in exponent..0 {
                result /= base;
            }
        } else {
            for _ in 0..exponent {
                result = result.checked_mul(base)?;
            }
        }
        Some(result)
    }
}

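/// Parses a Kubernetes quantity string like `"500m"`, `"4Gi"`, or `"3"` into
/// a [`ScaledQuantity`]. Only non-negative integral values with the standard
/// decimal (`n`, `u`, `m`, `k`, `M`, `G`, `T`, `P`, `E`) and binary
/// (`Ki`..`Ei`) suffixes are supported.
///
/// A sketch of typical use, mirroring the metrics path below:
///
/// ```ignore
/// let cpu = parse_k8s_quantity("250m")?;
/// assert_eq!(cpu.try_to_integer(-9, true), Some(250_000_000)); // nanocores
/// ```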
fn parse_k8s_quantity(s: &str) -> Result<ScaledQuantity, anyhow::Error> {
    const DEC_SUFFIXES: &[(&str, i8)] = &[
        ("n", -9),
        ("u", -6),
        ("m", -3),
        ("", 0),
        ("k", 3),
        ("M", 6),
        ("G", 9),
        ("T", 12),
        ("P", 15),
        ("E", 18),
    ];
    const BIN_SUFFIXES: &[(&str, i8)] = &[
        ("", 0),
        ("Ki", 10),
        ("Mi", 20),
        ("Gi", 30),
        ("Ti", 40),
        ("Pi", 50),
        ("Ei", 60),
    ];

    let (positive, s) = match s.chars().next() {
        Some('+') => (true, &s[1..]),
        Some('-') => (false, &s[1..]),
        _ => (true, s),
    };

    if !positive {
        anyhow::bail!("Negative numbers not supported")
    }

    fn is_suffix_char(ch: char) -> bool {
        "numkMGTPEKi".contains(ch)
    }
    let (num, suffix) = match s.find(is_suffix_char) {
        None => (s, ""),
        Some(idx) => s.split_at(idx),
    };
    let num: u64 = num.parse()?;
    let (exponent, base10) = if let Some((_, exponent)) =
        DEC_SUFFIXES.iter().find(|(target, _)| suffix == *target)
    {
        (exponent, true)
    } else if let Some((_, exponent)) = BIN_SUFFIXES.iter().find(|(target, _)| suffix == *target) {
        (exponent, false)
    } else {
        anyhow::bail!("Unrecognized suffix: {suffix}");
    };
    Ok(ScaledQuantity {
        integral_part: num,
        exponent: *exponent,
        base10,
    })
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
    async fn fetch_service_metrics(
        &self,
        id: &str,
    ) -> Result<Vec<ServiceProcessMetrics>, anyhow::Error> {
        let info = if let Some(info) = self.service_infos.lock().expect("poisoned lock").get(id) {
            *info
        } else {
            tracing::error!("Failed to get info for {id}");
            anyhow::bail!("Failed to get info for {id}");
        };

        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::FetchServiceMetrics {
            name: self.service_name(id),
            info,
            result_tx,
        });

        let metrics = result_rx.await.expect("worker task not dropped");
        Ok(metrics)
    }

    fn ensure_service(
        &self,
        id: &str,
        ServiceConfig {
            image,
            init_container_image,
            args,
            ports: ports_in,
            memory_limit,
            memory_request,
            cpu_limit,
            cpu_request,
            scale,
            labels: labels_in,
            annotations: annotations_in,
            availability_zones,
            other_replicas_selector,
            replicas_selector,
            disk_limit,
            node_selector,
        }: ServiceConfig,
    ) -> Result<Box<dyn Service>, anyhow::Error> {
        let scheduling_config: ServiceSchedulingConfig =
            self.scheduling_config.read().expect("poisoned").clone();

        let disk = disk_limit != Some(DiskLimit::ZERO);

        let name = self.service_name(id);
        let mut match_labels = btreemap! {
            "environmentd.materialize.cloud/namespace".into() => self.namespace.clone(),
            "environmentd.materialize.cloud/service-id".into() => id.into(),
        };
        for (key, value) in &self.config.service_labels {
            match_labels.insert(key.clone(), value.clone());
        }

        let mut labels = match_labels.clone();
        for (key, value) in labels_in {
            labels.insert(self.make_label_key(&key), value);
        }

        labels.insert(self.make_label_key("scale"), scale.to_string());

        for port in &ports_in {
            labels.insert(
                format!("environmentd.materialize.cloud/port-{}", port.name),
                "true".into(),
            );
        }
        let mut limits = BTreeMap::new();
        let mut requests = BTreeMap::new();
        if let Some(memory_limit) = memory_limit {
            limits.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
            requests.insert(
                "memory".into(),
                Quantity(memory_limit.0.as_u64().to_string()),
            );
        }
        if let Some(memory_request) = memory_request {
            requests.insert(
                "memory".into(),
                Quantity(memory_request.0.as_u64().to_string()),
            );
        }
        if let Some(cpu_limit) = cpu_limit {
            limits.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_limit.as_millicpus())),
            );
        }
        if let Some(cpu_request) = cpu_request {
            requests.insert(
                "cpu".into(),
                Quantity(format!("{}m", cpu_request.as_millicpus())),
            );
        }
        let service = K8sService {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                ports: Some(
                    ports_in
                        .iter()
                        .map(|port| ServicePort {
                            port: port.port_hint.into(),
                            name: Some(port.name.clone()),
                            ..Default::default()
                        })
                        .collect(),
                ),
                cluster_ip: Some("None".to_string()),
                selector: Some(match_labels.clone()),
                ..Default::default()
            }),
            status: None,
        };

        let hosts = (0..scale.get())
            .map(|i| {
                format!(
                    "{name}-{i}.{name}.{}.svc.cluster.local",
                    self.kubernetes_namespace
                )
            })
            .collect::<Vec<_>>();
        let ports = ports_in
            .iter()
            .map(|p| (p.name.clone(), p.port_hint))
            .collect::<BTreeMap<_, _>>();

        let mut listen_addrs = BTreeMap::new();
        let mut peer_addrs = vec![BTreeMap::new(); hosts.len()];
        for (name, port) in &ports {
            listen_addrs.insert(name.clone(), format!("0.0.0.0:{port}"));
            for (i, host) in hosts.iter().enumerate() {
                peer_addrs[i].insert(name.clone(), format!("{host}:{port}"));
            }
        }
        let mut args = args(ServiceAssignments {
            listen_addrs: &listen_addrs,
            peer_addrs: &peer_addrs,
        });

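        // Pod anti-affinity: by default, pods of this service must not be
        // scheduled onto nodes already running pods of other replicas of the
        // same cluster; the scheduling config can soften this into a
        // weighted preference.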
        let anti_affinity = Some({
            let label_selector_requirements = other_replicas_selector
                .clone()
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
            let ls = LabelSelector {
                match_expressions: Some(label_selector_requirements),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "kubernetes.io/hostname".to_string(),
                ..Default::default()
            };

            if !scheduling_config.soften_replication_anti_affinity {
                PodAntiAffinity {
                    required_during_scheduling_ignored_during_execution: Some(vec![pat]),
                    ..Default::default()
                }
            } else {
                PodAntiAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        WeightedPodAffinityTerm {
                            weight: scheduling_config.soften_replication_anti_affinity_weight,
                            pod_affinity_term: pat,
                        },
                    ]),
                    ..Default::default()
                }
            }
        });

        let pod_affinity = if let Some(weight) = scheduling_config.multi_pod_az_affinity_weight {
            let ls = LabelSelector {
                match_labels: Some(match_labels.clone()),
                ..Default::default()
            };
            let pat = PodAffinityTerm {
                label_selector: Some(ls),
                topology_key: "topology.kubernetes.io/zone".to_string(),
                ..Default::default()
            };

            Some(PodAffinity {
                preferred_during_scheduling_ignored_during_execution: Some(vec![
                    WeightedPodAffinityTerm {
                        weight,
                        pod_affinity_term: pat,
                    },
                ]),
                ..Default::default()
            })
        } else {
            None
        };

        let topology_spread = if scheduling_config.topology_spread.enabled {
            let config = &scheduling_config.topology_spread;

            if !config.ignore_non_singular_scale || scale.get() == 1 {
                let label_selector_requirements = (if config.ignore_non_singular_scale {
                    let mut replicas_selector_ignoring_scale = replicas_selector.clone();

                    replicas_selector_ignoring_scale.push(mz_orchestrator::LabelSelector {
                        label_name: "scale".into(),
                        logic: mz_orchestrator::LabelSelectionLogic::Eq {
                            value: "1".to_string(),
                        },
                    });

                    replicas_selector_ignoring_scale
                } else {
                    replicas_selector
                })
                .into_iter()
                .map(|ls| self.label_selector_to_k8s(ls))
                .collect::<Result<Vec<_>, _>>()?;
                let ls = LabelSelector {
                    match_expressions: Some(label_selector_requirements),
                    ..Default::default()
                };

                if config.soft && config.min_domains.is_some() {
                    warn!(
                        "topology spread is soft but min_domains is set; \
                         Kubernetes rejects minDomains with ScheduleAnyway, \
                         so min_domains will be ignored"
                    );
                }
                if availability_zones.is_some() && config.min_domains.is_some() {
                    warn!(
                        "topology spread has min_domains set but availability_zones \
                         constrains eligible topology domains via node affinity; \
                         minDomains will be ignored to avoid preventing pod scheduling"
                    );
                }

                let constraint = TopologySpreadConstraint {
                    label_selector: Some(ls),
                    min_domains: topology_spread_min_domains(
                        config.soft,
                        availability_zones.is_some(),
                        config.min_domains,
                    ),
                    max_skew: config.max_skew,
                    topology_key: "topology.kubernetes.io/zone".to_string(),
                    when_unsatisfiable: if config.soft {
                        "ScheduleAnyway".to_string()
                    } else {
                        "DoNotSchedule".to_string()
                    },
                    match_label_keys: None,
                    ..Default::default()
                };
                Some(vec![constraint])
            } else {
                None
            }
        } else {
            None
        };

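        // Ask the cluster autoscaler and Karpenter not to evict these pods;
        // `karpenter.sh/do-not-evict` is the older spelling that
        // `karpenter.sh/do-not-disrupt` replaced.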
        let mut pod_annotations = btreemap! {
            "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
            "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
            "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
        };
        for (key, value) in annotations_in {
            pod_annotations.insert(self.make_label_key(&key), value);
        }
        if self.config.enable_prometheus_scrape_annotations {
            if let Some(internal_http_port) = ports_in
                .iter()
                .find(|port| port.name == "internal-http")
                .map(|port| port.port_hint.to_string())
            {
                pod_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
                pod_annotations.insert("prometheus.io/port".to_owned(), internal_http_port);
                pod_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
                pod_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
            }
        }
        for (key, value) in &self.config.service_annotations {
            pod_annotations.insert(key.clone(), value.clone());
        }

        let default_node_selector = if disk {
            vec![("materialize.cloud/disk".to_string(), disk.to_string())]
        } else {
            vec![]
        };

        let node_selector: BTreeMap<String, String> = default_node_selector
            .into_iter()
            .chain(self.config.service_node_selector.clone())
            .chain(node_selector)
            .collect();

        let node_affinity = if let Some(availability_zones) = availability_zones {
            let selector = NodeSelectorTerm {
                match_expressions: Some(vec![NodeSelectorRequirement {
                    key: "materialize.cloud/availability-zone".to_string(),
                    operator: "In".to_string(),
                    values: Some(availability_zones),
                }]),
                match_fields: None,
            };

            if scheduling_config.soften_az_affinity {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: Some(vec![
                        PreferredSchedulingTerm {
                            preference: selector,
                            weight: scheduling_config.soften_az_affinity_weight,
                        },
                    ]),
                    required_during_scheduling_ignored_during_execution: None,
                })
            } else {
                Some(NodeAffinity {
                    preferred_during_scheduling_ignored_during_execution: None,
                    required_during_scheduling_ignored_during_execution: Some(NodeSelector {
                        node_selector_terms: vec![selector],
                    }),
                })
            }
        } else {
            None
        };

        let mut affinity = Affinity {
            pod_anti_affinity: anti_affinity,
            pod_affinity,
            node_affinity,
            ..Default::default()
        };
        if let Some(service_affinity) = &self.config.service_affinity {
            affinity.merge_from(serde_json::from_str(service_affinity)?);
        }

        let container_name = image
            .rsplit_once('/')
            .and_then(|(_, name_version)| name_version.rsplit_once(':'))
            .context("`image` is not ORG/NAME:VERSION")?
            .0
            .to_string();

        let container_security_context = if scheduling_config.security_context_enabled {
            Some(SecurityContext {
                privileged: Some(false),
                run_as_non_root: Some(true),
                allow_privilege_escalation: Some(false),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            })
        } else {
            None
        };

        let init_containers = init_container_image.map(|image| {
            vec![Container {
                name: "init".to_string(),
                image: Some(image),
                image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                resources: Some(ResourceRequirements {
                    claims: None,
                    limits: Some(limits.clone()),
                    requests: Some(requests.clone()),
                }),
                security_context: container_security_context.clone(),
                env: Some(vec![
                    EnvVar {
                        name: "MZ_NAMESPACE".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.namespace".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_POD_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "metadata.name".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                    EnvVar {
                        name: "MZ_NODE_NAME".to_string(),
                        value_from: Some(EnvVarSource {
                            field_ref: Some(ObjectFieldSelector {
                                field_path: "spec.nodeName".to_string(),
                                ..Default::default()
                            }),
                            ..Default::default()
                        }),
                        ..Default::default()
                    },
                ]),
                ..Default::default()
            }]
        });

        let env = if self.config.coverage {
            Some(vec![EnvVar {
                name: "LLVM_PROFILE_FILE".to_string(),
                value: Some(format!("/coverage/{}-%p-%9m%c.profraw", self.namespace)),
                ..Default::default()
            }])
        } else {
            None
        };

        let mut volume_mounts = vec![];

        if self.config.coverage {
            volume_mounts.push(VolumeMount {
                name: "coverage".to_string(),
                mount_path: "/coverage".to_string(),
                ..Default::default()
            })
        }

        let volumes = match (disk, &self.config.ephemeral_volume_storage_class) {
            (true, Some(ephemeral_volume_storage_class)) => {
                volume_mounts.push(VolumeMount {
                    name: "scratch".to_string(),
                    mount_path: "/scratch".to_string(),
                    ..Default::default()
                });
                args.push("--scratch-directory=/scratch".into());

                Some(vec![Volume {
                    name: "scratch".to_string(),
                    ephemeral: Some(EphemeralVolumeSource {
                        volume_claim_template: Some(PersistentVolumeClaimTemplate {
                            spec: PersistentVolumeClaimSpec {
                                access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                                storage_class_name: Some(
                                    ephemeral_volume_storage_class.to_string(),
                                ),
                                resources: Some(VolumeResourceRequirements {
                                    requests: Some(BTreeMap::from([(
                                        "storage".to_string(),
                                        Quantity(
                                            disk_limit
                                                .unwrap_or(DiskLimit::ARBITRARY)
                                                .0
                                                .as_u64()
                                                .to_string(),
                                        ),
                                    )])),
                                    ..Default::default()
                                }),
                                ..Default::default()
                            },
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                }])
            }
            (true, None) => {
                return Err(anyhow!(
                    "service requested disk but no ephemeral volume storage class was configured"
                ));
            }
            (false, _) => None,
        };

        if let Some(name_prefix) = &self.config.name_prefix {
            args.push(format!("--secrets-reader-name-prefix={}", name_prefix));
        }

        let volume_claim_templates = if self.config.coverage {
            Some(vec![PersistentVolumeClaim {
                metadata: ObjectMeta {
                    name: Some("coverage".to_string()),
                    ..Default::default()
                },
                spec: Some(PersistentVolumeClaimSpec {
                    access_modes: Some(vec!["ReadWriteOnce".to_string()]),
                    resources: Some(VolumeResourceRequirements {
                        requests: Some(BTreeMap::from([(
                            "storage".to_string(),
                            Quantity("10Gi".to_string()),
                        )])),
                        ..Default::default()
                    }),
                    ..Default::default()
                }),
                ..Default::default()
            }])
        } else {
            None
        };

        let tcp_keepalive_sysctls = vec![
            Sysctl {
                name: "net.ipv4.tcp_keepalive_time".to_string(),
                value: "300".to_string(),
            },
            Sysctl {
                name: "net.ipv4.tcp_keepalive_intvl".to_string(),
                value: "30".to_string(),
            },
            Sysctl {
                name: "net.ipv4.tcp_keepalive_probes".to_string(),
                value: "3".to_string(),
            },
        ];

        let security_context = if let Some(fs_group) = self.config.service_fs_group {
            Some(PodSecurityContext {
                fs_group: Some(fs_group),
                run_as_user: Some(fs_group),
                run_as_group: Some(fs_group),
                sysctls: Some(tcp_keepalive_sysctls),
                ..Default::default()
            })
        } else {
            Some(PodSecurityContext {
                sysctls: Some(tcp_keepalive_sysctls),
                ..Default::default()
            })
        };

        let mut tolerations = vec![
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/not-ready".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
            Toleration {
                effect: Some("NoExecute".into()),
                key: Some("node.kubernetes.io/unreachable".into()),
                operator: Some("Exists".into()),
                toleration_seconds: Some(NODE_FAILURE_THRESHOLD_SECONDS),
                value: None,
            },
        ];
        if let Some(service_tolerations) = &self.config.service_tolerations {
            tolerations.extend(serde_json::from_str::<Vec<_>>(service_tolerations)?);
        }
        let tolerations = Some(tolerations);

        let mut pod_template_spec = PodTemplateSpec {
            metadata: Some(ObjectMeta {
                labels: Some(labels.clone()),
                ..Default::default()
            }),
            spec: Some(PodSpec {
                init_containers,
                containers: vec![Container {
                    name: container_name,
                    image: Some(image),
                    args: Some(args),
                    image_pull_policy: Some(self.config.image_pull_policy.to_string()),
                    ports: Some(
                        ports_in
                            .iter()
                            .map(|port| ContainerPort {
                                container_port: port.port_hint.into(),
                                name: Some(port.name.clone()),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    security_context: container_security_context.clone(),
                    resources: Some(ResourceRequirements {
                        claims: None,
                        limits: Some(limits),
                        requests: Some(requests),
                    }),
                    volume_mounts: if !volume_mounts.is_empty() {
                        Some(volume_mounts)
                    } else {
                        None
                    },
                    env,
                    ..Default::default()
                }],
                volumes,
                security_context,
                node_selector: Some(node_selector),
                scheduler_name: self.config.scheduler_name.clone(),
                service_account: self.config.service_account.clone(),
                affinity: Some(affinity),
                topology_spread_constraints: topology_spread,
                tolerations,
                termination_grace_period_seconds: Some(0),
                ..Default::default()
            }),
        };
        let pod_template_json = serde_json::to_string(&pod_template_spec).unwrap();
        let mut hasher = Sha256::new();
        hasher.update(pod_template_json);
        let pod_template_hash = format!("{:x}", hasher.finalize());
        pod_annotations.insert(
            POD_TEMPLATE_HASH_ANNOTATION.to_owned(),
            pod_template_hash.clone(),
        );

        pod_template_spec.metadata.as_mut().unwrap().annotations = Some(pod_annotations);

        let stateful_set = StatefulSet {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                ..Default::default()
            },
            spec: Some(StatefulSetSpec {
                selector: LabelSelector {
                    match_labels: Some(match_labels),
                    ..Default::default()
                },
                service_name: Some(name.clone()),
                replicas: Some(scale.cast_into()),
                template: pod_template_spec,
                update_strategy: Some(StatefulSetUpdateStrategy {
                    type_: Some("OnDelete".to_owned()),
                    ..Default::default()
                }),
                pod_management_policy: Some("Parallel".to_string()),
                volume_claim_templates,
                ..Default::default()
            }),
            status: None,
        };

        self.send_command(WorkerCommand::EnsureService {
            desc: ServiceDescription {
                name,
                scale,
                service,
                stateful_set,
                pod_template_hash,
            },
        });

        self.service_infos
            .lock()
            .expect("poisoned lock")
            .insert(id.to_string(), ServiceInfo { scale });

        Ok(Box::new(KubernetesService { hosts, ports }))
    }

    fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
        fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
        self.service_infos.lock().expect("poisoned lock").remove(id);

        self.send_command(WorkerCommand::DropService {
            name: self.service_name(id),
        });

        Ok(())
    }

    async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
        let (result_tx, result_rx) = oneshot::channel();
        self.send_command(WorkerCommand::ListServices {
            namespace: self.namespace.clone(),
            result_tx,
        });

        let list = result_rx.await.expect("worker task not dropped");
        Ok(list)
    }

    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
            let process_id = pod.name_any().split('-').next_back().unwrap().parse()?;
            let service_id_label = "environmentd.materialize.cloud/service-id";
            let service_id = pod
                .labels()
                .get(service_id_label)
                .ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
                .clone();

            let oomed = pod
                .status
                .as_ref()
                .and_then(|status| status.container_statuses.as_ref())
                .map(|container_statuses| {
                    container_statuses.iter().any(|cs| {
                        let current_state = cs.state.as_ref().and_then(|s| s.terminated.as_ref());
                        let last_state = cs.last_state.as_ref().and_then(|s| s.terminated.as_ref());
                        let termination_state = current_state.or(last_state);

                        let exit_code = termination_state.map(|s| s.exit_code);
                        exit_code.is_some_and(|e| [135, 137, 167].contains(&e))
                    })
                })
                .unwrap_or(false);

            let (pod_ready, last_probe_time) = pod
                .status
                .and_then(|status| status.conditions)
                .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
                .map(|c| (c.status == "True", c.last_probe_time))
                .unwrap_or((false, None));

            let status = if pod_ready {
                ServiceStatus::Online
            } else {
                ServiceStatus::Offline(oomed.then_some(OfflineReason::OomKilled))
            };
            let time = if let Some(time) = last_probe_time {
                time.0
            } else {
                Timestamp::now()
            };

            Ok(ServiceEvent {
                service_id,
                process_id,
                status,
                time: DateTime::from_timestamp_nanos(
                    time.as_nanosecond().try_into().expect("must fit"),
                ),
            })
        }

        let stream = watcher(self.pod_api.clone(), self.watch_pod_params())
            .touched_objects()
            .filter_map(|object| async move {
                match object {
                    Ok(pod) => Some(into_service_event(pod)),
                    Err(error) => {
                        tracing::warn!("service watch error: {error}");
                        None
                    }
                }
            });
        Box::pin(stream)
    }

    fn update_scheduling_config(&self, config: ServiceSchedulingConfig) {
        *self.scheduling_config.write().expect("poisoned") = config;
    }
}

impl OrchestratorWorker {
    fn spawn(self, name: String) -> AbortOnDropHandle<()> {
        mz_ore::task::spawn(|| name, self.run()).abort_on_drop()
    }

    async fn run(mut self) {
        {
            info!("initializing Kubernetes orchestrator worker");
            let start = Instant::now();

            let hostname = env::var("HOSTNAME").unwrap_or_else(|_| {
                panic!(
                    "HOSTNAME environment variable missing or invalid; \
                     required for Kubernetes orchestrator"
                )
            });
            let orchestrator_pod = Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| self.pod_api.get(&hostname))
                .await
                .expect("always retries on error");
            self.owner_references
                .extend(orchestrator_pod.owner_references().into_iter().cloned());

            if !self.collect_pod_metrics {
                info!(
                    "pod metrics collection is disabled; resource usage graphs in the console will not be available"
                );
            }

            info!(
                "Kubernetes orchestrator worker initialized in {:?}",
                start.elapsed()
            );
        }

        while let Some(cmd) = self.command_rx.recv().await {
            self.handle_command(cmd).await;
        }
    }

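    /// Handles a command sent by the orchestrator. Service management calls
    /// are retried with backoff until they succeed against the Kubernetes
    /// API; metrics fetches instead fall back to default (empty) metrics on
    /// error.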
    async fn handle_command(&self, cmd: WorkerCommand) {
        async fn retry<F, U, R>(f: F, cmd_type: &str) -> R
        where
            F: Fn() -> U,
            U: Future<Output = Result<R, K8sError>>,
        {
            Retry::default()
                .clamp_backoff(Duration::from_secs(10))
                .retry_async(|_| {
                    f().map_err(
                        |error| tracing::error!(%cmd_type, "orchestrator call failed: {error}"),
                    )
                })
                .await
                .expect("always retries on error")
        }

        use WorkerCommand::*;
        match cmd {
            EnsureService { desc } => {
                retry(|| self.ensure_service(desc.clone()), "EnsureService").await
            }
            DropService { name } => retry(|| self.drop_service(&name), "DropService").await,
            ListServices {
                namespace,
                result_tx,
            } => {
                let result = retry(|| self.list_services(&namespace), "ListServices").await;
                let _ = result_tx.send(result);
            }
            FetchServiceMetrics {
                name,
                info,
                result_tx,
            } => {
                let result = self.fetch_service_metrics(&name, &info).await;
                let _ = result_tx.send(result);
            }
        }
    }

    async fn fetch_service_metrics(
        &self,
        name: &str,
        info: &ServiceInfo,
    ) -> Vec<ServiceProcessMetrics> {
        if !self.collect_pod_metrics {
            return (0..info.scale.get())
                .map(|_| ServiceProcessMetrics::default())
                .collect();
        }

        #[derive(Deserialize)]
        pub(crate) struct ClusterdUsage {
            disk_bytes: Option<u64>,
            memory_bytes: Option<u64>,
            swap_bytes: Option<u64>,
            heap_limit: Option<u64>,
        }

        async fn get_metrics(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> ServiceProcessMetrics {
            let name = format!("{service_name}-{i}");

            let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
            let (metrics, clusterd_usage) =
                match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
                {
                    (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
                    (Ok(metrics), Err(e)) => {
                        warn!("Failed to fetch clusterd usage for {name}: {e}");
                        (metrics, None)
                    }
                    (Err(e), _) => {
                        warn!("Failed to get metrics for {name}: {e}");
                        return ServiceProcessMetrics::default();
                    }
                };
            let Some(PodMetricsContainer {
                usage:
                    PodMetricsContainerUsage {
                        cpu: Quantity(cpu_str),
                        memory: Quantity(mem_str),
                    },
                ..
            }) = metrics.containers.get(0)
            else {
                warn!("metrics result contained no containers for {name}");
                return ServiceProcessMetrics::default();
            };

            let mut process_metrics = ServiceProcessMetrics::default();

            match parse_k8s_quantity(cpu_str) {
                Ok(q) => match q.try_to_integer(-9, true) {
                    Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
                    None => error!("CPU value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
            }
            match parse_k8s_quantity(mem_str) {
                Ok(q) => match q.try_to_integer(0, false) {
                    Some(mem) => process_metrics.memory_bytes = Some(mem),
                    None => error!("memory value {q:?} out of range"),
                },
                Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
            }

            if let Some(usage) = clusterd_usage {
                process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
                    (Some(disk), Some(swap)) => Some(disk + swap),
                    (disk, swap) => disk.or(swap),
                };

                process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
                    (Some(memory), Some(swap)) => Some(memory + swap),
                    (Some(memory), None) => Some(memory),
                    (None, _) => None,
                };

                process_metrics.heap_limit = usage.heap_limit;
            }

            process_metrics
        }

        async fn get_clusterd_usage(
            self_: &OrchestratorWorker,
            service_name: &str,
            i: usize,
        ) -> anyhow::Result<ClusterdUsage> {
            let service = self_
                .service_api
                .get(service_name)
                .await
                .with_context(|| format!("failed to get service {service_name}"))?;
            let namespace = service
                .metadata
                .namespace
                .context("missing service namespace")?;
            let internal_http_port = service
                .spec
                .and_then(|spec| spec.ports)
                .and_then(|ports| {
                    ports
                        .into_iter()
                        .find(|p| p.name == Some("internal-http".into()))
                })
                .map(|p| p.port);
            let Some(port) = internal_http_port else {
                bail!("internal-http port missing in service spec");
            };
            let metrics_url = format!(
                "http://{service_name}-{i}.{service_name}.{namespace}.svc.cluster.local:{port}\
                 /api/usage-metrics"
            );

            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .context("error building HTTP client")?;
            let resp = http_client.get(metrics_url).send().await?;
            let usage = resp.json().await?;

            Ok(usage)
        }

        let ret = futures::future::join_all(
            (0..info.scale.cast_into()).map(|i| get_metrics(self, name, i)),
        );

        ret.await
    }

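    /// Applies the service and stateful set for `desc` via server-side
    /// apply, then walks the existing pods: pods whose pod template hash
    /// annotation no longer matches are deleted (the `OnDelete` update
    /// strategy then recreates them from the new template), while up-to-date
    /// pods only have their annotations patched.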
    async fn ensure_service(&self, mut desc: ServiceDescription) -> Result<(), K8sError> {
        desc.service
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());
        desc.stateful_set
            .metadata
            .owner_references
            .get_or_insert(vec![])
            .extend(self.owner_references.iter().cloned());

        let ss_spec = desc.stateful_set.spec.as_ref().unwrap();
        let pod_metadata = ss_spec.template.metadata.as_ref().unwrap();
        let pod_annotations = pod_metadata.annotations.clone();

        self.service_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.service),
            )
            .await?;
        self.stateful_set_api
            .patch(
                &desc.name,
                &PatchParams::apply(FIELD_MANAGER).force(),
                &Patch::Apply(desc.stateful_set),
            )
            .await?;

        for pod_id in 0..desc.scale.get() {
            let pod_name = format!("{}-{pod_id}", desc.name);
            let pod = match self.pod_api.get(&pod_name).await {
                Ok(pod) => pod,
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            };

            let result = if pod.annotations().get(POD_TEMPLATE_HASH_ANNOTATION)
                != Some(&desc.pod_template_hash)
            {
                self.pod_api
                    .delete(&pod_name, &DeleteParams::default())
                    .await
                    .map(|_| ())
            } else {
                let metadata = ObjectMeta {
                    annotations: pod_annotations.clone(),
                    ..Default::default()
                }
                .into_request_partial::<Pod>();
                self.pod_api
                    .patch_metadata(
                        &pod_name,
                        &PatchParams::apply(FIELD_MANAGER).force(),
                        &Patch::Apply(&metadata),
                    )
                    .await
                    .map(|_| ())
            };

            match result {
                Ok(()) => (),
                Err(kube::Error::Api(e)) if e.code == 404 => continue,
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    async fn drop_service(&self, name: &str) -> Result<(), K8sError> {
        let res = self
            .stateful_set_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => (),
            Err(K8sError::Api(e)) if e.code == 404 => (),
            Err(e) => return Err(e),
        }

        let res = self
            .service_api
            .delete(name, &DeleteParams::default())
            .await;
        match res {
            Ok(_) => Ok(()),
            Err(K8sError::Api(e)) if e.code == 404 => Ok(()),
            Err(e) => Err(e),
        }
    }

    async fn list_services(&self, namespace: &str) -> Result<Vec<String>, K8sError> {
        let stateful_sets = self.stateful_set_api.list(&Default::default()).await?;
        let name_prefix = format!("{}{namespace}-", self.name_prefix);
        Ok(stateful_sets
            .into_iter()
            .filter_map(|ss| {
                ss.metadata
                    .name
                    .unwrap()
                    .strip_prefix(&name_prefix)
                    .map(Into::into)
            })
            .collect())
    }
}

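/// A handle to a service deployed as a Kubernetes `StatefulSet`, recording
/// the stable DNS names of its pods and its named ports.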
#[derive(Debug, Clone)]
struct KubernetesService {
    hosts: Vec<String>,
    ports: BTreeMap<String, u16>,
}

impl Service for KubernetesService {
    fn addresses(&self, port: &str) -> Vec<String> {
        let port = self.ports[port];
        self.hosts
            .iter()
            .map(|host| format!("{host}:{port}"))
            .collect()
    }
}

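/// Computes the `minDomains` value for a topology spread constraint.
///
/// `minDomains` is suppressed (`None`) when the constraint is soft, because
/// Kubernetes rejects `minDomains` together with `ScheduleAnyway`, and when
/// the service is pinned to specific availability zones, because a
/// `minDomains` larger than the number of pinned zones would make pods
/// unschedulable.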
fn topology_spread_min_domains(
    soft: bool,
    az_pinned: bool,
    min_domains: Option<i32>,
) -> Option<i32> {
    if soft || az_pinned { None } else { min_domains }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn topology_spread_min_domains_suppression() {
        assert_eq!(topology_spread_min_domains(false, false, Some(3)), Some(3));
        assert_eq!(topology_spread_min_domains(false, false, None), None);
        assert_eq!(topology_spread_min_domains(true, false, Some(3)), None);
        assert_eq!(topology_spread_min_domains(false, true, Some(3)), None);
        assert_eq!(topology_spread_min_domains(true, true, Some(3)), None);
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_large() {
        let cases = &[
            ("42", 42),
            ("42k", 42000),
            ("42M", 42000000),
            ("42G", 42000000000),
            ("42T", 42000000000000),
            ("42P", 42000000000000000),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base10_small() {
        let cases = &[("42n", 42), ("42u", 42000), ("42m", 42000000)];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(-9, true).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }

    #[mz_ore::test]
    fn k8s_quantity_base2() {
        let cases = &[
            ("42Ki", 42 << 10),
            ("42Mi", 42 << 20),
            ("42Gi", 42 << 30),
            ("42Ti", 42 << 40),
            ("42Pi", 42 << 50),
        ];

        for (input, expected) in cases {
            let quantity = parse_k8s_quantity(input).unwrap();
            let number = quantity.try_to_integer(0, false).unwrap();
            assert_eq!(number, *expected, "input={input}, quantity={quantity:?}");
        }
    }
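
    // A few extra edge cases, sketched from the parser's behavior above:
    // negative quantities and unknown suffixes are rejected, and mixing
    // decimal and binary bases in `try_to_integer` yields `None`.
    #[mz_ore::test]
    fn k8s_quantity_invalid() {
        assert!(parse_k8s_quantity("-42").is_err());
        assert!(parse_k8s_quantity("42X").is_err());

        let quantity = parse_k8s_quantity("42Ki").unwrap();
        assert_eq!(quantity.try_to_integer(0, true), None);
    }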
}