1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19 api::{
20 apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21 core::v1::{
22 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25 Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26 VolumeMount,
27 },
28 networking::v1::{
29 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31 },
32 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33 },
34 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40 ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::{
54 Certificate, CertificatePrivateKeyAlgorithm,
55};
56use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
57use mz_orchestrator_tracing::TracingCliArgs;
58use mz_ore::instrument;
59
60static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
61 major: 0,
62 minor: 140,
63 patch: 0,
64 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
65 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
66});
67const V143: Version = Version::new(0, 143, 0);
68const V144: Version = Version::new(0, 144, 0);
69static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
70 major: 0,
71 minor: 147,
72 patch: 0,
73 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
74 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
75});
76const V153: Version = Version::new(0, 153, 0);
77static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
78 major: 0,
79 minor: 154,
80 patch: 0,
81 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
82 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
83});
84
85#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
90pub enum DeploymentStatus {
91 Initializing,
94 ReadyToPromote,
96 Promoting,
98 IsLeader,
100}
101
102#[derive(Deserialize, Serialize)]
103pub struct LoginCredentials {
104 username: String,
105 password: String,
107}
108
109#[derive(Debug, Serialize)]
110pub struct ConnectionInfo {
111 pub environmentd_url: String,
112 pub mz_system_secret_name: Option<String>,
113 pub listeners_configmap: ConfigMap,
114}
115
116#[derive(Debug, Serialize)]
117pub struct Resources {
118 pub generation: u64,
119 pub environmentd_network_policies: Vec<NetworkPolicy>,
120 pub service_account: Box<Option<ServiceAccount>>,
121 pub role: Box<Role>,
122 pub role_binding: Box<RoleBinding>,
123 pub public_service: Box<Service>,
124 pub generation_service: Box<Service>,
125 pub persist_pubsub_service: Box<Service>,
126 pub environmentd_certificate: Box<Option<Certificate>>,
127 pub environmentd_statefulset: Box<StatefulSet>,
128 pub connection_info: Box<ConnectionInfo>,
129}
130
131impl Resources {
132 pub fn new(
133 config: &super::MaterializeControllerArgs,
134 tracing: &TracingCliArgs,
135 orchestratord_namespace: &str,
136 mz: &Materialize,
137 generation: u64,
138 ) -> Self {
139 let environmentd_network_policies =
140 create_environmentd_network_policies(config, mz, orchestratord_namespace);
141
142 let service_account = Box::new(create_service_account_object(config, mz));
143 let role = Box::new(create_role_object(mz));
144 let role_binding = Box::new(create_role_binding_object(mz));
145 let public_service = Box::new(create_public_service_object(config, mz, generation));
146 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
147 let persist_pubsub_service =
148 Box::new(create_persist_pubsub_service(config, mz, generation));
149 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
150 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
151 config, tracing, mz, generation,
152 ));
153 let connection_info = Box::new(create_connection_info(config, mz, generation));
154
155 Self {
156 generation,
157 environmentd_network_policies,
158 service_account,
159 role,
160 role_binding,
161 public_service,
162 generation_service,
163 persist_pubsub_service,
164 environmentd_certificate,
165 environmentd_statefulset,
166 connection_info,
167 }
168 }
169
170 #[instrument]
171 pub async fn apply(
172 &self,
173 client: &Client,
174 force_promote: bool,
175 namespace: &str,
176 ) -> Result<Option<Action>, anyhow::Error> {
177 let environmentd_network_policy_api: Api<NetworkPolicy> =
178 Api::namespaced(client.clone(), namespace);
179 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
180 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
181 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
182 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
183 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
184 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
185 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
186 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
187
188 for policy in &self.environmentd_network_policies {
189 trace!("applying network policy {}", policy.name_unchecked());
190 apply_resource(&environmentd_network_policy_api, policy).await?;
191 }
192
193 if let Some(service_account) = &*self.service_account {
194 trace!("applying environmentd service account");
195 apply_resource(&service_account_api, service_account).await?;
196 }
197
198 trace!("applying environmentd role");
199 apply_resource(&role_api, &*self.role).await?;
200
201 trace!("applying environmentd role binding");
202 apply_resource(&role_binding_api, &*self.role_binding).await?;
203
204 trace!("applying environmentd per-generation service");
205 apply_resource(&service_api, &*self.generation_service).await?;
206
207 trace!("creating persist pubsub service");
208 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
209
210 if let Some(certificate) = &*self.environmentd_certificate {
211 trace!("creating new environmentd certificate");
212 apply_resource(&certificate_api, certificate).await?;
213 }
214
215 trace!("applying listeners configmap");
216 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
217
218 trace!("creating new environmentd statefulset");
219 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
220
221 let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
222
223 let statefulset = get_resource(
224 &statefulset_api,
225 &self.environmentd_statefulset.name_unchecked(),
226 )
227 .await?;
228 if statefulset
229 .and_then(|statefulset| statefulset.status)
230 .and_then(|status| status.ready_replicas)
231 .unwrap_or(0)
232 == 0
233 {
234 trace!("environmentd statefulset is not ready yet...");
235 return Ok(Some(retry_action));
236 }
237
238 let http_client = match &self.connection_info.mz_system_secret_name {
239 Some(mz_system_secret_name) => {
240 let http_client = reqwest::Client::builder()
241 .timeout(std::time::Duration::from_secs(10))
242 .cookie_store(true)
243 .danger_accept_invalid_certs(true)
245 .build()
246 .unwrap();
247 if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
248 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
249 let password = String::from_utf8_lossy(&password.0).to_string();
250 let login_url = reqwest::Url::parse(&format!(
251 "{}/api/login",
252 self.connection_info.environmentd_url,
253 ))
254 .unwrap();
255 match http_client
256 .post(login_url)
257 .body(serde_json::to_string(&LoginCredentials {
258 username: "mz_system".to_owned(),
259 password,
260 })?)
261 .header("Content-Type", "application/json")
262 .send()
263 .await
264 {
265 Ok(response) => {
266 if let Err(e) = response.error_for_status() {
267 trace!("failed to login to environmentd, retrying... ({e})");
268 return Ok(Some(retry_action));
269 }
270 }
271 Err(e) => {
272 trace!("failed to connect to environmentd, retrying... ({e})");
273 return Ok(Some(retry_action));
274 }
275 };
276 }
277 };
278 http_client
279 }
280 None => reqwest::Client::builder()
281 .timeout(std::time::Duration::from_secs(10))
282 .build()
283 .unwrap(),
284 };
285 let status_url = reqwest::Url::parse(&format!(
286 "{}/api/leader/status",
287 self.connection_info.environmentd_url,
288 ))
289 .unwrap();
290
291 match http_client.get(status_url.clone()).send().await {
292 Ok(response) => {
293 let response: BTreeMap<String, DeploymentStatus> = match response.error_for_status()
294 {
295 Ok(response) => response.json().await?,
296 Err(e) => {
297 trace!("failed to get status of environmentd, retrying... ({e})");
298 return Ok(Some(retry_action));
299 }
300 };
301 if force_promote {
302 trace!("skipping cluster catchup");
303 let skip_catchup_url = reqwest::Url::parse(&format!(
304 "{}/api/leader/skip-catchup",
305 self.connection_info.environmentd_url,
306 ))
307 .unwrap();
308 let response = http_client.post(skip_catchup_url).send().await?;
309 if response.status() == StatusCode::BAD_REQUEST {
310 let err: SkipCatchupError = response.json().await?;
311 bail!("failed to skip catchup: {}", err.message);
312 }
313 } else if response["status"] == DeploymentStatus::Initializing {
314 trace!("environmentd is still initializing, retrying...");
315 return Ok(Some(retry_action));
316 } else {
317 trace!("environmentd is ready");
318 }
319 }
320 Err(e) => {
321 trace!("failed to connect to environmentd, retrying... ({e})");
322 return Ok(Some(retry_action));
323 }
324 }
325
326 let promote_url = reqwest::Url::parse(&format!(
327 "{}/api/leader/promote",
328 self.connection_info.environmentd_url,
329 ))
330 .unwrap();
331
332 trace!("promoting new environmentd to leader");
341 let response = http_client.post(promote_url).send().await?;
342 let response: BecomeLeaderResponse = match response.error_for_status() {
343 Ok(response) => response.json().await?,
344 Err(e) => {
345 trace!("failed to promote environmentd, retrying... ({e})");
346 return Ok(Some(retry_action));
347 }
348 };
349 if let BecomeLeaderResult::Failure { message } = response.result {
350 bail!("failed to promote new environmentd: {message}");
351 }
352
353 match http_client.get(status_url.clone()).send().await {
365 Ok(response) => {
366 let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
367 if response["status"] != DeploymentStatus::IsLeader {
368 trace!(
369 "environmentd is still promoting (status: {:?}), retrying...",
370 response["status"]
371 );
372 return Ok(Some(retry_action));
373 } else {
374 trace!("environmentd is ready");
375 }
376 }
377 Err(e) => {
378 trace!("failed to connect to environmentd, retrying... ({e})");
379 return Ok(Some(retry_action));
380 }
381 }
382
383 Ok(None)
384 }
385
386 #[instrument]
387 pub async fn promote_services(
388 &self,
389 client: &Client,
390 namespace: &str,
391 ) -> Result<(), anyhow::Error> {
392 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
393
394 trace!("applying environmentd public service");
395 apply_resource(&service_api, &*self.public_service).await?;
396
397 Ok(())
398 }
399
400 #[instrument]
401 pub async fn teardown_generation(
402 &self,
403 client: &Client,
404 mz: &Materialize,
405 generation: u64,
406 ) -> Result<(), anyhow::Error> {
407 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
408 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
409 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
410
411 trace!("deleting environmentd statefulset for generation {generation}");
412 delete_resource(
413 &statefulset_api,
414 &mz.environmentd_statefulset_name(generation),
415 )
416 .await?;
417
418 trace!("deleting persist pubsub service for generation {generation}");
419 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
420
421 trace!("deleting environmentd per-generation service for generation {generation}");
422 delete_resource(
423 &service_api,
424 &mz.environmentd_generation_service_name(generation),
425 )
426 .await?;
427
428 trace!("deleting listeners configmap for generation {generation}");
429 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
430
431 Ok(())
432 }
433
434 pub fn generate_hash(&self) -> String {
437 let mut hasher = Sha256::new();
438 hasher.update(&serde_json::to_string(self).unwrap());
439 format!("{:x}", hasher.finalize())
440 }
441}
442
443fn create_environmentd_network_policies(
444 config: &super::MaterializeControllerArgs,
445 mz: &Materialize,
446 orchestratord_namespace: &str,
447) -> Vec<NetworkPolicy> {
448 let mut network_policies = Vec::new();
449 if config.network_policies.internal_enabled {
450 let environmentd_label_selector = LabelSelector {
451 match_labels: Some(
452 mz.default_labels()
453 .into_iter()
454 .chain([(
455 "materialize.cloud/app".to_owned(),
456 mz.environmentd_app_name(),
457 )])
458 .collect(),
459 ),
460 ..Default::default()
461 };
462 let orchestratord_label_selector = LabelSelector {
463 match_labels: Some(
464 config
465 .orchestratord_pod_selector_labels
466 .iter()
467 .cloned()
468 .map(|kv| (kv.key, kv.value))
469 .collect(),
470 ),
471 ..Default::default()
472 };
473 let all_pods_label_selector = LabelSelector {
476 match_labels: Some(mz.default_labels()),
477 ..Default::default()
478 };
479 network_policies.extend([
480 NetworkPolicy {
483 metadata: mz
484 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
485 spec: Some(NetworkPolicySpec {
486 egress: Some(vec![NetworkPolicyEgressRule {
487 to: Some(vec![NetworkPolicyPeer {
488 pod_selector: Some(all_pods_label_selector.clone()),
489 ..Default::default()
490 }]),
491 ..Default::default()
492 }]),
493 ingress: Some(vec![NetworkPolicyIngressRule {
494 from: Some(vec![NetworkPolicyPeer {
495 pod_selector: Some(all_pods_label_selector.clone()),
496 ..Default::default()
497 }]),
498 ..Default::default()
499 }]),
500 pod_selector: Some(all_pods_label_selector.clone()),
501 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
502 ..Default::default()
503 }),
504 },
505 NetworkPolicy {
508 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
509 spec: Some(NetworkPolicySpec {
510 ingress: Some(vec![NetworkPolicyIngressRule {
511 from: Some(vec![NetworkPolicyPeer {
512 namespace_selector: Some(LabelSelector {
513 match_labels: Some(btreemap! {
514 "kubernetes.io/metadata.name".into()
515 => orchestratord_namespace.into(),
516 }),
517 ..Default::default()
518 }),
519 pod_selector: Some(orchestratord_label_selector),
520 ..Default::default()
521 }]),
522 ports: Some(vec![
523 NetworkPolicyPort {
524 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
525 protocol: Some("TCP".to_string()),
526 ..Default::default()
527 },
528 NetworkPolicyPort {
529 port: Some(IntOrString::Int(
530 config.environmentd_internal_http_port.into(),
531 )),
532 protocol: Some("TCP".to_string()),
533 ..Default::default()
534 },
535 ]),
536 ..Default::default()
537 }]),
538 pod_selector: Some(environmentd_label_selector),
539 policy_types: Some(vec!["Ingress".to_owned()]),
540 ..Default::default()
541 }),
542 },
543 ]);
544 }
545 if config.network_policies.ingress_enabled {
546 let mut ingress_label_selector = mz.default_labels();
547 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
548 network_policies.extend([NetworkPolicy {
549 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
550 spec: Some(NetworkPolicySpec {
551 ingress: Some(vec![NetworkPolicyIngressRule {
552 from: Some(
553 config
554 .network_policies
555 .ingress_cidrs
556 .iter()
557 .map(|cidr| NetworkPolicyPeer {
558 ip_block: Some(IPBlock {
559 cidr: cidr.to_owned(),
560 except: None,
561 }),
562 ..Default::default()
563 })
564 .collect(),
565 ),
566 ports: Some(vec![
567 NetworkPolicyPort {
568 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
569 protocol: Some("TCP".to_string()),
570 ..Default::default()
571 },
572 NetworkPolicyPort {
573 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
574 protocol: Some("TCP".to_string()),
575 ..Default::default()
576 },
577 ]),
578 ..Default::default()
579 }]),
580 pod_selector: Some(LabelSelector {
581 match_expressions: None,
582 match_labels: Some(ingress_label_selector),
583 }),
584 policy_types: Some(vec!["Ingress".to_owned()]),
585 ..Default::default()
586 }),
587 }]);
588 }
589 if config.network_policies.egress_enabled {
590 network_policies.extend([NetworkPolicy {
591 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
592 spec: Some(NetworkPolicySpec {
593 egress: Some(vec![NetworkPolicyEgressRule {
594 to: Some(
595 config
596 .network_policies
597 .egress_cidrs
598 .iter()
599 .map(|cidr| NetworkPolicyPeer {
600 ip_block: Some(IPBlock {
601 cidr: cidr.to_owned(),
602 except: None,
603 }),
604 ..Default::default()
605 })
606 .collect(),
607 ),
608 ..Default::default()
609 }]),
610 pod_selector: Some(LabelSelector {
611 match_expressions: None,
612 match_labels: Some(mz.default_labels()),
613 }),
614 policy_types: Some(vec!["Egress".to_owned()]),
615 ..Default::default()
616 }),
617 }]);
618 }
619 network_policies
620}
621
622fn create_service_account_object(
623 config: &super::MaterializeControllerArgs,
624 mz: &Materialize,
625) -> Option<ServiceAccount> {
626 if mz.create_service_account() {
627 let mut annotations: BTreeMap<String, String> = mz
628 .spec
629 .service_account_annotations
630 .clone()
631 .unwrap_or_default();
632 if let (CloudProvider::Aws, Some(role_arn)) = (
633 config.cloud_provider,
634 mz.spec
635 .environmentd_iam_role_arn
636 .as_deref()
637 .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
638 ) {
639 warn!(
640 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
641 );
642 annotations.insert(
643 "eks.amazonaws.com/role-arn".to_string(),
644 role_arn.to_string(),
645 );
646 };
647
648 let mut labels = mz.default_labels();
649 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
650
651 Some(ServiceAccount {
652 metadata: ObjectMeta {
653 annotations: Some(annotations),
654 labels: Some(labels),
655 ..mz.managed_resource_meta(mz.service_account_name())
656 },
657 ..Default::default()
658 })
659 } else {
660 None
661 }
662}
663
664fn create_role_object(mz: &Materialize) -> Role {
665 Role {
666 metadata: mz.managed_resource_meta(mz.role_name()),
667 rules: Some(vec![
668 PolicyRule {
669 api_groups: Some(vec!["apps".to_string()]),
670 resources: Some(vec!["statefulsets".to_string()]),
671 verbs: vec![
672 "get".to_string(),
673 "list".to_string(),
674 "watch".to_string(),
675 "create".to_string(),
676 "update".to_string(),
677 "patch".to_string(),
678 "delete".to_string(),
679 ],
680 ..Default::default()
681 },
682 PolicyRule {
683 api_groups: Some(vec!["".to_string()]),
684 resources: Some(vec![
685 "persistentvolumeclaims".to_string(),
686 "pods".to_string(),
687 "secrets".to_string(),
688 "services".to_string(),
689 ]),
690 verbs: vec![
691 "get".to_string(),
692 "list".to_string(),
693 "watch".to_string(),
694 "create".to_string(),
695 "update".to_string(),
696 "patch".to_string(),
697 "delete".to_string(),
698 ],
699 ..Default::default()
700 },
701 PolicyRule {
702 api_groups: Some(vec!["".to_string()]),
703 resources: Some(vec!["configmaps".to_string()]),
704 verbs: vec!["get".to_string()],
705 ..Default::default()
706 },
707 PolicyRule {
708 api_groups: Some(vec!["materialize.cloud".to_string()]),
709 resources: Some(vec!["vpcendpoints".to_string()]),
710 verbs: vec![
711 "get".to_string(),
712 "list".to_string(),
713 "watch".to_string(),
714 "create".to_string(),
715 "update".to_string(),
716 "patch".to_string(),
717 "delete".to_string(),
718 ],
719 ..Default::default()
720 },
721 PolicyRule {
722 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
723 resources: Some(vec!["pods".to_string()]),
724 verbs: vec!["get".to_string(), "list".to_string()],
725 ..Default::default()
726 },
727 PolicyRule {
728 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
729 resources: Some(vec![
730 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
731 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
732 ]),
733 verbs: vec!["get".to_string()],
734 ..Default::default()
735 },
736 ]),
737 }
738}
739
740fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
741 RoleBinding {
742 metadata: mz.managed_resource_meta(mz.role_binding_name()),
743 role_ref: RoleRef {
744 api_group: "".to_string(),
745 kind: "Role".to_string(),
746 name: mz.role_name(),
747 },
748 subjects: Some(vec![Subject {
749 api_group: Some("".to_string()),
750 kind: "ServiceAccount".to_string(),
751 name: mz.service_account_name(),
752 namespace: Some(mz.namespace()),
753 }]),
754 }
755}
756
757fn create_public_service_object(
758 config: &super::MaterializeControllerArgs,
759 mz: &Materialize,
760 generation: u64,
761) -> Service {
762 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
763}
764
765fn create_generation_service_object(
766 config: &super::MaterializeControllerArgs,
767 mz: &Materialize,
768 generation: u64,
769) -> Service {
770 create_base_service_object(
771 config,
772 mz,
773 generation,
774 &mz.environmentd_generation_service_name(generation),
775 )
776}
777
778fn create_base_service_object(
779 config: &super::MaterializeControllerArgs,
780 mz: &Materialize,
781 generation: u64,
782 service_name: &str,
783) -> Service {
784 let ports = vec![
785 ServicePort {
786 port: config.environmentd_sql_port.into(),
787 protocol: Some("TCP".to_string()),
788 name: Some("sql".to_string()),
789 ..Default::default()
790 },
791 ServicePort {
792 port: config.environmentd_http_port.into(),
793 protocol: Some("TCP".to_string()),
794 name: Some("https".to_string()),
795 ..Default::default()
796 },
797 ServicePort {
798 port: config.environmentd_internal_sql_port.into(),
799 protocol: Some("TCP".to_string()),
800 name: Some("internal-sql".to_string()),
801 ..Default::default()
802 },
803 ServicePort {
804 port: config.environmentd_internal_http_port.into(),
805 protocol: Some("TCP".to_string()),
806 name: Some("internal-http".to_string()),
807 ..Default::default()
808 },
809 ];
810
811 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
812
813 let spec = ServiceSpec {
814 type_: Some("ClusterIP".to_string()),
815 cluster_ip: Some("None".to_string()),
816 selector: Some(selector),
817 ports: Some(ports),
818 ..Default::default()
819 };
820
821 Service {
822 metadata: mz.managed_resource_meta(service_name.to_string()),
823 spec: Some(spec),
824 status: None,
825 }
826}
827
828fn create_persist_pubsub_service(
829 config: &super::MaterializeControllerArgs,
830 mz: &Materialize,
831 generation: u64,
832) -> Service {
833 Service {
834 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
835 spec: Some(ServiceSpec {
836 type_: Some("ClusterIP".to_string()),
837 cluster_ip: Some("None".to_string()),
838 selector: Some(btreemap! {
839 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
840 }),
841 ports: Some(vec![ServicePort {
842 name: Some("grpc".to_string()),
843 protocol: Some("TCP".to_string()),
844 port: config.environmentd_internal_persist_pubsub_port.into(),
845 ..Default::default()
846 }]),
847 ..Default::default()
848 }),
849 status: None,
850 }
851}
852
853fn create_environmentd_certificate(
854 config: &super::MaterializeControllerArgs,
855 mz: &Materialize,
856) -> Option<Certificate> {
857 create_certificate(
858 config.default_certificate_specs.internal.clone(),
859 mz,
860 mz.spec.internal_certificate_spec.clone(),
861 mz.environmentd_certificate_name(),
862 mz.environmentd_certificate_secret_name(),
863 Some(vec![
864 mz.environmentd_service_name(),
865 mz.environmentd_service_internal_fqdn(),
866 ]),
867 CertificatePrivateKeyAlgorithm::Ed25519,
868 None,
869 )
870}
871
872fn create_environmentd_statefulset_object(
873 config: &super::MaterializeControllerArgs,
874 tracing: &TracingCliArgs,
875 mz: &Materialize,
876 generation: u64,
877) -> StatefulSet {
878 let mut env = vec![
888 EnvVar {
889 name: "MZ_METADATA_BACKEND_URL".to_string(),
890 value_from: Some(EnvVarSource {
891 secret_key_ref: Some(SecretKeySelector {
892 name: mz.backend_secret_name(),
893 key: "metadata_backend_url".to_string(),
894 optional: Some(false),
895 }),
896 ..Default::default()
897 }),
898 ..Default::default()
899 },
900 EnvVar {
901 name: "MZ_PERSIST_BLOB_URL".to_string(),
902 value_from: Some(EnvVarSource {
903 secret_key_ref: Some(SecretKeySelector {
904 name: mz.backend_secret_name(),
905 key: "persist_backend_url".to_string(),
906 optional: Some(false),
907 }),
908 ..Default::default()
909 }),
910 ..Default::default()
911 },
912 ];
913
914 env.push(EnvVar {
915 name: "AWS_REGION".to_string(),
916 value: Some(config.region.clone()),
917 ..Default::default()
918 });
919
920 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
921
922 let mut args = vec![];
923
924 if let Some(helm_chart_version) = &config.helm_chart_version {
925 args.push(format!("--helm-chart-version={helm_chart_version}"));
926 }
927
928 args.push(format!(
930 "--environment-id={}",
931 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
932 ));
933
934 args.push(format!(
936 "--clusterd-image={}",
937 matching_image_from_environmentd_image_ref(
938 &mz.spec.environmentd_image_ref,
939 "clusterd",
940 None
941 )
942 ));
943
944 args.extend(
946 [
947 config
948 .environmentd_cluster_replica_sizes
949 .as_ref()
950 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
951 config
952 .bootstrap_default_cluster_replica_size
953 .as_ref()
954 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
955 config
956 .bootstrap_builtin_system_cluster_replica_size
957 .as_ref()
958 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
959 config
960 .bootstrap_builtin_probe_cluster_replica_size
961 .as_ref()
962 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
963 config
964 .bootstrap_builtin_support_cluster_replica_size
965 .as_ref()
966 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
967 config
968 .bootstrap_builtin_catalog_server_cluster_replica_size
969 .as_ref()
970 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
971 config
972 .bootstrap_builtin_analytics_cluster_replica_size
973 .as_ref()
974 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
975 config
976 .bootstrap_builtin_system_cluster_replication_factor
977 .as_ref()
978 .map(|replication_factor| {
979 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
980 }),
981 config
982 .bootstrap_builtin_probe_cluster_replication_factor
983 .as_ref()
984 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
985 config
986 .bootstrap_builtin_support_cluster_replication_factor
987 .as_ref()
988 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
989 config
990 .bootstrap_builtin_analytics_cluster_replication_factor
991 .as_ref()
992 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
993 ]
994 .into_iter()
995 .flatten(),
996 );
997
998 args.extend(
999 config
1000 .environmentd_allowed_origins
1001 .iter()
1002 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1003 );
1004
1005 args.push(format!(
1006 "--secrets-controller={}",
1007 config.secrets_controller
1008 ));
1009
1010 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1011 if let Ok(cluster_replica_sizes) =
1012 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1013 {
1014 let cluster_replica_sizes: Vec<_> =
1015 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1016 args.push(format!(
1017 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1018 cluster_replica_sizes.join("', '")
1019 ));
1020 }
1021 }
1022 if !config.cloud_provider.is_cloud() {
1023 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1024 }
1025
1026 if config.enable_internal_statement_logging {
1027 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1028 }
1029
1030 if config.disable_statement_logging {
1031 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1032 }
1033
1034 if !mz.spec.enable_rbac {
1035 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1036 }
1037
1038 args.push("--persist-isolated-runtime-threads=-1".to_string());
1042
1043 if config.cloud_provider == CloudProvider::Aws {
1045 if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1046 for az in azs {
1047 args.push(format!("--availability-zone={az}"));
1048 }
1049 }
1050
1051 if let Some(environmentd_connection_role_arn) = mz
1052 .spec
1053 .environmentd_connection_role_arn
1054 .as_deref()
1055 .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1056 {
1057 args.push(format!(
1058 "--aws-connection-role-arn={}",
1059 environmentd_connection_role_arn
1060 ));
1061 }
1062 if let Some(account_id) = &config.aws_info.aws_account_id {
1063 args.push(format!("--aws-account-id={account_id}"));
1064 }
1065
1066 args.extend([format!(
1067 "--aws-secrets-controller-tags=Environment={}",
1068 mz.name_unchecked()
1069 )]);
1070 args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1071 }
1072
1073 args.extend([
1075 "--orchestrator=kubernetes".into(),
1076 format!(
1077 "--orchestrator-kubernetes-service-account={}",
1078 &mz.service_account_name()
1079 ),
1080 format!(
1081 "--orchestrator-kubernetes-image-pull-policy={}",
1082 config.image_pull_policy.as_kebab_case_str(),
1083 ),
1084 ]);
1085 for selector in &config.clusterd_node_selector {
1086 args.push(format!(
1087 "--orchestrator-kubernetes-service-node-selector={}={}",
1088 selector.key, selector.value,
1089 ));
1090 }
1091 if mz.meets_minimum_version(&V144) {
1092 if let Some(affinity) = &config.clusterd_affinity {
1093 let affinity = serde_json::to_string(affinity).unwrap();
1094 args.push(format!(
1095 "--orchestrator-kubernetes-service-affinity={affinity}"
1096 ))
1097 }
1098 if let Some(tolerations) = &config.clusterd_tolerations {
1099 let tolerations = serde_json::to_string(tolerations).unwrap();
1100 args.push(format!(
1101 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1102 ))
1103 }
1104 }
1105 if let Some(scheduler_name) = &config.scheduler_name {
1106 args.push(format!(
1107 "--orchestrator-kubernetes-scheduler-name={}",
1108 scheduler_name
1109 ));
1110 }
1111 if mz.meets_minimum_version(&V154_DEV0) {
1112 args.extend(
1113 mz.spec
1114 .pod_annotations
1115 .as_ref()
1116 .map(|annotations| annotations.iter())
1117 .unwrap_or_default()
1118 .map(|(key, val)| {
1119 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1120 }),
1121 );
1122 }
1123 args.extend(
1124 mz.default_labels()
1125 .iter()
1126 .chain(
1127 mz.spec
1128 .pod_labels
1129 .as_ref()
1130 .map(|labels| labels.iter())
1131 .unwrap_or_default(),
1132 )
1133 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1134 );
1135 if let Some(status) = &mz.status {
1136 args.push(format!(
1137 "--orchestrator-kubernetes-name-prefix=mz{}-",
1138 status.resource_id
1139 ));
1140 }
1141
1142 args.extend(["--log-format=json".into()]);
1144 if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1145 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1146 }
1147 args.extend([
1149 format!(
1150 "--opentelemetry-resource=organization_id={}",
1151 mz.spec.environment_id
1152 ),
1153 format!(
1154 "--opentelemetry-resource=environment_name={}",
1155 mz.name_unchecked()
1156 ),
1157 ]);
1158
1159 if let Some(segment_api_key) = &config.segment_api_key {
1160 args.push(format!("--segment-api-key={}", segment_api_key));
1161 if config.segment_client_side {
1162 args.push("--segment-client-side".into());
1163 }
1164 }
1165
1166 let mut volumes = Vec::new();
1167 let mut volume_mounts = Vec::new();
1168 if issuer_ref_defined(
1169 &config.default_certificate_specs.internal,
1170 &mz.spec.internal_certificate_spec,
1171 ) {
1172 volumes.push(Volume {
1173 name: "certificate".to_owned(),
1174 secret: Some(SecretVolumeSource {
1175 default_mode: Some(0o400),
1176 secret_name: Some(mz.environmentd_certificate_secret_name()),
1177 items: None,
1178 optional: Some(false),
1179 }),
1180 ..Default::default()
1181 });
1182 volume_mounts.push(VolumeMount {
1183 name: "certificate".to_owned(),
1184 mount_path: "/etc/materialized".to_owned(),
1185 read_only: Some(true),
1186 ..Default::default()
1187 });
1188 args.extend([
1189 "--tls-mode=require".into(),
1190 "--tls-cert=/etc/materialized/tls.crt".into(),
1191 "--tls-key=/etc/materialized/tls.key".into(),
1192 ]);
1193 } else {
1194 args.push("--tls-mode=disable".to_string());
1195 }
1196 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1197 args.push(format!(
1198 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1199 ephemeral_volume_class
1200 ));
1201 }
1202 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1204
1205 if let Some(sentry_dsn) = &tracing.sentry_dsn {
1207 args.push(format!("--sentry-dsn={}", sentry_dsn));
1208 if let Some(sentry_environment) = &tracing.sentry_environment {
1209 args.push(format!("--sentry-environment={}", sentry_environment));
1210 }
1211 args.push(format!("--sentry-tag=region={}", config.region));
1212 }
1213
1214 args.push(format!(
1216 "--persist-pubsub-url=http://{}:{}",
1217 mz.persist_pubsub_service_name(generation),
1218 config.environmentd_internal_persist_pubsub_port,
1219 ));
1220 args.push(format!(
1221 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1222 config.environmentd_internal_persist_pubsub_port
1223 ));
1224
1225 args.push(format!("--deploy-generation={}", generation));
1226
1227 args.push(format!(
1229 "--internal-console-redirect-url={}",
1230 &config.internal_console_proxy_url,
1231 ));
1232
1233 if !config.collect_pod_metrics {
1234 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1235 }
1236 if config.enable_prometheus_scrape_annotations {
1237 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1238 }
1239
1240 if config.disable_license_key_checks {
1243 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1244 args.push("--disable-license-key-checks".into());
1245 }
1246 }
1247
1248 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1251 || mz.meets_minimum_version(&V153)
1252 {
1253 volume_mounts.push(VolumeMount {
1254 name: "license-key".to_string(),
1255 mount_path: "/license_key".to_string(),
1256 ..Default::default()
1257 });
1258 volumes.push(Volume {
1259 name: "license-key".to_string(),
1260 secret: Some(SecretVolumeSource {
1261 default_mode: Some(256),
1262 optional: Some(false),
1263 secret_name: Some(mz.backend_secret_name()),
1264 items: Some(vec![KeyToPath {
1265 key: "license_key".to_string(),
1266 path: "license_key".to_string(),
1267 ..Default::default()
1268 }]),
1269 ..Default::default()
1270 }),
1271 ..Default::default()
1272 });
1273 env.push(EnvVar {
1274 name: "MZ_LICENSE_KEY".to_string(),
1275 value: Some("/license_key/license_key".to_string()),
1276 ..Default::default()
1277 });
1278 }
1279
1280 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1282 args.extend(extra_args.iter().cloned());
1283 }
1284
1285 let probe = Probe {
1286 initial_delay_seconds: Some(1),
1287 failure_threshold: Some(12),
1288 tcp_socket: Some(TCPSocketAction {
1289 host: None,
1290 port: IntOrString::Int(config.environmentd_sql_port.into()),
1291 }),
1292 ..Default::default()
1293 };
1294
1295 let security_context = if config.enable_security_context {
1296 Some(SecurityContext {
1300 run_as_non_root: Some(true),
1301 capabilities: Some(Capabilities {
1302 drop: Some(vec!["ALL".to_string()]),
1303 ..Default::default()
1304 }),
1305 seccomp_profile: Some(SeccompProfile {
1306 type_: "RuntimeDefault".to_string(),
1307 ..Default::default()
1308 }),
1309 allow_privilege_escalation: Some(false),
1310 ..Default::default()
1311 })
1312 } else {
1313 None
1314 };
1315
1316 let ports = vec![
1317 ContainerPort {
1318 container_port: config.environmentd_sql_port.into(),
1319 name: Some("sql".to_owned()),
1320 ..Default::default()
1321 },
1322 ContainerPort {
1323 container_port: config.environmentd_internal_sql_port.into(),
1324 name: Some("internal-sql".to_owned()),
1325 ..Default::default()
1326 },
1327 ContainerPort {
1328 container_port: config.environmentd_http_port.into(),
1329 name: Some("http".to_owned()),
1330 ..Default::default()
1331 },
1332 ContainerPort {
1333 container_port: config.environmentd_internal_http_port.into(),
1334 name: Some("internal-http".to_owned()),
1335 ..Default::default()
1336 },
1337 ContainerPort {
1338 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1339 name: Some("persist-pubsub".to_owned()),
1340 ..Default::default()
1341 },
1342 ];
1343
1344 if mz.meets_minimum_version(&V147_DEV0) {
1346 volume_mounts.push(VolumeMount {
1347 name: "listeners-configmap".to_string(),
1348 mount_path: "/listeners".to_string(),
1349 ..Default::default()
1350 });
1351 volumes.push(Volume {
1352 name: "listeners-configmap".to_string(),
1353 config_map: Some(ConfigMapVolumeSource {
1354 name: mz.listeners_configmap_name(generation),
1355 default_mode: Some(256),
1356 optional: Some(false),
1357 items: Some(vec![KeyToPath {
1358 key: "listeners.json".to_string(),
1359 path: "listeners.json".to_string(),
1360 ..Default::default()
1361 }]),
1362 }),
1363 ..Default::default()
1364 });
1365 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1366 if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1367 args.push("--system-parameter-default=enable_password_auth=true".into());
1368 env.push(EnvVar {
1369 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1370 value_from: Some(EnvVarSource {
1371 secret_key_ref: Some(SecretKeySelector {
1372 name: mz.backend_secret_name(),
1373 key: "external_login_password_mz_system".to_string(),
1374 optional: Some(false),
1375 }),
1376 ..Default::default()
1377 }),
1378 ..Default::default()
1379 })
1380 }
1381 } else {
1382 args.extend([
1383 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1384 format!(
1385 "--http-listen-addr=0.0.0.0:{}",
1386 config.environmentd_http_port
1387 ),
1388 format!(
1389 "--internal-sql-listen-addr=0.0.0.0:{}",
1390 config.environmentd_internal_sql_port
1391 ),
1392 format!(
1393 "--internal-http-listen-addr=0.0.0.0:{}",
1394 config.environmentd_internal_http_port
1395 ),
1396 ]);
1397 }
1398
1399 let container = Container {
1400 name: "environmentd".to_owned(),
1401 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1402 image_pull_policy: Some(config.image_pull_policy.to_string()),
1403 ports: Some(ports),
1404 args: Some(args),
1405 env: Some(env),
1406 volume_mounts: Some(volume_mounts),
1407 liveness_probe: Some(probe.clone()),
1408 readiness_probe: Some(probe),
1409 resources: mz
1410 .spec
1411 .environmentd_resource_requirements
1412 .clone()
1413 .or_else(|| config.environmentd_default_resources.clone()),
1414 security_context: security_context.clone(),
1415 ..Default::default()
1416 };
1417
1418 let mut pod_template_labels = mz.default_labels();
1419 pod_template_labels.insert(
1420 "materialize.cloud/name".to_owned(),
1421 mz.environmentd_statefulset_name(generation),
1422 );
1423 pod_template_labels.insert(
1424 "materialize.cloud/app".to_owned(),
1425 mz.environmentd_app_name(),
1426 );
1427 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1428 pod_template_labels.extend(
1429 mz.spec
1430 .pod_labels
1431 .as_ref()
1432 .map(|labels| labels.iter())
1433 .unwrap_or_default()
1434 .map(|(key, value)| (key.clone(), value.clone())),
1435 );
1436
1437 let mut pod_template_annotations = btreemap! {
1438 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1440
1441 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1443 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1444 "materialize.cloud/generation".to_owned() => generation.to_string(),
1445 };
1446 if config.enable_prometheus_scrape_annotations {
1447 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1448 pod_template_annotations.insert(
1449 "prometheus.io/port".to_owned(),
1450 config.environmentd_internal_http_port.to_string(),
1451 );
1452 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1453 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1454 pod_template_annotations.insert(
1455 "materialize.prometheus.io/mz_usage_path".to_owned(),
1456 "/metrics/mz_usage".to_string(),
1457 );
1458 pod_template_annotations.insert(
1459 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1460 "/metrics/mz_frontier".to_string(),
1461 );
1462 pod_template_annotations.insert(
1463 "materialize.prometheus.io/mz_compute_path".to_owned(),
1464 "/metrics/mz_compute".to_string(),
1465 );
1466 pod_template_annotations.insert(
1467 "materialize.prometheus.io/mz_storage_path".to_owned(),
1468 "/metrics/mz_storage".to_string(),
1469 );
1470 }
1471 pod_template_annotations.extend(
1472 mz.spec
1473 .pod_annotations
1474 .as_ref()
1475 .map(|annotations| annotations.iter())
1476 .unwrap_or_default()
1477 .map(|(key, value)| (key.clone(), value.clone())),
1478 );
1479
1480 let mut tolerations = vec![
1481 Toleration {
1485 effect: Some("NoExecute".into()),
1486 key: Some("node.kubernetes.io/not-ready".into()),
1487 operator: Some("Exists".into()),
1488 toleration_seconds: Some(30),
1489 value: None,
1490 },
1491 Toleration {
1492 effect: Some("NoExecute".into()),
1493 key: Some("node.kubernetes.io/unreachable".into()),
1494 operator: Some("Exists".into()),
1495 toleration_seconds: Some(30),
1496 value: None,
1497 },
1498 ];
1499 if let Some(user_tolerations) = &config.environmentd_tolerations {
1500 tolerations.extend(user_tolerations.iter().cloned());
1501 }
1502 let tolerations = Some(tolerations);
1503
1504 let pod_template_spec = PodTemplateSpec {
1505 metadata: Some(ObjectMeta {
1508 labels: Some(pod_template_labels),
1509 annotations: Some(pod_template_annotations), ..Default::default()
1511 }),
1512 spec: Some(PodSpec {
1513 containers: vec![container],
1514 node_selector: Some(
1515 config
1516 .environmentd_node_selector
1517 .iter()
1518 .map(|selector| (selector.key.clone(), selector.value.clone()))
1519 .collect(),
1520 ),
1521 affinity: config.environmentd_affinity.clone(),
1522 scheduler_name: config.scheduler_name.clone(),
1523 service_account_name: Some(mz.service_account_name()),
1524 volumes: Some(volumes),
1525 security_context: Some(PodSecurityContext {
1526 fs_group: Some(999),
1527 run_as_user: Some(999),
1528 run_as_group: Some(999),
1529 ..Default::default()
1530 }),
1531 tolerations,
1532 termination_grace_period_seconds: Some(0),
1553 ..Default::default()
1554 }),
1555 };
1556
1557 let mut match_labels = BTreeMap::new();
1558 match_labels.insert(
1559 "materialize.cloud/name".to_owned(),
1560 mz.environmentd_statefulset_name(generation),
1561 );
1562
1563 let statefulset_spec = StatefulSetSpec {
1564 replicas: Some(1),
1565 template: pod_template_spec,
1566 update_strategy: Some(StatefulSetUpdateStrategy {
1567 rolling_update: None,
1568 type_: Some("OnDelete".to_owned()),
1569 }),
1570 service_name: Some(mz.environmentd_service_name()),
1571 selector: LabelSelector {
1572 match_expressions: None,
1573 match_labels: Some(match_labels),
1574 },
1575 ..Default::default()
1576 };
1577
1578 StatefulSet {
1579 metadata: ObjectMeta {
1580 annotations: Some(btreemap! {
1581 "materialize.cloud/generation".to_owned() => generation.to_string(),
1582 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1583 }),
1584 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1585 },
1586 spec: Some(statefulset_spec),
1587 status: None,
1588 }
1589}
1590
1591fn create_connection_info(
1592 config: &super::MaterializeControllerArgs,
1593 mz: &Materialize,
1594 generation: u64,
1595) -> ConnectionInfo {
1596 let external_enable_tls = issuer_ref_defined(
1597 &config.default_certificate_specs.internal,
1598 &mz.spec.internal_certificate_spec,
1599 );
1600 let authenticator_kind = mz.spec.authenticator_kind;
1601 let mut listeners_config = ListenersConfig {
1602 sql: btreemap! {
1603 "external".to_owned() => SqlListenerConfig{
1604 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1605 authenticator_kind,
1606 allowed_roles: AllowedRoles::Normal,
1607 enable_tls: external_enable_tls,
1608 },
1609 "internal".to_owned() => SqlListenerConfig{
1610 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1611 authenticator_kind: AuthenticatorKind::None,
1612 allowed_roles: AllowedRoles::NormalAndInternal,
1614 enable_tls: false,
1615 },
1616 },
1617 http: btreemap! {
1618 "external".to_owned() => HttpListenerConfig{
1619 base: BaseListenerConfig {
1620 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1621 authenticator_kind,
1622 allowed_roles: AllowedRoles::Normal,
1623 enable_tls: external_enable_tls,
1624 },
1625 routes: HttpRoutesEnabled{
1626 base: true,
1627 webhook: true,
1628 internal: false,
1629 metrics: false,
1630 profiling: false,
1631 }
1632 },
1633 "internal".to_owned() => HttpListenerConfig{
1634 base: BaseListenerConfig {
1635 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1636 authenticator_kind: AuthenticatorKind::None,
1637 allowed_roles: AllowedRoles::NormalAndInternal,
1639 enable_tls: false,
1640 },
1641 routes: HttpRoutesEnabled{
1642 base: true,
1643 webhook: true,
1644 internal: true,
1645 metrics: true,
1646 profiling: true,
1647 }
1648 },
1649 },
1650 };
1651 if authenticator_kind == AuthenticatorKind::Password {
1652 listeners_config.sql.remove("internal");
1653 listeners_config.http.remove("internal");
1654
1655 listeners_config.sql.get_mut("external").map(|listener| {
1656 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1657 listener
1658 });
1659 listeners_config.http.get_mut("external").map(|listener| {
1660 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1661 listener.routes.internal = true;
1662 listener.routes.profiling = true;
1663 listener
1664 });
1665
1666 listeners_config.http.insert(
1667 "metrics".to_owned(),
1668 HttpListenerConfig {
1669 base: BaseListenerConfig {
1670 addr: SocketAddr::new(
1671 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1672 config.environmentd_internal_http_port,
1673 ),
1674 authenticator_kind: AuthenticatorKind::None,
1675 allowed_roles: AllowedRoles::NormalAndInternal,
1676 enable_tls: false,
1677 },
1678 routes: HttpRoutesEnabled {
1679 base: false,
1680 webhook: false,
1681 internal: false,
1682 metrics: true,
1683 profiling: false,
1684 },
1685 },
1686 );
1687 }
1688
1689 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1690 let listeners_configmap = ConfigMap {
1691 binary_data: None,
1692 data: Some(btreemap! {
1693 "listeners.json".to_owned() => listeners_json,
1694 }),
1695 immutable: None,
1696 metadata: ObjectMeta {
1697 annotations: Some(btreemap! {
1698 "materialize.cloud/generation".to_owned() => generation.to_string(),
1699 }),
1700 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1701 },
1702 };
1703
1704 let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1705 AuthenticatorKind::Password => {
1706 let scheme = if external_enable_tls { "https" } else { "http" };
1707 (
1708 scheme,
1709 config.environmentd_http_port,
1710 Some(mz.spec.backend_secret_name.clone()),
1711 )
1712 }
1713 _ => ("http", config.environmentd_internal_http_port, None),
1714 };
1715 let environmentd_url = format!(
1716 "{}://{}.{}.svc.cluster.local:{}",
1717 scheme,
1718 mz.environmentd_generation_service_name(generation),
1719 mz.namespace(),
1720 leader_api_port,
1721 );
1722 ConnectionInfo {
1723 environmentd_url,
1724 listeners_configmap,
1725 mz_system_secret_name,
1726 }
1727}
1728
/// Deserializable envelope whose single `result` field holds a
/// [`BecomeLeaderResult`].
///
/// NOTE(review): presumably the response body of environmentd's
/// leader-promotion endpoint; confirm against the call site.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    /// Outcome reported in the response.
    result: BecomeLeaderResult,
}
1734
/// Outcome carried inside a [`BecomeLeaderResponse`]: either success, or a
/// failure accompanied by a message.
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    /// The request succeeded.
    Success,
    /// The request failed; `message` carries the accompanying text.
    Failure { message: String },
}
1740
/// Deserializable error payload consisting of a single `message` string.
///
/// NOTE(review): presumably returned by environmentd when a skip-catchup
/// request is rejected; confirm against the call site.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    /// Text carried in the error payload.
    message: String,
}