1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19 api::{
20 apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21 core::v1::{
22 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23 EnvVarSource, KeyToPath, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25 Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26 VolumeMount,
27 },
28 networking::v1::{
29 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31 },
32 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33 },
34 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40 ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
54use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
55use mz_orchestrator_tracing::TracingCliArgs;
56use mz_ore::instrument;
57
58static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
59 major: 0,
60 minor: 140,
61 patch: 0,
62 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
63 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
64});
65const V143: Version = Version::new(0, 143, 0);
66const V144: Version = Version::new(0, 144, 0);
67static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68 major: 0,
69 minor: 147,
70 patch: 0,
71 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
72 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74const V153: Version = Version::new(0, 153, 0);
75static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
76 major: 0,
77 minor: 154,
78 patch: 0,
79 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
80 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
81});
82
/// Leader status of an environmentd process, as decoded from the `"status"`
/// entry of its `/api/leader/status` response.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order;
/// do not reorder them.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// environmentd is still starting up. `Resources::apply` requeues while it
    /// is in this state, unless promotion is forced.
    Initializing,
    /// NOTE(review): presumably environmentd has caught up and is waiting to be
    /// promoted — confirm against environmentd's leader API.
    ReadyToPromote,
    /// NOTE(review): presumably promotion has been requested but has not
    /// finished yet — confirm against environmentd's leader API.
    Promoting,
    /// environmentd is the leader. `Resources::apply` waits for this status
    /// after requesting promotion.
    IsLeader,
}
99
/// JSON body posted to environmentd's `/api/login` endpoint.
///
/// This type does not derive `Debug`, which keeps the password out of any
/// `{:?}` output.
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    /// Login user name (`Resources::apply` uses `mz_system`).
    username: String,
    /// Plain-text password for `username`.
    password: String,
}
106
/// How orchestratord reaches a given generation's environmentd.
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    /// Base URL of environmentd's HTTP API. Paths such as `/api/login` and
    /// `/api/leader/status` are appended to it.
    pub environmentd_url: String,
    /// Name of the `Secret` whose `external_login_password_mz_system` key, if
    /// present, is used to log in as `mz_system` before calling the leader API.
    /// When `None`, requests are sent without logging in.
    pub mz_system_secret_name: Option<String>,
    /// Listeners `ConfigMap`, applied by `Resources::apply`.
    pub listeners_configmap: ConfigMap,
}
113
/// Every Kubernetes object orchestratord manages for one environmentd
/// generation, built in memory by `Resources::new`.
///
/// The whole struct is serialized by `generate_hash`. serde serializes fields
/// in declaration order, so reordering fields changes the hash.
#[derive(Debug, Serialize)]
pub struct Resources {
    /// The environmentd generation these resources belong to.
    pub generation: u64,
    /// Network policies. Empty when every policy group is disabled.
    pub environmentd_network_policies: Vec<NetworkPolicy>,
    /// `None` when the service account is not managed by orchestratord.
    pub service_account: Box<Option<ServiceAccount>>,
    pub role: Box<Role>,
    pub role_binding: Box<RoleBinding>,
    /// Stable-name service. Applied only by `promote_services`.
    pub public_service: Box<Service>,
    /// Service named after this generation.
    pub generation_service: Box<Service>,
    pub persist_pubsub_service: Box<Service>,
    /// `None` when no certificate is created.
    pub environmentd_certificate: Box<Option<Certificate>>,
    pub environmentd_statefulset: Box<StatefulSet>,
    pub connection_info: Box<ConnectionInfo>,
}
128
129impl Resources {
130 pub fn new(
131 config: &super::MaterializeControllerArgs,
132 tracing: &TracingCliArgs,
133 orchestratord_namespace: &str,
134 mz: &Materialize,
135 generation: u64,
136 ) -> Self {
137 let environmentd_network_policies =
138 create_environmentd_network_policies(config, mz, orchestratord_namespace);
139
140 let service_account = Box::new(create_service_account_object(config, mz));
141 let role = Box::new(create_role_object(mz));
142 let role_binding = Box::new(create_role_binding_object(mz));
143 let public_service = Box::new(create_public_service_object(config, mz, generation));
144 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
145 let persist_pubsub_service =
146 Box::new(create_persist_pubsub_service(config, mz, generation));
147 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
148 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
149 config, tracing, mz, generation,
150 ));
151 let connection_info = Box::new(create_connection_info(config, mz, generation));
152
153 Self {
154 generation,
155 environmentd_network_policies,
156 service_account,
157 role,
158 role_binding,
159 public_service,
160 generation_service,
161 persist_pubsub_service,
162 environmentd_certificate,
163 environmentd_statefulset,
164 connection_info,
165 }
166 }
167
168 #[instrument]
169 pub async fn apply(
170 &self,
171 client: &Client,
172 increment_generation: bool,
173 force_promote: bool,
174 namespace: &str,
175 ) -> Result<Option<Action>, anyhow::Error> {
176 let environmentd_network_policy_api: Api<NetworkPolicy> =
177 Api::namespaced(client.clone(), namespace);
178 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
179 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
180 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
181 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
182 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
183 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
184 let pod_api: Api<Pod> = Api::namespaced(client.clone(), namespace);
185 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
186 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
187
188 for policy in &self.environmentd_network_policies {
189 trace!("applying network policy {}", policy.name_unchecked());
190 apply_resource(&environmentd_network_policy_api, policy).await?;
191 }
192
193 if let Some(service_account) = &*self.service_account {
194 trace!("applying environmentd service account");
195 apply_resource(&service_account_api, service_account).await?;
196 }
197
198 trace!("applying environmentd role");
199 apply_resource(&role_api, &*self.role).await?;
200
201 trace!("applying environmentd role binding");
202 apply_resource(&role_binding_api, &*self.role_binding).await?;
203
204 trace!("applying environmentd per-generation service");
205 apply_resource(&service_api, &*self.generation_service).await?;
206
207 trace!("creating persist pubsub service");
208 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
209
210 if let Some(certificate) = &*self.environmentd_certificate {
211 trace!("creating new environmentd certificate");
212 apply_resource(&certificate_api, certificate).await?;
213 }
214
215 trace!("applying listeners configmap");
216 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
217
218 trace!("creating new environmentd statefulset");
219 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
220
221 if increment_generation {
238 let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
239
240 let statefulset = get_resource(
241 &statefulset_api,
242 &self.environmentd_statefulset.name_unchecked(),
243 )
244 .await?;
245 if statefulset
246 .and_then(|statefulset| statefulset.status)
247 .and_then(|status| status.ready_replicas)
248 .unwrap_or(0)
249 == 0
250 {
251 trace!("environmentd statefulset is not ready yet...");
252 return Ok(Some(retry_action));
253 }
254
255 let http_client = match &self.connection_info.mz_system_secret_name {
256 Some(mz_system_secret_name) => {
257 let http_client = reqwest::Client::builder()
258 .timeout(std::time::Duration::from_secs(10))
259 .cookie_store(true)
260 .danger_accept_invalid_certs(true)
262 .build()
263 .unwrap();
264 if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
265 if let Some(password) =
266 data.get("external_login_password_mz_system").cloned()
267 {
268 let password = String::from_utf8_lossy(&password.0).to_string();
269 let login_url = reqwest::Url::parse(&format!(
270 "{}/api/login",
271 self.connection_info.environmentd_url,
272 ))
273 .unwrap();
274 match http_client
275 .post(login_url)
276 .body(serde_json::to_string(&LoginCredentials {
277 username: "mz_system".to_owned(),
278 password,
279 })?)
280 .header("Content-Type", "application/json")
281 .send()
282 .await
283 {
284 Ok(response) => {
285 if let Err(e) = response.error_for_status() {
286 trace!(
287 "failed to login to environmentd, retrying... ({e})"
288 );
289 return Ok(Some(retry_action));
290 }
291 }
292 Err(e) => {
293 trace!("failed to connect to environmentd, retrying... ({e})");
294 return Ok(Some(retry_action));
295 }
296 };
297 }
298 };
299 http_client
300 }
301 None => reqwest::Client::builder()
302 .timeout(std::time::Duration::from_secs(10))
303 .build()
304 .unwrap(),
305 };
306 let status_url = reqwest::Url::parse(&format!(
307 "{}/api/leader/status",
308 self.connection_info.environmentd_url,
309 ))
310 .unwrap();
311
312 match http_client.get(status_url.clone()).send().await {
313 Ok(response) => {
314 let response: BTreeMap<String, DeploymentStatus> =
315 match response.error_for_status() {
316 Ok(response) => response.json().await?,
317 Err(e) => {
318 trace!("failed to get status of environmentd, retrying... ({e})");
319 return Ok(Some(retry_action));
320 }
321 };
322 if force_promote {
323 trace!("skipping cluster catchup");
324 let skip_catchup_url = reqwest::Url::parse(&format!(
325 "{}/api/leader/skip-catchup",
326 self.connection_info.environmentd_url,
327 ))
328 .unwrap();
329 let response = http_client.post(skip_catchup_url).send().await?;
330 if response.status() == StatusCode::BAD_REQUEST {
331 let err: SkipCatchupError = response.json().await?;
332 bail!("failed to skip catchup: {}", err.message);
333 }
334 } else if response["status"] == DeploymentStatus::Initializing {
335 trace!("environmentd is still initializing, retrying...");
336 return Ok(Some(retry_action));
337 } else {
338 trace!("environmentd is ready");
339 }
340 }
341 Err(e) => {
342 trace!("failed to connect to environmentd, retrying... ({e})");
343 return Ok(Some(retry_action));
344 }
345 }
346
347 let promote_url = reqwest::Url::parse(&format!(
348 "{}/api/leader/promote",
349 self.connection_info.environmentd_url,
350 ))
351 .unwrap();
352
353 trace!("promoting new environmentd to leader");
362 let response = http_client.post(promote_url).send().await?;
363 let response: BecomeLeaderResponse = match response.error_for_status() {
364 Ok(response) => response.json().await?,
365 Err(e) => {
366 trace!("failed to promote environmentd, retrying... ({e})");
367 return Ok(Some(retry_action));
368 }
369 };
370 if let BecomeLeaderResult::Failure { message } = response.result {
371 bail!("failed to promote new environmentd: {message}");
372 }
373
374 match http_client.get(status_url.clone()).send().await {
386 Ok(response) => {
387 let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
388 if response["status"] != DeploymentStatus::IsLeader {
389 trace!(
390 "environmentd is still promoting (status: {:?}), retrying...",
391 response["status"]
392 );
393 return Ok(Some(retry_action));
394 } else {
395 trace!("environmentd is ready");
396 }
397 }
398 Err(e) => {
399 trace!("failed to connect to environmentd, retrying... ({e})");
400 return Ok(Some(retry_action));
401 }
402 }
403 } else {
404 trace!("restarting environmentd pod to pick up statefulset changes");
405 delete_resource(
406 &pod_api,
407 &statefulset_pod_name(&*self.environmentd_statefulset, 0),
408 )
409 .await?;
410 }
411
412 Ok(None)
413 }
414
415 #[instrument]
416 pub async fn promote_services(
417 &self,
418 client: &Client,
419 namespace: &str,
420 ) -> Result<(), anyhow::Error> {
421 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
422
423 trace!("applying environmentd public service");
424 apply_resource(&service_api, &*self.public_service).await?;
425
426 Ok(())
427 }
428
429 #[instrument]
430 pub async fn teardown_generation(
431 &self,
432 client: &Client,
433 mz: &Materialize,
434 generation: u64,
435 ) -> Result<(), anyhow::Error> {
436 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
437 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
438 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
439
440 trace!("deleting environmentd statefulset for generation {generation}");
441 delete_resource(
442 &statefulset_api,
443 &mz.environmentd_statefulset_name(generation),
444 )
445 .await?;
446
447 trace!("deleting persist pubsub service for generation {generation}");
448 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
449
450 trace!("deleting environmentd per-generation service for generation {generation}");
451 delete_resource(
452 &service_api,
453 &mz.environmentd_generation_service_name(generation),
454 )
455 .await?;
456
457 trace!("deleting listeners configmap for generation {generation}");
458 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
459
460 Ok(())
461 }
462
463 pub fn generate_hash(&self) -> String {
466 let mut hasher = Sha256::new();
467 hasher.update(&serde_json::to_string(self).unwrap());
468 format!("{:x}", hasher.finalize())
469 }
470}
471
472fn create_environmentd_network_policies(
473 config: &super::MaterializeControllerArgs,
474 mz: &Materialize,
475 orchestratord_namespace: &str,
476) -> Vec<NetworkPolicy> {
477 let mut network_policies = Vec::new();
478 if config.network_policies.internal_enabled {
479 let environmentd_label_selector = LabelSelector {
480 match_labels: Some(
481 mz.default_labels()
482 .into_iter()
483 .chain([(
484 "materialize.cloud/app".to_owned(),
485 mz.environmentd_app_name(),
486 )])
487 .collect(),
488 ),
489 ..Default::default()
490 };
491 let orchestratord_label_selector = LabelSelector {
492 match_labels: Some(
493 config
494 .orchestratord_pod_selector_labels
495 .iter()
496 .cloned()
497 .map(|kv| (kv.key, kv.value))
498 .collect(),
499 ),
500 ..Default::default()
501 };
502 let all_pods_label_selector = LabelSelector {
505 match_labels: Some(mz.default_labels()),
506 ..Default::default()
507 };
508 network_policies.extend([
509 NetworkPolicy {
512 metadata: mz
513 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
514 spec: Some(NetworkPolicySpec {
515 egress: Some(vec![NetworkPolicyEgressRule {
516 to: Some(vec![NetworkPolicyPeer {
517 pod_selector: Some(all_pods_label_selector.clone()),
518 ..Default::default()
519 }]),
520 ..Default::default()
521 }]),
522 ingress: Some(vec![NetworkPolicyIngressRule {
523 from: Some(vec![NetworkPolicyPeer {
524 pod_selector: Some(all_pods_label_selector.clone()),
525 ..Default::default()
526 }]),
527 ..Default::default()
528 }]),
529 pod_selector: all_pods_label_selector.clone(),
530 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
531 ..Default::default()
532 }),
533 },
534 NetworkPolicy {
537 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
538 spec: Some(NetworkPolicySpec {
539 ingress: Some(vec![NetworkPolicyIngressRule {
540 from: Some(vec![NetworkPolicyPeer {
541 namespace_selector: Some(LabelSelector {
542 match_labels: Some(btreemap! {
543 "kubernetes.io/metadata.name".into()
544 => orchestratord_namespace.into(),
545 }),
546 ..Default::default()
547 }),
548 pod_selector: Some(orchestratord_label_selector),
549 ..Default::default()
550 }]),
551 ports: Some(vec![
552 NetworkPolicyPort {
553 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
554 protocol: Some("TCP".to_string()),
555 ..Default::default()
556 },
557 NetworkPolicyPort {
558 port: Some(IntOrString::Int(
559 config.environmentd_internal_http_port.into(),
560 )),
561 protocol: Some("TCP".to_string()),
562 ..Default::default()
563 },
564 ]),
565 ..Default::default()
566 }]),
567 pod_selector: environmentd_label_selector,
568 policy_types: Some(vec!["Ingress".to_owned()]),
569 ..Default::default()
570 }),
571 },
572 ]);
573 }
574 if config.network_policies.ingress_enabled {
575 let mut ingress_label_selector = mz.default_labels();
576 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
577 network_policies.extend([NetworkPolicy {
578 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
579 spec: Some(NetworkPolicySpec {
580 ingress: Some(vec![NetworkPolicyIngressRule {
581 from: Some(
582 config
583 .network_policies
584 .ingress_cidrs
585 .iter()
586 .map(|cidr| NetworkPolicyPeer {
587 ip_block: Some(IPBlock {
588 cidr: cidr.to_owned(),
589 except: None,
590 }),
591 ..Default::default()
592 })
593 .collect(),
594 ),
595 ports: Some(vec![
596 NetworkPolicyPort {
597 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
598 protocol: Some("TCP".to_string()),
599 ..Default::default()
600 },
601 NetworkPolicyPort {
602 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
603 protocol: Some("TCP".to_string()),
604 ..Default::default()
605 },
606 ]),
607 ..Default::default()
608 }]),
609 pod_selector: LabelSelector {
610 match_expressions: None,
611 match_labels: Some(ingress_label_selector),
612 },
613 policy_types: Some(vec!["Ingress".to_owned()]),
614 ..Default::default()
615 }),
616 }]);
617 }
618 if config.network_policies.egress_enabled {
619 network_policies.extend([NetworkPolicy {
620 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
621 spec: Some(NetworkPolicySpec {
622 egress: Some(vec![NetworkPolicyEgressRule {
623 to: Some(
624 config
625 .network_policies
626 .egress_cidrs
627 .iter()
628 .map(|cidr| NetworkPolicyPeer {
629 ip_block: Some(IPBlock {
630 cidr: cidr.to_owned(),
631 except: None,
632 }),
633 ..Default::default()
634 })
635 .collect(),
636 ),
637 ..Default::default()
638 }]),
639 pod_selector: LabelSelector {
640 match_expressions: None,
641 match_labels: Some(mz.default_labels()),
642 },
643 policy_types: Some(vec!["Egress".to_owned()]),
644 ..Default::default()
645 }),
646 }]);
647 }
648 network_policies
649}
650
651fn create_service_account_object(
652 config: &super::MaterializeControllerArgs,
653 mz: &Materialize,
654) -> Option<ServiceAccount> {
655 if mz.create_service_account() {
656 let mut annotations: BTreeMap<String, String> = mz
657 .spec
658 .service_account_annotations
659 .clone()
660 .unwrap_or_default();
661 if let (CloudProvider::Aws, Some(role_arn)) = (
662 config.cloud_provider,
663 mz.spec
664 .environmentd_iam_role_arn
665 .as_deref()
666 .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
667 ) {
668 warn!(
669 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
670 );
671 annotations.insert(
672 "eks.amazonaws.com/role-arn".to_string(),
673 role_arn.to_string(),
674 );
675 };
676
677 let mut labels = mz.default_labels();
678 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
679
680 Some(ServiceAccount {
681 metadata: ObjectMeta {
682 annotations: Some(annotations),
683 labels: Some(labels),
684 ..mz.managed_resource_meta(mz.service_account_name())
685 },
686 ..Default::default()
687 })
688 } else {
689 None
690 }
691}
692
693fn create_role_object(mz: &Materialize) -> Role {
694 Role {
695 metadata: mz.managed_resource_meta(mz.role_name()),
696 rules: Some(vec![
697 PolicyRule {
698 api_groups: Some(vec!["apps".to_string()]),
699 resources: Some(vec!["statefulsets".to_string()]),
700 verbs: vec![
701 "get".to_string(),
702 "list".to_string(),
703 "watch".to_string(),
704 "create".to_string(),
705 "update".to_string(),
706 "patch".to_string(),
707 "delete".to_string(),
708 ],
709 ..Default::default()
710 },
711 PolicyRule {
712 api_groups: Some(vec!["".to_string()]),
713 resources: Some(vec![
714 "persistentvolumeclaims".to_string(),
715 "pods".to_string(),
716 "secrets".to_string(),
717 "services".to_string(),
718 ]),
719 verbs: vec![
720 "get".to_string(),
721 "list".to_string(),
722 "watch".to_string(),
723 "create".to_string(),
724 "update".to_string(),
725 "patch".to_string(),
726 "delete".to_string(),
727 ],
728 ..Default::default()
729 },
730 PolicyRule {
731 api_groups: Some(vec!["".to_string()]),
732 resources: Some(vec!["configmaps".to_string()]),
733 verbs: vec!["get".to_string()],
734 ..Default::default()
735 },
736 PolicyRule {
737 api_groups: Some(vec!["materialize.cloud".to_string()]),
738 resources: Some(vec!["vpcendpoints".to_string()]),
739 verbs: vec![
740 "get".to_string(),
741 "list".to_string(),
742 "watch".to_string(),
743 "create".to_string(),
744 "update".to_string(),
745 "patch".to_string(),
746 "delete".to_string(),
747 ],
748 ..Default::default()
749 },
750 PolicyRule {
751 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
752 resources: Some(vec!["pods".to_string()]),
753 verbs: vec!["get".to_string(), "list".to_string()],
754 ..Default::default()
755 },
756 PolicyRule {
757 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
758 resources: Some(vec![
759 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
760 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
761 ]),
762 verbs: vec!["get".to_string()],
763 ..Default::default()
764 },
765 ]),
766 }
767}
768
769fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
770 RoleBinding {
771 metadata: mz.managed_resource_meta(mz.role_binding_name()),
772 role_ref: RoleRef {
773 api_group: "".to_string(),
774 kind: "Role".to_string(),
775 name: mz.role_name(),
776 },
777 subjects: Some(vec![Subject {
778 api_group: Some("".to_string()),
779 kind: "ServiceAccount".to_string(),
780 name: mz.service_account_name(),
781 namespace: Some(mz.namespace()),
782 }]),
783 }
784}
785
786fn create_public_service_object(
787 config: &super::MaterializeControllerArgs,
788 mz: &Materialize,
789 generation: u64,
790) -> Service {
791 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
792}
793
794fn create_generation_service_object(
795 config: &super::MaterializeControllerArgs,
796 mz: &Materialize,
797 generation: u64,
798) -> Service {
799 create_base_service_object(
800 config,
801 mz,
802 generation,
803 &mz.environmentd_generation_service_name(generation),
804 )
805}
806
807fn create_base_service_object(
808 config: &super::MaterializeControllerArgs,
809 mz: &Materialize,
810 generation: u64,
811 service_name: &str,
812) -> Service {
813 let ports = vec![
814 ServicePort {
815 port: config.environmentd_sql_port.into(),
816 protocol: Some("TCP".to_string()),
817 name: Some("sql".to_string()),
818 ..Default::default()
819 },
820 ServicePort {
821 port: config.environmentd_http_port.into(),
822 protocol: Some("TCP".to_string()),
823 name: Some("https".to_string()),
824 ..Default::default()
825 },
826 ServicePort {
827 port: config.environmentd_internal_sql_port.into(),
828 protocol: Some("TCP".to_string()),
829 name: Some("internal-sql".to_string()),
830 ..Default::default()
831 },
832 ServicePort {
833 port: config.environmentd_internal_http_port.into(),
834 protocol: Some("TCP".to_string()),
835 name: Some("internal-http".to_string()),
836 ..Default::default()
837 },
838 ];
839
840 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
841
842 let spec = ServiceSpec {
843 type_: Some("ClusterIP".to_string()),
844 cluster_ip: Some("None".to_string()),
845 selector: Some(selector),
846 ports: Some(ports),
847 ..Default::default()
848 };
849
850 Service {
851 metadata: mz.managed_resource_meta(service_name.to_string()),
852 spec: Some(spec),
853 status: None,
854 }
855}
856
857fn create_persist_pubsub_service(
858 config: &super::MaterializeControllerArgs,
859 mz: &Materialize,
860 generation: u64,
861) -> Service {
862 Service {
863 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
864 spec: Some(ServiceSpec {
865 type_: Some("ClusterIP".to_string()),
866 cluster_ip: Some("None".to_string()),
867 selector: Some(btreemap! {
868 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
869 }),
870 ports: Some(vec![ServicePort {
871 name: Some("grpc".to_string()),
872 protocol: Some("TCP".to_string()),
873 port: config.environmentd_internal_persist_pubsub_port.into(),
874 ..Default::default()
875 }]),
876 ..Default::default()
877 }),
878 status: None,
879 }
880}
881
882fn create_environmentd_certificate(
883 config: &super::MaterializeControllerArgs,
884 mz: &Materialize,
885) -> Option<Certificate> {
886 create_certificate(
887 config.default_certificate_specs.internal.clone(),
888 mz,
889 mz.spec.internal_certificate_spec.clone(),
890 mz.environmentd_certificate_name(),
891 mz.environmentd_certificate_secret_name(),
892 Some(vec![
893 mz.environmentd_service_name(),
894 mz.environmentd_service_internal_fqdn(),
895 ]),
896 )
897}
898
899fn create_environmentd_statefulset_object(
900 config: &super::MaterializeControllerArgs,
901 tracing: &TracingCliArgs,
902 mz: &Materialize,
903 generation: u64,
904) -> StatefulSet {
905 let mut env = vec![
915 EnvVar {
916 name: "MZ_METADATA_BACKEND_URL".to_string(),
917 value_from: Some(EnvVarSource {
918 secret_key_ref: Some(SecretKeySelector {
919 name: mz.backend_secret_name(),
920 key: "metadata_backend_url".to_string(),
921 optional: Some(false),
922 }),
923 ..Default::default()
924 }),
925 ..Default::default()
926 },
927 EnvVar {
928 name: "MZ_PERSIST_BLOB_URL".to_string(),
929 value_from: Some(EnvVarSource {
930 secret_key_ref: Some(SecretKeySelector {
931 name: mz.backend_secret_name(),
932 key: "persist_backend_url".to_string(),
933 optional: Some(false),
934 }),
935 ..Default::default()
936 }),
937 ..Default::default()
938 },
939 ];
940
941 env.push(EnvVar {
942 name: "AWS_REGION".to_string(),
943 value: Some(config.region.clone()),
944 ..Default::default()
945 });
946
947 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
948
949 let mut args = vec![];
950
951 if let Some(helm_chart_version) = &config.helm_chart_version {
952 args.push(format!("--helm-chart-version={helm_chart_version}"));
953 }
954
955 args.push(format!(
957 "--environment-id={}",
958 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
959 ));
960
961 args.push(format!(
963 "--clusterd-image={}",
964 matching_image_from_environmentd_image_ref(
965 &mz.spec.environmentd_image_ref,
966 "clusterd",
967 None
968 )
969 ));
970
971 args.extend(
973 [
974 config
975 .environmentd_cluster_replica_sizes
976 .as_ref()
977 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
978 config
979 .bootstrap_default_cluster_replica_size
980 .as_ref()
981 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
982 config
983 .bootstrap_builtin_system_cluster_replica_size
984 .as_ref()
985 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
986 config
987 .bootstrap_builtin_probe_cluster_replica_size
988 .as_ref()
989 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
990 config
991 .bootstrap_builtin_support_cluster_replica_size
992 .as_ref()
993 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
994 config
995 .bootstrap_builtin_catalog_server_cluster_replica_size
996 .as_ref()
997 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
998 config
999 .bootstrap_builtin_analytics_cluster_replica_size
1000 .as_ref()
1001 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1002 config
1003 .bootstrap_builtin_system_cluster_replication_factor
1004 .as_ref()
1005 .map(|replication_factor| {
1006 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1007 }),
1008 config
1009 .bootstrap_builtin_probe_cluster_replication_factor
1010 .as_ref()
1011 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1012 config
1013 .bootstrap_builtin_support_cluster_replication_factor
1014 .as_ref()
1015 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1016 config
1017 .bootstrap_builtin_analytics_cluster_replication_factor
1018 .as_ref()
1019 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1020 ]
1021 .into_iter()
1022 .flatten(),
1023 );
1024
1025 args.extend(
1026 config
1027 .environmentd_allowed_origins
1028 .iter()
1029 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1030 );
1031
1032 args.push(format!(
1033 "--secrets-controller={}",
1034 config.secrets_controller
1035 ));
1036
1037 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1038 if let Ok(cluster_replica_sizes) =
1039 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1040 {
1041 let cluster_replica_sizes: Vec<_> =
1042 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1043 args.push(format!(
1044 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1045 cluster_replica_sizes.join("', '")
1046 ));
1047 }
1048 }
1049 if !config.cloud_provider.is_cloud() {
1050 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1051 }
1052
1053 if config.enable_internal_statement_logging {
1054 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1055 }
1056
1057 if config.disable_statement_logging {
1058 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1059 }
1060
1061 if !mz.spec.enable_rbac {
1062 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1063 }
1064
1065 args.push("--persist-isolated-runtime-threads=-1".to_string());
1069
1070 if config.cloud_provider == CloudProvider::Aws {
1072 if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1073 for az in azs {
1074 args.push(format!("--availability-zone={az}"));
1075 }
1076 }
1077
1078 if let Some(environmentd_connection_role_arn) = mz
1079 .spec
1080 .environmentd_connection_role_arn
1081 .as_deref()
1082 .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1083 {
1084 args.push(format!(
1085 "--aws-connection-role-arn={}",
1086 environmentd_connection_role_arn
1087 ));
1088 }
1089 if let Some(account_id) = &config.aws_info.aws_account_id {
1090 args.push(format!("--aws-account-id={account_id}"));
1091 }
1092
1093 args.extend([format!(
1094 "--aws-secrets-controller-tags=Environment={}",
1095 mz.name_unchecked()
1096 )]);
1097 args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1098 }
1099
1100 args.extend([
1102 "--orchestrator=kubernetes".into(),
1103 format!(
1104 "--orchestrator-kubernetes-service-account={}",
1105 &mz.service_account_name()
1106 ),
1107 format!(
1108 "--orchestrator-kubernetes-image-pull-policy={}",
1109 config.image_pull_policy.as_kebab_case_str(),
1110 ),
1111 ]);
1112 for selector in &config.clusterd_node_selector {
1113 args.push(format!(
1114 "--orchestrator-kubernetes-service-node-selector={}={}",
1115 selector.key, selector.value,
1116 ));
1117 }
1118 if mz.meets_minimum_version(&V144) {
1119 if let Some(affinity) = &config.clusterd_affinity {
1120 let affinity = serde_json::to_string(affinity).unwrap();
1121 args.push(format!(
1122 "--orchestrator-kubernetes-service-affinity={affinity}"
1123 ))
1124 }
1125 if let Some(tolerations) = &config.clusterd_tolerations {
1126 let tolerations = serde_json::to_string(tolerations).unwrap();
1127 args.push(format!(
1128 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1129 ))
1130 }
1131 }
1132 if let Some(scheduler_name) = &config.scheduler_name {
1133 args.push(format!(
1134 "--orchestrator-kubernetes-scheduler-name={}",
1135 scheduler_name
1136 ));
1137 }
1138 if mz.meets_minimum_version(&V154_DEV0) {
1139 args.extend(
1140 mz.spec
1141 .pod_annotations
1142 .as_ref()
1143 .map(|annotations| annotations.iter())
1144 .unwrap_or_default()
1145 .map(|(key, val)| {
1146 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1147 }),
1148 );
1149 }
1150 args.extend(
1151 mz.default_labels()
1152 .iter()
1153 .chain(
1154 mz.spec
1155 .pod_labels
1156 .as_ref()
1157 .map(|labels| labels.iter())
1158 .unwrap_or_default(),
1159 )
1160 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1161 );
1162 if let Some(status) = &mz.status {
1163 args.push(format!(
1164 "--orchestrator-kubernetes-name-prefix=mz{}-",
1165 status.resource_id
1166 ));
1167 }
1168
1169 args.extend(["--log-format=json".into()]);
1171 if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1172 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1173 }
1174 args.extend([
1176 format!(
1177 "--opentelemetry-resource=organization_id={}",
1178 mz.spec.environment_id
1179 ),
1180 format!(
1181 "--opentelemetry-resource=environment_name={}",
1182 mz.name_unchecked()
1183 ),
1184 ]);
1185
1186 if let Some(segment_api_key) = &config.segment_api_key {
1187 args.push(format!("--segment-api-key={}", segment_api_key));
1188 if config.segment_client_side {
1189 args.push("--segment-client-side".into());
1190 }
1191 }
1192
1193 let mut volumes = Vec::new();
1194 let mut volume_mounts = Vec::new();
1195 if issuer_ref_defined(
1196 &config.default_certificate_specs.internal,
1197 &mz.spec.internal_certificate_spec,
1198 ) {
1199 volumes.push(Volume {
1200 name: "certificate".to_owned(),
1201 secret: Some(SecretVolumeSource {
1202 default_mode: Some(0o400),
1203 secret_name: Some(mz.environmentd_certificate_secret_name()),
1204 items: None,
1205 optional: Some(false),
1206 }),
1207 ..Default::default()
1208 });
1209 volume_mounts.push(VolumeMount {
1210 name: "certificate".to_owned(),
1211 mount_path: "/etc/materialized".to_owned(),
1212 read_only: Some(true),
1213 ..Default::default()
1214 });
1215 args.extend([
1216 "--tls-mode=require".into(),
1217 "--tls-cert=/etc/materialized/tls.crt".into(),
1218 "--tls-key=/etc/materialized/tls.key".into(),
1219 ]);
1220 } else {
1221 args.push("--tls-mode=disable".to_string());
1222 }
1223 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1224 args.push(format!(
1225 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1226 ephemeral_volume_class
1227 ));
1228 }
1229 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1231
1232 if let Some(sentry_dsn) = &tracing.sentry_dsn {
1234 args.push(format!("--sentry-dsn={}", sentry_dsn));
1235 if let Some(sentry_environment) = &tracing.sentry_environment {
1236 args.push(format!("--sentry-environment={}", sentry_environment));
1237 }
1238 args.push(format!("--sentry-tag=region={}", config.region));
1239 }
1240
1241 args.push(format!(
1243 "--persist-pubsub-url=http://{}:{}",
1244 mz.persist_pubsub_service_name(generation),
1245 config.environmentd_internal_persist_pubsub_port,
1246 ));
1247 args.push(format!(
1248 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1249 config.environmentd_internal_persist_pubsub_port
1250 ));
1251
1252 args.push(format!("--deploy-generation={}", generation));
1253
1254 args.push(format!(
1256 "--internal-console-redirect-url={}",
1257 &config.internal_console_proxy_url,
1258 ));
1259
1260 if !config.collect_pod_metrics {
1261 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1262 }
1263 if config.enable_prometheus_scrape_annotations {
1264 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1265 }
1266
1267 if config.disable_license_key_checks {
1268 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1269 args.push("--disable-license-key-checks".into());
1270 }
1271 } else if mz.meets_minimum_version(&V140_DEV0) {
1272 volume_mounts.push(VolumeMount {
1273 name: "license-key".to_string(),
1274 mount_path: "/license_key".to_string(),
1275 ..Default::default()
1276 });
1277 volumes.push(Volume {
1278 name: "license-key".to_string(),
1279 secret: Some(SecretVolumeSource {
1280 default_mode: Some(256),
1281 optional: Some(false),
1282 secret_name: Some(mz.backend_secret_name()),
1283 items: Some(vec![KeyToPath {
1284 key: "license_key".to_string(),
1285 path: "license_key".to_string(),
1286 ..Default::default()
1287 }]),
1288 ..Default::default()
1289 }),
1290 ..Default::default()
1291 });
1292 env.push(EnvVar {
1293 name: "MZ_LICENSE_KEY".to_string(),
1294 value: Some("/license_key/license_key".to_string()),
1295 ..Default::default()
1296 });
1297 }
1298
1299 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1301 args.extend(extra_args.iter().cloned());
1302 }
1303
1304 let probe = Probe {
1305 initial_delay_seconds: Some(1),
1306 failure_threshold: Some(12),
1307 tcp_socket: Some(TCPSocketAction {
1308 host: None,
1309 port: IntOrString::Int(config.environmentd_sql_port.into()),
1310 }),
1311 ..Default::default()
1312 };
1313
1314 let security_context = if config.enable_security_context {
1315 Some(SecurityContext {
1319 run_as_non_root: Some(true),
1320 capabilities: Some(Capabilities {
1321 drop: Some(vec!["ALL".to_string()]),
1322 ..Default::default()
1323 }),
1324 seccomp_profile: Some(SeccompProfile {
1325 type_: "RuntimeDefault".to_string(),
1326 ..Default::default()
1327 }),
1328 allow_privilege_escalation: Some(false),
1329 ..Default::default()
1330 })
1331 } else {
1332 None
1333 };
1334
1335 let ports = vec![
1336 ContainerPort {
1337 container_port: config.environmentd_sql_port.into(),
1338 name: Some("sql".to_owned()),
1339 ..Default::default()
1340 },
1341 ContainerPort {
1342 container_port: config.environmentd_internal_sql_port.into(),
1343 name: Some("internal-sql".to_owned()),
1344 ..Default::default()
1345 },
1346 ContainerPort {
1347 container_port: config.environmentd_http_port.into(),
1348 name: Some("http".to_owned()),
1349 ..Default::default()
1350 },
1351 ContainerPort {
1352 container_port: config.environmentd_internal_http_port.into(),
1353 name: Some("internal-http".to_owned()),
1354 ..Default::default()
1355 },
1356 ContainerPort {
1357 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1358 name: Some("persist-pubsub".to_owned()),
1359 ..Default::default()
1360 },
1361 ];
1362
1363 if mz.meets_minimum_version(&V147_DEV0) {
1365 volume_mounts.push(VolumeMount {
1366 name: "listeners-configmap".to_string(),
1367 mount_path: "/listeners".to_string(),
1368 ..Default::default()
1369 });
1370 volumes.push(Volume {
1371 name: "listeners-configmap".to_string(),
1372 config_map: Some(ConfigMapVolumeSource {
1373 name: mz.listeners_configmap_name(generation),
1374 default_mode: Some(256),
1375 optional: Some(false),
1376 items: Some(vec![KeyToPath {
1377 key: "listeners.json".to_string(),
1378 path: "listeners.json".to_string(),
1379 ..Default::default()
1380 }]),
1381 }),
1382 ..Default::default()
1383 });
1384 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1385 if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1386 args.push("--system-parameter-default=enable_password_auth=true".into());
1387 env.push(EnvVar {
1388 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1389 value_from: Some(EnvVarSource {
1390 secret_key_ref: Some(SecretKeySelector {
1391 name: mz.backend_secret_name(),
1392 key: "external_login_password_mz_system".to_string(),
1393 optional: Some(false),
1394 }),
1395 ..Default::default()
1396 }),
1397 ..Default::default()
1398 })
1399 }
1400 } else {
1401 args.extend([
1402 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1403 format!(
1404 "--http-listen-addr=0.0.0.0:{}",
1405 config.environmentd_http_port
1406 ),
1407 format!(
1408 "--internal-sql-listen-addr=0.0.0.0:{}",
1409 config.environmentd_internal_sql_port
1410 ),
1411 format!(
1412 "--internal-http-listen-addr=0.0.0.0:{}",
1413 config.environmentd_internal_http_port
1414 ),
1415 ]);
1416 }
1417
1418 let container = Container {
1419 name: "environmentd".to_owned(),
1420 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1421 image_pull_policy: Some(config.image_pull_policy.to_string()),
1422 ports: Some(ports),
1423 args: Some(args),
1424 env: Some(env),
1425 volume_mounts: Some(volume_mounts),
1426 liveness_probe: Some(probe.clone()),
1427 readiness_probe: Some(probe),
1428 resources: mz.spec.environmentd_resource_requirements.clone(),
1429 security_context: security_context.clone(),
1430 ..Default::default()
1431 };
1432
1433 let mut pod_template_labels = mz.default_labels();
1434 pod_template_labels.insert(
1435 "materialize.cloud/name".to_owned(),
1436 mz.environmentd_statefulset_name(generation),
1437 );
1438 pod_template_labels.insert(
1439 "materialize.cloud/app".to_owned(),
1440 mz.environmentd_app_name(),
1441 );
1442 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1443 pod_template_labels.extend(
1444 mz.spec
1445 .pod_labels
1446 .as_ref()
1447 .map(|labels| labels.iter())
1448 .unwrap_or_default()
1449 .map(|(key, value)| (key.clone(), value.clone())),
1450 );
1451
1452 let mut pod_template_annotations = btreemap! {
1453 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1455
1456 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1458 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1459 "materialize.cloud/generation".to_owned() => generation.to_string(),
1460 };
1461 if config.enable_prometheus_scrape_annotations {
1462 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1463 pod_template_annotations.insert(
1464 "prometheus.io/port".to_owned(),
1465 config.environmentd_internal_http_port.to_string(),
1466 );
1467 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1468 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1469 pod_template_annotations.insert(
1470 "materialize.prometheus.io/mz_usage_path".to_owned(),
1471 "/metrics/mz_usage".to_string(),
1472 );
1473 pod_template_annotations.insert(
1474 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1475 "/metrics/mz_frontier".to_string(),
1476 );
1477 pod_template_annotations.insert(
1478 "materialize.prometheus.io/mz_compute_path".to_owned(),
1479 "/metrics/mz_compute".to_string(),
1480 );
1481 pod_template_annotations.insert(
1482 "materialize.prometheus.io/mz_storage_path".to_owned(),
1483 "/metrics/mz_storage".to_string(),
1484 );
1485 }
1486 pod_template_annotations.extend(
1487 mz.spec
1488 .pod_annotations
1489 .as_ref()
1490 .map(|annotations| annotations.iter())
1491 .unwrap_or_default()
1492 .map(|(key, value)| (key.clone(), value.clone())),
1493 );
1494
1495 let mut tolerations = vec![
1496 Toleration {
1500 effect: Some("NoExecute".into()),
1501 key: Some("node.kubernetes.io/not-ready".into()),
1502 operator: Some("Exists".into()),
1503 toleration_seconds: Some(30),
1504 value: None,
1505 },
1506 Toleration {
1507 effect: Some("NoExecute".into()),
1508 key: Some("node.kubernetes.io/unreachable".into()),
1509 operator: Some("Exists".into()),
1510 toleration_seconds: Some(30),
1511 value: None,
1512 },
1513 ];
1514 if let Some(user_tolerations) = &config.environmentd_tolerations {
1515 tolerations.extend(user_tolerations.iter().cloned());
1516 }
1517 let tolerations = Some(tolerations);
1518
1519 let pod_template_spec = PodTemplateSpec {
1520 metadata: Some(ObjectMeta {
1523 labels: Some(pod_template_labels),
1524 annotations: Some(pod_template_annotations), ..Default::default()
1526 }),
1527 spec: Some(PodSpec {
1528 containers: vec![container],
1529 node_selector: Some(
1530 config
1531 .environmentd_node_selector
1532 .iter()
1533 .map(|selector| (selector.key.clone(), selector.value.clone()))
1534 .collect(),
1535 ),
1536 affinity: config.environmentd_affinity.clone(),
1537 scheduler_name: config.scheduler_name.clone(),
1538 service_account_name: Some(mz.service_account_name()),
1539 volumes: Some(volumes),
1540 security_context: Some(PodSecurityContext {
1541 fs_group: Some(999),
1542 run_as_user: Some(999),
1543 run_as_group: Some(999),
1544 ..Default::default()
1545 }),
1546 tolerations,
1547 termination_grace_period_seconds: Some(0),
1568 ..Default::default()
1569 }),
1570 };
1571
1572 let mut match_labels = BTreeMap::new();
1573 match_labels.insert(
1574 "materialize.cloud/name".to_owned(),
1575 mz.environmentd_statefulset_name(generation),
1576 );
1577
1578 let statefulset_spec = StatefulSetSpec {
1579 replicas: Some(1),
1580 template: pod_template_spec,
1581 update_strategy: Some(StatefulSetUpdateStrategy {
1582 rolling_update: None,
1583 type_: Some("OnDelete".to_owned()),
1584 }),
1585 service_name: Some(mz.environmentd_service_name()),
1586 selector: LabelSelector {
1587 match_expressions: None,
1588 match_labels: Some(match_labels),
1589 },
1590 ..Default::default()
1591 };
1592
1593 StatefulSet {
1594 metadata: ObjectMeta {
1595 annotations: Some(btreemap! {
1596 "materialize.cloud/generation".to_owned() => generation.to_string(),
1597 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1598 }),
1599 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1600 },
1601 spec: Some(statefulset_spec),
1602 status: None,
1603 }
1604}
1605
1606fn create_connection_info(
1607 config: &super::MaterializeControllerArgs,
1608 mz: &Materialize,
1609 generation: u64,
1610) -> ConnectionInfo {
1611 let external_enable_tls = issuer_ref_defined(
1612 &config.default_certificate_specs.internal,
1613 &mz.spec.internal_certificate_spec,
1614 );
1615 let authenticator_kind = mz.spec.authenticator_kind;
1616 let mut listeners_config = ListenersConfig {
1617 sql: btreemap! {
1618 "external".to_owned() => SqlListenerConfig{
1619 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1620 authenticator_kind,
1621 allowed_roles: AllowedRoles::Normal,
1622 enable_tls: external_enable_tls,
1623 },
1624 "internal".to_owned() => SqlListenerConfig{
1625 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1626 authenticator_kind: AuthenticatorKind::None,
1627 allowed_roles: AllowedRoles::NormalAndInternal,
1629 enable_tls: false,
1630 },
1631 },
1632 http: btreemap! {
1633 "external".to_owned() => HttpListenerConfig{
1634 base: BaseListenerConfig {
1635 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1636 authenticator_kind,
1637 allowed_roles: AllowedRoles::Normal,
1638 enable_tls: external_enable_tls,
1639 },
1640 routes: HttpRoutesEnabled{
1641 base: true,
1642 webhook: true,
1643 internal: false,
1644 metrics: false,
1645 profiling: false,
1646 }
1647 },
1648 "internal".to_owned() => HttpListenerConfig{
1649 base: BaseListenerConfig {
1650 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1651 authenticator_kind: AuthenticatorKind::None,
1652 allowed_roles: AllowedRoles::NormalAndInternal,
1654 enable_tls: false,
1655 },
1656 routes: HttpRoutesEnabled{
1657 base: true,
1658 webhook: true,
1659 internal: true,
1660 metrics: true,
1661 profiling: true,
1662 }
1663 },
1664 },
1665 };
1666 if authenticator_kind == AuthenticatorKind::Password {
1667 listeners_config.sql.remove("internal");
1668 listeners_config.http.remove("internal");
1669
1670 listeners_config.sql.get_mut("external").map(|listener| {
1671 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1672 listener
1673 });
1674 listeners_config.http.get_mut("external").map(|listener| {
1675 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1676 listener.routes.internal = true;
1677 listener.routes.profiling = true;
1678 listener
1679 });
1680
1681 listeners_config.http.insert(
1682 "metrics".to_owned(),
1683 HttpListenerConfig {
1684 base: BaseListenerConfig {
1685 addr: SocketAddr::new(
1686 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1687 config.environmentd_internal_http_port,
1688 ),
1689 authenticator_kind: AuthenticatorKind::None,
1690 allowed_roles: AllowedRoles::NormalAndInternal,
1691 enable_tls: false,
1692 },
1693 routes: HttpRoutesEnabled {
1694 base: false,
1695 webhook: false,
1696 internal: false,
1697 metrics: true,
1698 profiling: false,
1699 },
1700 },
1701 );
1702 }
1703
1704 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1705 let listeners_configmap = ConfigMap {
1706 binary_data: None,
1707 data: Some(btreemap! {
1708 "listeners.json".to_owned() => listeners_json,
1709 }),
1710 immutable: None,
1711 metadata: ObjectMeta {
1712 annotations: Some(btreemap! {
1713 "materialize.cloud/generation".to_owned() => generation.to_string(),
1714 }),
1715 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1716 },
1717 };
1718
1719 let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1720 AuthenticatorKind::Password => {
1721 let scheme = if external_enable_tls { "https" } else { "http" };
1722 (
1723 scheme,
1724 config.environmentd_http_port,
1725 Some(mz.spec.backend_secret_name.clone()),
1726 )
1727 }
1728 _ => ("http", config.environmentd_internal_http_port, None),
1729 };
1730 let environmentd_url = format!(
1731 "{}://{}.{}.svc.cluster.local:{}",
1732 scheme,
1733 mz.environmentd_generation_service_name(generation),
1734 mz.namespace(),
1735 leader_api_port,
1736 );
1737 ConnectionInfo {
1738 environmentd_url,
1739 listeners_configmap,
1740 mz_system_secret_name,
1741 }
1742}
1743
/// Deserialized response body carrying the outcome of a "become leader"
/// request.
///
/// NOTE(review): presumably this is the response of environmentd's
/// leader-promotion HTTP endpoint; confirm at the call site.
///
/// The field name is part of the accepted JSON shape (`{"result": ...}`), so
/// it must not be renamed without a matching `#[serde(rename)]`.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    /// Success or failure of the request; see [`BecomeLeaderResult`].
    result: BecomeLeaderResult,
}
1749
/// Outcome of a "become leader" request.
///
/// No `#[serde(...)]` attributes are applied, so serde's default externally
/// tagged representation is used. The accepted JSON forms are:
/// - `"Success"`;
/// - `{"Failure": {"message": "..."}}`.
///
/// Renaming a variant or its field changes what can be deserialized.
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    Success,
    /// The request did not succeed; `message` is the text included in the
    /// response.
    Failure { message: String },
}
1755
/// Deserialized error body with a single `message` field (JSON shape
/// `{"message": "..."}`).
///
/// NOTE(review): presumably returned by environmentd when a request to skip
/// catch-up fails; confirm at the call site.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    message: String,
}
1760
1761fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String {
1762 format!("{}-{}", statefulset.name_unchecked(), idx)
1763}