1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19 api::{
20 apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21 core::v1::{
22 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23 EnvVarSource, KeyToPath, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25 Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26 VolumeMount,
27 },
28 networking::v1::{
29 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31 },
32 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33 },
34 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40 ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::{
54 Certificate, CertificatePrivateKeyAlgorithm,
55};
56use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
57use mz_orchestrator_tracing::TracingCliArgs;
58use mz_ore::instrument;
59
60static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
61 major: 0,
62 minor: 140,
63 patch: 0,
64 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
65 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
66});
67const V143: Version = Version::new(0, 143, 0);
68const V144: Version = Version::new(0, 144, 0);
69static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
70 major: 0,
71 minor: 147,
72 patch: 0,
73 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
74 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
75});
76const V153: Version = Version::new(0, 153, 0);
77static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
78 major: 0,
79 minor: 154,
80 patch: 0,
81 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
82 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
83});
84
85#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
90pub enum DeploymentStatus {
91 Initializing,
94 ReadyToPromote,
96 Promoting,
98 IsLeader,
100}
101
102#[derive(Deserialize, Serialize)]
103pub struct LoginCredentials {
104 username: String,
105 password: String,
107}
108
109#[derive(Debug, Serialize)]
110pub struct ConnectionInfo {
111 pub environmentd_url: String,
112 pub mz_system_secret_name: Option<String>,
113 pub listeners_configmap: ConfigMap,
114}
115
116#[derive(Debug, Serialize)]
117pub struct Resources {
118 pub generation: u64,
119 pub environmentd_network_policies: Vec<NetworkPolicy>,
120 pub service_account: Box<Option<ServiceAccount>>,
121 pub role: Box<Role>,
122 pub role_binding: Box<RoleBinding>,
123 pub public_service: Box<Service>,
124 pub generation_service: Box<Service>,
125 pub persist_pubsub_service: Box<Service>,
126 pub environmentd_certificate: Box<Option<Certificate>>,
127 pub environmentd_statefulset: Box<StatefulSet>,
128 pub connection_info: Box<ConnectionInfo>,
129}
130
131impl Resources {
132 pub fn new(
133 config: &super::MaterializeControllerArgs,
134 tracing: &TracingCliArgs,
135 orchestratord_namespace: &str,
136 mz: &Materialize,
137 generation: u64,
138 ) -> Self {
139 let environmentd_network_policies =
140 create_environmentd_network_policies(config, mz, orchestratord_namespace);
141
142 let service_account = Box::new(create_service_account_object(config, mz));
143 let role = Box::new(create_role_object(mz));
144 let role_binding = Box::new(create_role_binding_object(mz));
145 let public_service = Box::new(create_public_service_object(config, mz, generation));
146 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
147 let persist_pubsub_service =
148 Box::new(create_persist_pubsub_service(config, mz, generation));
149 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
150 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
151 config, tracing, mz, generation,
152 ));
153 let connection_info = Box::new(create_connection_info(config, mz, generation));
154
155 Self {
156 generation,
157 environmentd_network_policies,
158 service_account,
159 role,
160 role_binding,
161 public_service,
162 generation_service,
163 persist_pubsub_service,
164 environmentd_certificate,
165 environmentd_statefulset,
166 connection_info,
167 }
168 }
169
170 #[instrument]
171 pub async fn apply(
172 &self,
173 client: &Client,
174 increment_generation: bool,
175 force_promote: bool,
176 namespace: &str,
177 ) -> Result<Option<Action>, anyhow::Error> {
178 let environmentd_network_policy_api: Api<NetworkPolicy> =
179 Api::namespaced(client.clone(), namespace);
180 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
181 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
182 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
183 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
184 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
185 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
186 let pod_api: Api<Pod> = Api::namespaced(client.clone(), namespace);
187 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
188 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
189
190 for policy in &self.environmentd_network_policies {
191 trace!("applying network policy {}", policy.name_unchecked());
192 apply_resource(&environmentd_network_policy_api, policy).await?;
193 }
194
195 if let Some(service_account) = &*self.service_account {
196 trace!("applying environmentd service account");
197 apply_resource(&service_account_api, service_account).await?;
198 }
199
200 trace!("applying environmentd role");
201 apply_resource(&role_api, &*self.role).await?;
202
203 trace!("applying environmentd role binding");
204 apply_resource(&role_binding_api, &*self.role_binding).await?;
205
206 trace!("applying environmentd per-generation service");
207 apply_resource(&service_api, &*self.generation_service).await?;
208
209 trace!("creating persist pubsub service");
210 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
211
212 if let Some(certificate) = &*self.environmentd_certificate {
213 trace!("creating new environmentd certificate");
214 apply_resource(&certificate_api, certificate).await?;
215 }
216
217 trace!("applying listeners configmap");
218 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
219
220 trace!("creating new environmentd statefulset");
221 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
222
223 if increment_generation {
240 let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
241
242 let statefulset = get_resource(
243 &statefulset_api,
244 &self.environmentd_statefulset.name_unchecked(),
245 )
246 .await?;
247 if statefulset
248 .and_then(|statefulset| statefulset.status)
249 .and_then(|status| status.ready_replicas)
250 .unwrap_or(0)
251 == 0
252 {
253 trace!("environmentd statefulset is not ready yet...");
254 return Ok(Some(retry_action));
255 }
256
257 let http_client = match &self.connection_info.mz_system_secret_name {
258 Some(mz_system_secret_name) => {
259 let http_client = reqwest::Client::builder()
260 .timeout(std::time::Duration::from_secs(10))
261 .cookie_store(true)
262 .danger_accept_invalid_certs(true)
264 .build()
265 .unwrap();
266 if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
267 if let Some(password) =
268 data.get("external_login_password_mz_system").cloned()
269 {
270 let password = String::from_utf8_lossy(&password.0).to_string();
271 let login_url = reqwest::Url::parse(&format!(
272 "{}/api/login",
273 self.connection_info.environmentd_url,
274 ))
275 .unwrap();
276 match http_client
277 .post(login_url)
278 .body(serde_json::to_string(&LoginCredentials {
279 username: "mz_system".to_owned(),
280 password,
281 })?)
282 .header("Content-Type", "application/json")
283 .send()
284 .await
285 {
286 Ok(response) => {
287 if let Err(e) = response.error_for_status() {
288 trace!(
289 "failed to login to environmentd, retrying... ({e})"
290 );
291 return Ok(Some(retry_action));
292 }
293 }
294 Err(e) => {
295 trace!("failed to connect to environmentd, retrying... ({e})");
296 return Ok(Some(retry_action));
297 }
298 };
299 }
300 };
301 http_client
302 }
303 None => reqwest::Client::builder()
304 .timeout(std::time::Duration::from_secs(10))
305 .build()
306 .unwrap(),
307 };
308 let status_url = reqwest::Url::parse(&format!(
309 "{}/api/leader/status",
310 self.connection_info.environmentd_url,
311 ))
312 .unwrap();
313
314 match http_client.get(status_url.clone()).send().await {
315 Ok(response) => {
316 let response: BTreeMap<String, DeploymentStatus> =
317 match response.error_for_status() {
318 Ok(response) => response.json().await?,
319 Err(e) => {
320 trace!("failed to get status of environmentd, retrying... ({e})");
321 return Ok(Some(retry_action));
322 }
323 };
324 if force_promote {
325 trace!("skipping cluster catchup");
326 let skip_catchup_url = reqwest::Url::parse(&format!(
327 "{}/api/leader/skip-catchup",
328 self.connection_info.environmentd_url,
329 ))
330 .unwrap();
331 let response = http_client.post(skip_catchup_url).send().await?;
332 if response.status() == StatusCode::BAD_REQUEST {
333 let err: SkipCatchupError = response.json().await?;
334 bail!("failed to skip catchup: {}", err.message);
335 }
336 } else if response["status"] == DeploymentStatus::Initializing {
337 trace!("environmentd is still initializing, retrying...");
338 return Ok(Some(retry_action));
339 } else {
340 trace!("environmentd is ready");
341 }
342 }
343 Err(e) => {
344 trace!("failed to connect to environmentd, retrying... ({e})");
345 return Ok(Some(retry_action));
346 }
347 }
348
349 let promote_url = reqwest::Url::parse(&format!(
350 "{}/api/leader/promote",
351 self.connection_info.environmentd_url,
352 ))
353 .unwrap();
354
355 trace!("promoting new environmentd to leader");
364 let response = http_client.post(promote_url).send().await?;
365 let response: BecomeLeaderResponse = match response.error_for_status() {
366 Ok(response) => response.json().await?,
367 Err(e) => {
368 trace!("failed to promote environmentd, retrying... ({e})");
369 return Ok(Some(retry_action));
370 }
371 };
372 if let BecomeLeaderResult::Failure { message } = response.result {
373 bail!("failed to promote new environmentd: {message}");
374 }
375
376 match http_client.get(status_url.clone()).send().await {
388 Ok(response) => {
389 let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
390 if response["status"] != DeploymentStatus::IsLeader {
391 trace!(
392 "environmentd is still promoting (status: {:?}), retrying...",
393 response["status"]
394 );
395 return Ok(Some(retry_action));
396 } else {
397 trace!("environmentd is ready");
398 }
399 }
400 Err(e) => {
401 trace!("failed to connect to environmentd, retrying... ({e})");
402 return Ok(Some(retry_action));
403 }
404 }
405 } else {
406 trace!("restarting environmentd pod to pick up statefulset changes");
407 delete_resource(
408 &pod_api,
409 &statefulset_pod_name(&*self.environmentd_statefulset, 0),
410 )
411 .await?;
412 }
413
414 Ok(None)
415 }
416
417 #[instrument]
418 pub async fn promote_services(
419 &self,
420 client: &Client,
421 namespace: &str,
422 ) -> Result<(), anyhow::Error> {
423 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
424
425 trace!("applying environmentd public service");
426 apply_resource(&service_api, &*self.public_service).await?;
427
428 Ok(())
429 }
430
431 #[instrument]
432 pub async fn teardown_generation(
433 &self,
434 client: &Client,
435 mz: &Materialize,
436 generation: u64,
437 ) -> Result<(), anyhow::Error> {
438 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
439 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
440 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
441
442 trace!("deleting environmentd statefulset for generation {generation}");
443 delete_resource(
444 &statefulset_api,
445 &mz.environmentd_statefulset_name(generation),
446 )
447 .await?;
448
449 trace!("deleting persist pubsub service for generation {generation}");
450 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
451
452 trace!("deleting environmentd per-generation service for generation {generation}");
453 delete_resource(
454 &service_api,
455 &mz.environmentd_generation_service_name(generation),
456 )
457 .await?;
458
459 trace!("deleting listeners configmap for generation {generation}");
460 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
461
462 Ok(())
463 }
464
465 pub fn generate_hash(&self) -> String {
468 let mut hasher = Sha256::new();
469 hasher.update(&serde_json::to_string(self).unwrap());
470 format!("{:x}", hasher.finalize())
471 }
472}
473
474fn create_environmentd_network_policies(
475 config: &super::MaterializeControllerArgs,
476 mz: &Materialize,
477 orchestratord_namespace: &str,
478) -> Vec<NetworkPolicy> {
479 let mut network_policies = Vec::new();
480 if config.network_policies.internal_enabled {
481 let environmentd_label_selector = LabelSelector {
482 match_labels: Some(
483 mz.default_labels()
484 .into_iter()
485 .chain([(
486 "materialize.cloud/app".to_owned(),
487 mz.environmentd_app_name(),
488 )])
489 .collect(),
490 ),
491 ..Default::default()
492 };
493 let orchestratord_label_selector = LabelSelector {
494 match_labels: Some(
495 config
496 .orchestratord_pod_selector_labels
497 .iter()
498 .cloned()
499 .map(|kv| (kv.key, kv.value))
500 .collect(),
501 ),
502 ..Default::default()
503 };
504 let all_pods_label_selector = LabelSelector {
507 match_labels: Some(mz.default_labels()),
508 ..Default::default()
509 };
510 network_policies.extend([
511 NetworkPolicy {
514 metadata: mz
515 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
516 spec: Some(NetworkPolicySpec {
517 egress: Some(vec![NetworkPolicyEgressRule {
518 to: Some(vec![NetworkPolicyPeer {
519 pod_selector: Some(all_pods_label_selector.clone()),
520 ..Default::default()
521 }]),
522 ..Default::default()
523 }]),
524 ingress: Some(vec![NetworkPolicyIngressRule {
525 from: Some(vec![NetworkPolicyPeer {
526 pod_selector: Some(all_pods_label_selector.clone()),
527 ..Default::default()
528 }]),
529 ..Default::default()
530 }]),
531 pod_selector: Some(all_pods_label_selector.clone()),
532 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
533 ..Default::default()
534 }),
535 },
536 NetworkPolicy {
539 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
540 spec: Some(NetworkPolicySpec {
541 ingress: Some(vec![NetworkPolicyIngressRule {
542 from: Some(vec![NetworkPolicyPeer {
543 namespace_selector: Some(LabelSelector {
544 match_labels: Some(btreemap! {
545 "kubernetes.io/metadata.name".into()
546 => orchestratord_namespace.into(),
547 }),
548 ..Default::default()
549 }),
550 pod_selector: Some(orchestratord_label_selector),
551 ..Default::default()
552 }]),
553 ports: Some(vec![
554 NetworkPolicyPort {
555 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
556 protocol: Some("TCP".to_string()),
557 ..Default::default()
558 },
559 NetworkPolicyPort {
560 port: Some(IntOrString::Int(
561 config.environmentd_internal_http_port.into(),
562 )),
563 protocol: Some("TCP".to_string()),
564 ..Default::default()
565 },
566 ]),
567 ..Default::default()
568 }]),
569 pod_selector: Some(environmentd_label_selector),
570 policy_types: Some(vec!["Ingress".to_owned()]),
571 ..Default::default()
572 }),
573 },
574 ]);
575 }
576 if config.network_policies.ingress_enabled {
577 let mut ingress_label_selector = mz.default_labels();
578 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
579 network_policies.extend([NetworkPolicy {
580 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
581 spec: Some(NetworkPolicySpec {
582 ingress: Some(vec![NetworkPolicyIngressRule {
583 from: Some(
584 config
585 .network_policies
586 .ingress_cidrs
587 .iter()
588 .map(|cidr| NetworkPolicyPeer {
589 ip_block: Some(IPBlock {
590 cidr: cidr.to_owned(),
591 except: None,
592 }),
593 ..Default::default()
594 })
595 .collect(),
596 ),
597 ports: Some(vec![
598 NetworkPolicyPort {
599 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
600 protocol: Some("TCP".to_string()),
601 ..Default::default()
602 },
603 NetworkPolicyPort {
604 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
605 protocol: Some("TCP".to_string()),
606 ..Default::default()
607 },
608 ]),
609 ..Default::default()
610 }]),
611 pod_selector: Some(LabelSelector {
612 match_expressions: None,
613 match_labels: Some(ingress_label_selector),
614 }),
615 policy_types: Some(vec!["Ingress".to_owned()]),
616 ..Default::default()
617 }),
618 }]);
619 }
620 if config.network_policies.egress_enabled {
621 network_policies.extend([NetworkPolicy {
622 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
623 spec: Some(NetworkPolicySpec {
624 egress: Some(vec![NetworkPolicyEgressRule {
625 to: Some(
626 config
627 .network_policies
628 .egress_cidrs
629 .iter()
630 .map(|cidr| NetworkPolicyPeer {
631 ip_block: Some(IPBlock {
632 cidr: cidr.to_owned(),
633 except: None,
634 }),
635 ..Default::default()
636 })
637 .collect(),
638 ),
639 ..Default::default()
640 }]),
641 pod_selector: Some(LabelSelector {
642 match_expressions: None,
643 match_labels: Some(mz.default_labels()),
644 }),
645 policy_types: Some(vec!["Egress".to_owned()]),
646 ..Default::default()
647 }),
648 }]);
649 }
650 network_policies
651}
652
653fn create_service_account_object(
654 config: &super::MaterializeControllerArgs,
655 mz: &Materialize,
656) -> Option<ServiceAccount> {
657 if mz.create_service_account() {
658 let mut annotations: BTreeMap<String, String> = mz
659 .spec
660 .service_account_annotations
661 .clone()
662 .unwrap_or_default();
663 if let (CloudProvider::Aws, Some(role_arn)) = (
664 config.cloud_provider,
665 mz.spec
666 .environmentd_iam_role_arn
667 .as_deref()
668 .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
669 ) {
670 warn!(
671 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
672 );
673 annotations.insert(
674 "eks.amazonaws.com/role-arn".to_string(),
675 role_arn.to_string(),
676 );
677 };
678
679 let mut labels = mz.default_labels();
680 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
681
682 Some(ServiceAccount {
683 metadata: ObjectMeta {
684 annotations: Some(annotations),
685 labels: Some(labels),
686 ..mz.managed_resource_meta(mz.service_account_name())
687 },
688 ..Default::default()
689 })
690 } else {
691 None
692 }
693}
694
695fn create_role_object(mz: &Materialize) -> Role {
696 Role {
697 metadata: mz.managed_resource_meta(mz.role_name()),
698 rules: Some(vec![
699 PolicyRule {
700 api_groups: Some(vec!["apps".to_string()]),
701 resources: Some(vec!["statefulsets".to_string()]),
702 verbs: vec![
703 "get".to_string(),
704 "list".to_string(),
705 "watch".to_string(),
706 "create".to_string(),
707 "update".to_string(),
708 "patch".to_string(),
709 "delete".to_string(),
710 ],
711 ..Default::default()
712 },
713 PolicyRule {
714 api_groups: Some(vec!["".to_string()]),
715 resources: Some(vec![
716 "persistentvolumeclaims".to_string(),
717 "pods".to_string(),
718 "secrets".to_string(),
719 "services".to_string(),
720 ]),
721 verbs: vec![
722 "get".to_string(),
723 "list".to_string(),
724 "watch".to_string(),
725 "create".to_string(),
726 "update".to_string(),
727 "patch".to_string(),
728 "delete".to_string(),
729 ],
730 ..Default::default()
731 },
732 PolicyRule {
733 api_groups: Some(vec!["".to_string()]),
734 resources: Some(vec!["configmaps".to_string()]),
735 verbs: vec!["get".to_string()],
736 ..Default::default()
737 },
738 PolicyRule {
739 api_groups: Some(vec!["materialize.cloud".to_string()]),
740 resources: Some(vec!["vpcendpoints".to_string()]),
741 verbs: vec![
742 "get".to_string(),
743 "list".to_string(),
744 "watch".to_string(),
745 "create".to_string(),
746 "update".to_string(),
747 "patch".to_string(),
748 "delete".to_string(),
749 ],
750 ..Default::default()
751 },
752 PolicyRule {
753 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
754 resources: Some(vec!["pods".to_string()]),
755 verbs: vec!["get".to_string(), "list".to_string()],
756 ..Default::default()
757 },
758 PolicyRule {
759 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
760 resources: Some(vec![
761 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
762 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
763 ]),
764 verbs: vec!["get".to_string()],
765 ..Default::default()
766 },
767 ]),
768 }
769}
770
771fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
772 RoleBinding {
773 metadata: mz.managed_resource_meta(mz.role_binding_name()),
774 role_ref: RoleRef {
775 api_group: "".to_string(),
776 kind: "Role".to_string(),
777 name: mz.role_name(),
778 },
779 subjects: Some(vec![Subject {
780 api_group: Some("".to_string()),
781 kind: "ServiceAccount".to_string(),
782 name: mz.service_account_name(),
783 namespace: Some(mz.namespace()),
784 }]),
785 }
786}
787
788fn create_public_service_object(
789 config: &super::MaterializeControllerArgs,
790 mz: &Materialize,
791 generation: u64,
792) -> Service {
793 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
794}
795
796fn create_generation_service_object(
797 config: &super::MaterializeControllerArgs,
798 mz: &Materialize,
799 generation: u64,
800) -> Service {
801 create_base_service_object(
802 config,
803 mz,
804 generation,
805 &mz.environmentd_generation_service_name(generation),
806 )
807}
808
809fn create_base_service_object(
810 config: &super::MaterializeControllerArgs,
811 mz: &Materialize,
812 generation: u64,
813 service_name: &str,
814) -> Service {
815 let ports = vec![
816 ServicePort {
817 port: config.environmentd_sql_port.into(),
818 protocol: Some("TCP".to_string()),
819 name: Some("sql".to_string()),
820 ..Default::default()
821 },
822 ServicePort {
823 port: config.environmentd_http_port.into(),
824 protocol: Some("TCP".to_string()),
825 name: Some("https".to_string()),
826 ..Default::default()
827 },
828 ServicePort {
829 port: config.environmentd_internal_sql_port.into(),
830 protocol: Some("TCP".to_string()),
831 name: Some("internal-sql".to_string()),
832 ..Default::default()
833 },
834 ServicePort {
835 port: config.environmentd_internal_http_port.into(),
836 protocol: Some("TCP".to_string()),
837 name: Some("internal-http".to_string()),
838 ..Default::default()
839 },
840 ];
841
842 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
843
844 let spec = ServiceSpec {
845 type_: Some("ClusterIP".to_string()),
846 cluster_ip: Some("None".to_string()),
847 selector: Some(selector),
848 ports: Some(ports),
849 ..Default::default()
850 };
851
852 Service {
853 metadata: mz.managed_resource_meta(service_name.to_string()),
854 spec: Some(spec),
855 status: None,
856 }
857}
858
859fn create_persist_pubsub_service(
860 config: &super::MaterializeControllerArgs,
861 mz: &Materialize,
862 generation: u64,
863) -> Service {
864 Service {
865 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
866 spec: Some(ServiceSpec {
867 type_: Some("ClusterIP".to_string()),
868 cluster_ip: Some("None".to_string()),
869 selector: Some(btreemap! {
870 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
871 }),
872 ports: Some(vec![ServicePort {
873 name: Some("grpc".to_string()),
874 protocol: Some("TCP".to_string()),
875 port: config.environmentd_internal_persist_pubsub_port.into(),
876 ..Default::default()
877 }]),
878 ..Default::default()
879 }),
880 status: None,
881 }
882}
883
884fn create_environmentd_certificate(
885 config: &super::MaterializeControllerArgs,
886 mz: &Materialize,
887) -> Option<Certificate> {
888 create_certificate(
889 config.default_certificate_specs.internal.clone(),
890 mz,
891 mz.spec.internal_certificate_spec.clone(),
892 mz.environmentd_certificate_name(),
893 mz.environmentd_certificate_secret_name(),
894 Some(vec![
895 mz.environmentd_service_name(),
896 mz.environmentd_service_internal_fqdn(),
897 ]),
898 CertificatePrivateKeyAlgorithm::Ed25519,
899 None,
900 )
901}
902
903fn create_environmentd_statefulset_object(
904 config: &super::MaterializeControllerArgs,
905 tracing: &TracingCliArgs,
906 mz: &Materialize,
907 generation: u64,
908) -> StatefulSet {
909 let mut env = vec![
919 EnvVar {
920 name: "MZ_METADATA_BACKEND_URL".to_string(),
921 value_from: Some(EnvVarSource {
922 secret_key_ref: Some(SecretKeySelector {
923 name: mz.backend_secret_name(),
924 key: "metadata_backend_url".to_string(),
925 optional: Some(false),
926 }),
927 ..Default::default()
928 }),
929 ..Default::default()
930 },
931 EnvVar {
932 name: "MZ_PERSIST_BLOB_URL".to_string(),
933 value_from: Some(EnvVarSource {
934 secret_key_ref: Some(SecretKeySelector {
935 name: mz.backend_secret_name(),
936 key: "persist_backend_url".to_string(),
937 optional: Some(false),
938 }),
939 ..Default::default()
940 }),
941 ..Default::default()
942 },
943 ];
944
945 env.push(EnvVar {
946 name: "AWS_REGION".to_string(),
947 value: Some(config.region.clone()),
948 ..Default::default()
949 });
950
951 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
952
953 let mut args = vec![];
954
955 if let Some(helm_chart_version) = &config.helm_chart_version {
956 args.push(format!("--helm-chart-version={helm_chart_version}"));
957 }
958
959 args.push(format!(
961 "--environment-id={}",
962 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
963 ));
964
965 args.push(format!(
967 "--clusterd-image={}",
968 matching_image_from_environmentd_image_ref(
969 &mz.spec.environmentd_image_ref,
970 "clusterd",
971 None
972 )
973 ));
974
975 args.extend(
977 [
978 config
979 .environmentd_cluster_replica_sizes
980 .as_ref()
981 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
982 config
983 .bootstrap_default_cluster_replica_size
984 .as_ref()
985 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
986 config
987 .bootstrap_builtin_system_cluster_replica_size
988 .as_ref()
989 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
990 config
991 .bootstrap_builtin_probe_cluster_replica_size
992 .as_ref()
993 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
994 config
995 .bootstrap_builtin_support_cluster_replica_size
996 .as_ref()
997 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
998 config
999 .bootstrap_builtin_catalog_server_cluster_replica_size
1000 .as_ref()
1001 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
1002 config
1003 .bootstrap_builtin_analytics_cluster_replica_size
1004 .as_ref()
1005 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1006 config
1007 .bootstrap_builtin_system_cluster_replication_factor
1008 .as_ref()
1009 .map(|replication_factor| {
1010 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1011 }),
1012 config
1013 .bootstrap_builtin_probe_cluster_replication_factor
1014 .as_ref()
1015 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1016 config
1017 .bootstrap_builtin_support_cluster_replication_factor
1018 .as_ref()
1019 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1020 config
1021 .bootstrap_builtin_analytics_cluster_replication_factor
1022 .as_ref()
1023 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1024 ]
1025 .into_iter()
1026 .flatten(),
1027 );
1028
1029 args.extend(
1030 config
1031 .environmentd_allowed_origins
1032 .iter()
1033 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1034 );
1035
1036 args.push(format!(
1037 "--secrets-controller={}",
1038 config.secrets_controller
1039 ));
1040
1041 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1042 if let Ok(cluster_replica_sizes) =
1043 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1044 {
1045 let cluster_replica_sizes: Vec<_> =
1046 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1047 args.push(format!(
1048 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1049 cluster_replica_sizes.join("', '")
1050 ));
1051 }
1052 }
1053 if !config.cloud_provider.is_cloud() {
1054 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1055 }
1056
1057 if config.enable_internal_statement_logging {
1058 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1059 }
1060
1061 if config.disable_statement_logging {
1062 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1063 }
1064
1065 if !mz.spec.enable_rbac {
1066 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1067 }
1068
1069 args.push("--persist-isolated-runtime-threads=-1".to_string());
1073
1074 if config.cloud_provider == CloudProvider::Aws {
1076 if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1077 for az in azs {
1078 args.push(format!("--availability-zone={az}"));
1079 }
1080 }
1081
1082 if let Some(environmentd_connection_role_arn) = mz
1083 .spec
1084 .environmentd_connection_role_arn
1085 .as_deref()
1086 .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1087 {
1088 args.push(format!(
1089 "--aws-connection-role-arn={}",
1090 environmentd_connection_role_arn
1091 ));
1092 }
1093 if let Some(account_id) = &config.aws_info.aws_account_id {
1094 args.push(format!("--aws-account-id={account_id}"));
1095 }
1096
1097 args.extend([format!(
1098 "--aws-secrets-controller-tags=Environment={}",
1099 mz.name_unchecked()
1100 )]);
1101 args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1102 }
1103
1104 args.extend([
1106 "--orchestrator=kubernetes".into(),
1107 format!(
1108 "--orchestrator-kubernetes-service-account={}",
1109 &mz.service_account_name()
1110 ),
1111 format!(
1112 "--orchestrator-kubernetes-image-pull-policy={}",
1113 config.image_pull_policy.as_kebab_case_str(),
1114 ),
1115 ]);
1116 for selector in &config.clusterd_node_selector {
1117 args.push(format!(
1118 "--orchestrator-kubernetes-service-node-selector={}={}",
1119 selector.key, selector.value,
1120 ));
1121 }
1122 if mz.meets_minimum_version(&V144) {
1123 if let Some(affinity) = &config.clusterd_affinity {
1124 let affinity = serde_json::to_string(affinity).unwrap();
1125 args.push(format!(
1126 "--orchestrator-kubernetes-service-affinity={affinity}"
1127 ))
1128 }
1129 if let Some(tolerations) = &config.clusterd_tolerations {
1130 let tolerations = serde_json::to_string(tolerations).unwrap();
1131 args.push(format!(
1132 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1133 ))
1134 }
1135 }
1136 if let Some(scheduler_name) = &config.scheduler_name {
1137 args.push(format!(
1138 "--orchestrator-kubernetes-scheduler-name={}",
1139 scheduler_name
1140 ));
1141 }
1142 if mz.meets_minimum_version(&V154_DEV0) {
1143 args.extend(
1144 mz.spec
1145 .pod_annotations
1146 .as_ref()
1147 .map(|annotations| annotations.iter())
1148 .unwrap_or_default()
1149 .map(|(key, val)| {
1150 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1151 }),
1152 );
1153 }
1154 args.extend(
1155 mz.default_labels()
1156 .iter()
1157 .chain(
1158 mz.spec
1159 .pod_labels
1160 .as_ref()
1161 .map(|labels| labels.iter())
1162 .unwrap_or_default(),
1163 )
1164 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1165 );
1166 if let Some(status) = &mz.status {
1167 args.push(format!(
1168 "--orchestrator-kubernetes-name-prefix=mz{}-",
1169 status.resource_id
1170 ));
1171 }
1172
1173 args.extend(["--log-format=json".into()]);
1175 if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1176 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1177 }
1178 args.extend([
1180 format!(
1181 "--opentelemetry-resource=organization_id={}",
1182 mz.spec.environment_id
1183 ),
1184 format!(
1185 "--opentelemetry-resource=environment_name={}",
1186 mz.name_unchecked()
1187 ),
1188 ]);
1189
1190 if let Some(segment_api_key) = &config.segment_api_key {
1191 args.push(format!("--segment-api-key={}", segment_api_key));
1192 if config.segment_client_side {
1193 args.push("--segment-client-side".into());
1194 }
1195 }
1196
1197 let mut volumes = Vec::new();
1198 let mut volume_mounts = Vec::new();
1199 if issuer_ref_defined(
1200 &config.default_certificate_specs.internal,
1201 &mz.spec.internal_certificate_spec,
1202 ) {
1203 volumes.push(Volume {
1204 name: "certificate".to_owned(),
1205 secret: Some(SecretVolumeSource {
1206 default_mode: Some(0o400),
1207 secret_name: Some(mz.environmentd_certificate_secret_name()),
1208 items: None,
1209 optional: Some(false),
1210 }),
1211 ..Default::default()
1212 });
1213 volume_mounts.push(VolumeMount {
1214 name: "certificate".to_owned(),
1215 mount_path: "/etc/materialized".to_owned(),
1216 read_only: Some(true),
1217 ..Default::default()
1218 });
1219 args.extend([
1220 "--tls-mode=require".into(),
1221 "--tls-cert=/etc/materialized/tls.crt".into(),
1222 "--tls-key=/etc/materialized/tls.key".into(),
1223 ]);
1224 } else {
1225 args.push("--tls-mode=disable".to_string());
1226 }
1227 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1228 args.push(format!(
1229 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1230 ephemeral_volume_class
1231 ));
1232 }
1233 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1235
1236 if let Some(sentry_dsn) = &tracing.sentry_dsn {
1238 args.push(format!("--sentry-dsn={}", sentry_dsn));
1239 if let Some(sentry_environment) = &tracing.sentry_environment {
1240 args.push(format!("--sentry-environment={}", sentry_environment));
1241 }
1242 args.push(format!("--sentry-tag=region={}", config.region));
1243 }
1244
1245 args.push(format!(
1247 "--persist-pubsub-url=http://{}:{}",
1248 mz.persist_pubsub_service_name(generation),
1249 config.environmentd_internal_persist_pubsub_port,
1250 ));
1251 args.push(format!(
1252 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1253 config.environmentd_internal_persist_pubsub_port
1254 ));
1255
1256 args.push(format!("--deploy-generation={}", generation));
1257
1258 args.push(format!(
1260 "--internal-console-redirect-url={}",
1261 &config.internal_console_proxy_url,
1262 ));
1263
1264 if !config.collect_pod_metrics {
1265 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1266 }
1267 if config.enable_prometheus_scrape_annotations {
1268 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1269 }
1270
1271 if config.disable_license_key_checks {
1272 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1273 args.push("--disable-license-key-checks".into());
1274 }
1275 } else if mz.meets_minimum_version(&V140_DEV0) {
1276 volume_mounts.push(VolumeMount {
1277 name: "license-key".to_string(),
1278 mount_path: "/license_key".to_string(),
1279 ..Default::default()
1280 });
1281 volumes.push(Volume {
1282 name: "license-key".to_string(),
1283 secret: Some(SecretVolumeSource {
1284 default_mode: Some(256),
1285 optional: Some(false),
1286 secret_name: Some(mz.backend_secret_name()),
1287 items: Some(vec![KeyToPath {
1288 key: "license_key".to_string(),
1289 path: "license_key".to_string(),
1290 ..Default::default()
1291 }]),
1292 ..Default::default()
1293 }),
1294 ..Default::default()
1295 });
1296 env.push(EnvVar {
1297 name: "MZ_LICENSE_KEY".to_string(),
1298 value: Some("/license_key/license_key".to_string()),
1299 ..Default::default()
1300 });
1301 }
1302
1303 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1305 args.extend(extra_args.iter().cloned());
1306 }
1307
1308 let probe = Probe {
1309 initial_delay_seconds: Some(1),
1310 failure_threshold: Some(12),
1311 tcp_socket: Some(TCPSocketAction {
1312 host: None,
1313 port: IntOrString::Int(config.environmentd_sql_port.into()),
1314 }),
1315 ..Default::default()
1316 };
1317
1318 let security_context = if config.enable_security_context {
1319 Some(SecurityContext {
1323 run_as_non_root: Some(true),
1324 capabilities: Some(Capabilities {
1325 drop: Some(vec!["ALL".to_string()]),
1326 ..Default::default()
1327 }),
1328 seccomp_profile: Some(SeccompProfile {
1329 type_: "RuntimeDefault".to_string(),
1330 ..Default::default()
1331 }),
1332 allow_privilege_escalation: Some(false),
1333 ..Default::default()
1334 })
1335 } else {
1336 None
1337 };
1338
1339 let ports = vec![
1340 ContainerPort {
1341 container_port: config.environmentd_sql_port.into(),
1342 name: Some("sql".to_owned()),
1343 ..Default::default()
1344 },
1345 ContainerPort {
1346 container_port: config.environmentd_internal_sql_port.into(),
1347 name: Some("internal-sql".to_owned()),
1348 ..Default::default()
1349 },
1350 ContainerPort {
1351 container_port: config.environmentd_http_port.into(),
1352 name: Some("http".to_owned()),
1353 ..Default::default()
1354 },
1355 ContainerPort {
1356 container_port: config.environmentd_internal_http_port.into(),
1357 name: Some("internal-http".to_owned()),
1358 ..Default::default()
1359 },
1360 ContainerPort {
1361 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1362 name: Some("persist-pubsub".to_owned()),
1363 ..Default::default()
1364 },
1365 ];
1366
1367 if mz.meets_minimum_version(&V147_DEV0) {
1369 volume_mounts.push(VolumeMount {
1370 name: "listeners-configmap".to_string(),
1371 mount_path: "/listeners".to_string(),
1372 ..Default::default()
1373 });
1374 volumes.push(Volume {
1375 name: "listeners-configmap".to_string(),
1376 config_map: Some(ConfigMapVolumeSource {
1377 name: mz.listeners_configmap_name(generation),
1378 default_mode: Some(256),
1379 optional: Some(false),
1380 items: Some(vec![KeyToPath {
1381 key: "listeners.json".to_string(),
1382 path: "listeners.json".to_string(),
1383 ..Default::default()
1384 }]),
1385 }),
1386 ..Default::default()
1387 });
1388 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1389 if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1390 args.push("--system-parameter-default=enable_password_auth=true".into());
1391 env.push(EnvVar {
1392 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1393 value_from: Some(EnvVarSource {
1394 secret_key_ref: Some(SecretKeySelector {
1395 name: mz.backend_secret_name(),
1396 key: "external_login_password_mz_system".to_string(),
1397 optional: Some(false),
1398 }),
1399 ..Default::default()
1400 }),
1401 ..Default::default()
1402 })
1403 }
1404 } else {
1405 args.extend([
1406 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1407 format!(
1408 "--http-listen-addr=0.0.0.0:{}",
1409 config.environmentd_http_port
1410 ),
1411 format!(
1412 "--internal-sql-listen-addr=0.0.0.0:{}",
1413 config.environmentd_internal_sql_port
1414 ),
1415 format!(
1416 "--internal-http-listen-addr=0.0.0.0:{}",
1417 config.environmentd_internal_http_port
1418 ),
1419 ]);
1420 }
1421
1422 let container = Container {
1423 name: "environmentd".to_owned(),
1424 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1425 image_pull_policy: Some(config.image_pull_policy.to_string()),
1426 ports: Some(ports),
1427 args: Some(args),
1428 env: Some(env),
1429 volume_mounts: Some(volume_mounts),
1430 liveness_probe: Some(probe.clone()),
1431 readiness_probe: Some(probe),
1432 resources: mz
1433 .spec
1434 .environmentd_resource_requirements
1435 .clone()
1436 .or_else(|| config.environmentd_default_resources.clone()),
1437 security_context: security_context.clone(),
1438 ..Default::default()
1439 };
1440
1441 let mut pod_template_labels = mz.default_labels();
1442 pod_template_labels.insert(
1443 "materialize.cloud/name".to_owned(),
1444 mz.environmentd_statefulset_name(generation),
1445 );
1446 pod_template_labels.insert(
1447 "materialize.cloud/app".to_owned(),
1448 mz.environmentd_app_name(),
1449 );
1450 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1451 pod_template_labels.extend(
1452 mz.spec
1453 .pod_labels
1454 .as_ref()
1455 .map(|labels| labels.iter())
1456 .unwrap_or_default()
1457 .map(|(key, value)| (key.clone(), value.clone())),
1458 );
1459
1460 let mut pod_template_annotations = btreemap! {
1461 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1463
1464 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1466 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1467 "materialize.cloud/generation".to_owned() => generation.to_string(),
1468 };
1469 if config.enable_prometheus_scrape_annotations {
1470 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1471 pod_template_annotations.insert(
1472 "prometheus.io/port".to_owned(),
1473 config.environmentd_internal_http_port.to_string(),
1474 );
1475 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1476 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1477 pod_template_annotations.insert(
1478 "materialize.prometheus.io/mz_usage_path".to_owned(),
1479 "/metrics/mz_usage".to_string(),
1480 );
1481 pod_template_annotations.insert(
1482 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1483 "/metrics/mz_frontier".to_string(),
1484 );
1485 pod_template_annotations.insert(
1486 "materialize.prometheus.io/mz_compute_path".to_owned(),
1487 "/metrics/mz_compute".to_string(),
1488 );
1489 pod_template_annotations.insert(
1490 "materialize.prometheus.io/mz_storage_path".to_owned(),
1491 "/metrics/mz_storage".to_string(),
1492 );
1493 }
1494 pod_template_annotations.extend(
1495 mz.spec
1496 .pod_annotations
1497 .as_ref()
1498 .map(|annotations| annotations.iter())
1499 .unwrap_or_default()
1500 .map(|(key, value)| (key.clone(), value.clone())),
1501 );
1502
1503 let mut tolerations = vec![
1504 Toleration {
1508 effect: Some("NoExecute".into()),
1509 key: Some("node.kubernetes.io/not-ready".into()),
1510 operator: Some("Exists".into()),
1511 toleration_seconds: Some(30),
1512 value: None,
1513 },
1514 Toleration {
1515 effect: Some("NoExecute".into()),
1516 key: Some("node.kubernetes.io/unreachable".into()),
1517 operator: Some("Exists".into()),
1518 toleration_seconds: Some(30),
1519 value: None,
1520 },
1521 ];
1522 if let Some(user_tolerations) = &config.environmentd_tolerations {
1523 tolerations.extend(user_tolerations.iter().cloned());
1524 }
1525 let tolerations = Some(tolerations);
1526
1527 let pod_template_spec = PodTemplateSpec {
1528 metadata: Some(ObjectMeta {
1531 labels: Some(pod_template_labels),
1532 annotations: Some(pod_template_annotations), ..Default::default()
1534 }),
1535 spec: Some(PodSpec {
1536 containers: vec![container],
1537 node_selector: Some(
1538 config
1539 .environmentd_node_selector
1540 .iter()
1541 .map(|selector| (selector.key.clone(), selector.value.clone()))
1542 .collect(),
1543 ),
1544 affinity: config.environmentd_affinity.clone(),
1545 scheduler_name: config.scheduler_name.clone(),
1546 service_account_name: Some(mz.service_account_name()),
1547 volumes: Some(volumes),
1548 security_context: Some(PodSecurityContext {
1549 fs_group: Some(999),
1550 run_as_user: Some(999),
1551 run_as_group: Some(999),
1552 ..Default::default()
1553 }),
1554 tolerations,
1555 termination_grace_period_seconds: Some(0),
1576 ..Default::default()
1577 }),
1578 };
1579
1580 let mut match_labels = BTreeMap::new();
1581 match_labels.insert(
1582 "materialize.cloud/name".to_owned(),
1583 mz.environmentd_statefulset_name(generation),
1584 );
1585
1586 let statefulset_spec = StatefulSetSpec {
1587 replicas: Some(1),
1588 template: pod_template_spec,
1589 update_strategy: Some(StatefulSetUpdateStrategy {
1590 rolling_update: None,
1591 type_: Some("OnDelete".to_owned()),
1592 }),
1593 service_name: Some(mz.environmentd_service_name()),
1594 selector: LabelSelector {
1595 match_expressions: None,
1596 match_labels: Some(match_labels),
1597 },
1598 ..Default::default()
1599 };
1600
1601 StatefulSet {
1602 metadata: ObjectMeta {
1603 annotations: Some(btreemap! {
1604 "materialize.cloud/generation".to_owned() => generation.to_string(),
1605 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1606 }),
1607 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1608 },
1609 spec: Some(statefulset_spec),
1610 status: None,
1611 }
1612}
1613
1614fn create_connection_info(
1615 config: &super::MaterializeControllerArgs,
1616 mz: &Materialize,
1617 generation: u64,
1618) -> ConnectionInfo {
1619 let external_enable_tls = issuer_ref_defined(
1620 &config.default_certificate_specs.internal,
1621 &mz.spec.internal_certificate_spec,
1622 );
1623 let authenticator_kind = mz.spec.authenticator_kind;
1624 let mut listeners_config = ListenersConfig {
1625 sql: btreemap! {
1626 "external".to_owned() => SqlListenerConfig{
1627 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1628 authenticator_kind,
1629 allowed_roles: AllowedRoles::Normal,
1630 enable_tls: external_enable_tls,
1631 },
1632 "internal".to_owned() => SqlListenerConfig{
1633 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1634 authenticator_kind: AuthenticatorKind::None,
1635 allowed_roles: AllowedRoles::NormalAndInternal,
1637 enable_tls: false,
1638 },
1639 },
1640 http: btreemap! {
1641 "external".to_owned() => HttpListenerConfig{
1642 base: BaseListenerConfig {
1643 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1644 authenticator_kind,
1645 allowed_roles: AllowedRoles::Normal,
1646 enable_tls: external_enable_tls,
1647 },
1648 routes: HttpRoutesEnabled{
1649 base: true,
1650 webhook: true,
1651 internal: false,
1652 metrics: false,
1653 profiling: false,
1654 }
1655 },
1656 "internal".to_owned() => HttpListenerConfig{
1657 base: BaseListenerConfig {
1658 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1659 authenticator_kind: AuthenticatorKind::None,
1660 allowed_roles: AllowedRoles::NormalAndInternal,
1662 enable_tls: false,
1663 },
1664 routes: HttpRoutesEnabled{
1665 base: true,
1666 webhook: true,
1667 internal: true,
1668 metrics: true,
1669 profiling: true,
1670 }
1671 },
1672 },
1673 };
1674 if authenticator_kind == AuthenticatorKind::Password {
1675 listeners_config.sql.remove("internal");
1676 listeners_config.http.remove("internal");
1677
1678 listeners_config.sql.get_mut("external").map(|listener| {
1679 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1680 listener
1681 });
1682 listeners_config.http.get_mut("external").map(|listener| {
1683 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1684 listener.routes.internal = true;
1685 listener.routes.profiling = true;
1686 listener
1687 });
1688
1689 listeners_config.http.insert(
1690 "metrics".to_owned(),
1691 HttpListenerConfig {
1692 base: BaseListenerConfig {
1693 addr: SocketAddr::new(
1694 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1695 config.environmentd_internal_http_port,
1696 ),
1697 authenticator_kind: AuthenticatorKind::None,
1698 allowed_roles: AllowedRoles::NormalAndInternal,
1699 enable_tls: false,
1700 },
1701 routes: HttpRoutesEnabled {
1702 base: false,
1703 webhook: false,
1704 internal: false,
1705 metrics: true,
1706 profiling: false,
1707 },
1708 },
1709 );
1710 }
1711
1712 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1713 let listeners_configmap = ConfigMap {
1714 binary_data: None,
1715 data: Some(btreemap! {
1716 "listeners.json".to_owned() => listeners_json,
1717 }),
1718 immutable: None,
1719 metadata: ObjectMeta {
1720 annotations: Some(btreemap! {
1721 "materialize.cloud/generation".to_owned() => generation.to_string(),
1722 }),
1723 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1724 },
1725 };
1726
1727 let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1728 AuthenticatorKind::Password => {
1729 let scheme = if external_enable_tls { "https" } else { "http" };
1730 (
1731 scheme,
1732 config.environmentd_http_port,
1733 Some(mz.spec.backend_secret_name.clone()),
1734 )
1735 }
1736 _ => ("http", config.environmentd_internal_http_port, None),
1737 };
1738 let environmentd_url = format!(
1739 "{}://{}.{}.svc.cluster.local:{}",
1740 scheme,
1741 mz.environmentd_generation_service_name(generation),
1742 mz.namespace(),
1743 leader_api_port,
1744 );
1745 ConnectionInfo {
1746 environmentd_url,
1747 listeners_configmap,
1748 mz_system_secret_name,
1749 }
1750}
1751
/// Deserialized body of a become-leader response.
///
/// NOTE(review): presumably the JSON returned by environmentd's leader
/// promotion HTTP endpoint; the caller is outside this chunk — confirm.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    /// Whether the request succeeded, or the failure message if it did not.
    result: BecomeLeaderResult,
}
1757
/// Outcome carried in a [`BecomeLeaderResponse`].
///
/// No serde attributes are applied, so serde's default externally tagged
/// representation is used: `"Success"` for [`BecomeLeaderResult::Success`] and
/// `{"Failure": {"message": "..."}}` for [`BecomeLeaderResult::Failure`].
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    /// The become-leader request succeeded.
    Success,
    /// The become-leader request failed; `message` is the reason reported by the server.
    Failure { message: String },
}
1763
/// Error body carrying a human-readable `message`.
///
/// NOTE(review): presumably the JSON body environmentd returns when a
/// skip-catchup request is rejected; the caller is outside this chunk — confirm.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    /// Description of the error as reported by the server.
    message: String,
}
1768
1769fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String {
1770 format!("{}-{}", statefulset.name_unchecked(), idx)
1771}