1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
25 VolumeMount,
26 },
27 networking::v1::{
28 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
29 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
30 },
31 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
32 },
33 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
34};
35use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
36use maplit::btreemap;
37use mz_server_core::listeners::{
38 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
39 ListenersConfig, SqlListenerConfig,
40};
41use reqwest::{Client as HttpClient, StatusCode};
42use semver::{BuildMetadata, Prerelease, Version};
43use serde::{Deserialize, Serialize};
44use sha2::{Digest, Sha256};
45use tracing::{error, trace, warn};
46
47use super::Error;
48use super::matching_image_from_environmentd_image_ref;
49use crate::k8s::{apply_resource, delete_resource, get_resource};
50use crate::tls::{create_certificate, issuer_ref_defined};
51use mz_cloud_provider::CloudProvider;
52use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
53use mz_cloud_resources::crd::{
54 ManagedResource,
55 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
56};
57use mz_ore::instrument;
58
59static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60 major: 0,
61 minor: 140,
62 patch: 0,
63 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V143: Version = Version::new(0, 143, 0);
67const V144: Version = Version::new(0, 144, 0);
68static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
69 major: 0,
70 minor: 147,
71 patch: 0,
72 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
73 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
74});
75const V153: Version = Version::new(0, 153, 0);
76static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
77 major: 0,
78 minor: 154,
79 patch: 0,
80 pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
81 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83pub const V161: Version = Version::new(0, 161, 0);
84
85static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
86 major: 26,
87 minor: 1,
88 patch: 0,
89 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
90 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
91});
92
/// Leadership status carried in the response of environmentd's
/// `/api/leader/status` endpoint.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so the variants
/// must not be reordered casually.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// `Resources::apply` requeues while environmentd reports this status.
    Initializing,
    /// Treated as "ready" by `Resources::apply`.
    ReadyToPromote,
    /// Treated as "ready" by `Resources::apply`; `Resources::promote_services`
    /// keeps requeueing while it sees any status other than `IsLeader`.
    Promoting,
    /// The status `Resources::promote_services` waits for before applying the
    /// public service.
    IsLeader,
}
109
/// JSON body that this file deserializes from environmentd's
/// `/api/leader/status` endpoint.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GetLeaderStatusResponse {
    /// The deployment status reported by environmentd.
    status: DeploymentStatus,
}
114
/// Credentials serialized to JSON and posted to environmentd's `/api/login`
/// endpoint.
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    /// Name of the user to log in as.
    username: String,
    /// Password for `username`.
    password: String,
}
121
/// Everything needed to reach a generation's environmentd over HTTP.
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    /// Base URL of environmentd; `/api/...` paths are appended to it verbatim.
    pub environmentd_url: String,
    /// Name of the secret holding the `external_login_password_mz_system` key.
    /// When `Some`, the HTTP client logs in as `mz_system` before use; when
    /// `None`, a plain client without login is used.
    pub mz_system_secret_name: Option<String>,
    /// ConfigMap applied by `Resources::apply` before the statefulset.
    pub listeners_configmap: ConfigMap,
}
128
/// The full set of Kubernetes objects generated for one environmentd
/// generation.
///
/// The struct is serialized to JSON by `generate_hash`, so adding, removing
/// or reordering fields changes the resulting hash.
#[derive(Debug, Serialize)]
pub struct Resources {
    /// Generation number these resources were built for.
    pub generation: u64,
    /// Network policies, applied first by `apply`.
    pub environmentd_network_policies: Vec<NetworkPolicy>,
    /// Service account, or `None` when one should not be created.
    pub service_account: Box<Option<ServiceAccount>>,
    /// Role applied by `apply`.
    pub role: Box<Role>,
    /// Role binding applied by `apply`.
    pub role_binding: Box<RoleBinding>,
    /// Service applied only by `promote_services`, after environmentd reports
    /// that it is the leader.
    pub public_service: Box<Service>,
    /// Per-generation service applied by `apply`.
    pub generation_service: Box<Service>,
    /// Per-generation persist pubsub service applied by `apply`.
    pub persist_pubsub_service: Box<Service>,
    /// Certificate for environmentd, or `None` when none is to be created.
    pub environmentd_certificate: Box<Option<Certificate>>,
    /// The environmentd statefulset, applied last by `apply`.
    pub environmentd_statefulset: Box<StatefulSet>,
    /// How to reach environmentd over HTTP, plus its listeners ConfigMap.
    pub connection_info: Box<ConnectionInfo>,
}
143
144impl Resources {
145 pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
146 let environmentd_network_policies = create_environmentd_network_policies(config, mz);
147
148 let service_account = Box::new(create_service_account_object(config, mz));
149 let role = Box::new(create_role_object(mz));
150 let role_binding = Box::new(create_role_binding_object(mz));
151 let public_service = Box::new(create_public_service_object(config, mz, generation));
152 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
153 let persist_pubsub_service =
154 Box::new(create_persist_pubsub_service(config, mz, generation));
155 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
156 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157 config, mz, generation,
158 ));
159 let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161 Self {
162 generation,
163 environmentd_network_policies,
164 service_account,
165 role,
166 role_binding,
167 public_service,
168 generation_service,
169 persist_pubsub_service,
170 environmentd_certificate,
171 environmentd_statefulset,
172 connection_info,
173 }
174 }
175
176 #[instrument]
177 pub async fn apply(
178 &self,
179 client: &Client,
180 force_promote: bool,
181 namespace: &str,
182 ) -> Result<Option<Action>, Error> {
183 let environmentd_network_policy_api: Api<NetworkPolicy> =
184 Api::namespaced(client.clone(), namespace);
185 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
186 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
187 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
188 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
189 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
190 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
191 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
192
193 for policy in &self.environmentd_network_policies {
194 trace!("applying network policy {}", policy.name_unchecked());
195 apply_resource(&environmentd_network_policy_api, policy).await?;
196 }
197
198 if let Some(service_account) = &*self.service_account {
199 trace!("applying environmentd service account");
200 apply_resource(&service_account_api, service_account).await?;
201 }
202
203 trace!("applying environmentd role");
204 apply_resource(&role_api, &*self.role).await?;
205
206 trace!("applying environmentd role binding");
207 apply_resource(&role_binding_api, &*self.role_binding).await?;
208
209 trace!("applying environmentd per-generation service");
210 apply_resource(&service_api, &*self.generation_service).await?;
211
212 trace!("creating persist pubsub service");
213 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
214
215 if let Some(certificate) = &*self.environmentd_certificate {
216 trace!("creating new environmentd certificate");
217 apply_resource(&certificate_api, certificate).await?;
218 }
219
220 trace!("applying listeners configmap");
221 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
222
223 trace!("creating new environmentd statefulset");
224 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
225
226 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
227
228 let statefulset = get_resource(
229 &statefulset_api,
230 &self.environmentd_statefulset.name_unchecked(),
231 )
232 .await?;
233 if statefulset
234 .and_then(|statefulset| statefulset.status)
235 .and_then(|status| status.ready_replicas)
236 .unwrap_or(0)
237 == 0
238 {
239 trace!("environmentd statefulset is not ready yet...");
240 return Ok(Some(retry_action));
241 }
242
243 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
244 return Ok(Some(retry_action));
245 };
246 let status_url = reqwest::Url::parse(&format!(
247 "{}/api/leader/status",
248 self.connection_info.environmentd_url,
249 ))
250 .unwrap();
251
252 match http_client.get(status_url.clone()).send().await {
253 Ok(response) => {
254 let response: GetLeaderStatusResponse = match response.error_for_status() {
255 Ok(response) => response.json().await?,
256 Err(e) => {
257 trace!("failed to get status of environmentd, retrying... ({e})");
258 return Ok(Some(retry_action));
259 }
260 };
261 if force_promote {
262 trace!("skipping cluster catchup");
263 let skip_catchup_url = reqwest::Url::parse(&format!(
264 "{}/api/leader/skip-catchup",
265 self.connection_info.environmentd_url,
266 ))
267 .unwrap();
268 let response = http_client.post(skip_catchup_url).send().await?;
269 if response.status() == StatusCode::BAD_REQUEST {
270 let err: SkipCatchupError = response.json().await?;
271 return Err(
272 anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
273 );
274 }
275 } else {
276 match response.status {
277 DeploymentStatus::Initializing => {
278 trace!("environmentd is still initializing, retrying...");
279 return Ok(Some(retry_action));
280 }
281 DeploymentStatus::ReadyToPromote
282 | DeploymentStatus::Promoting
283 | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
284 }
285 }
286 }
287 Err(e) => {
288 trace!("failed to connect to environmentd, retrying... ({e})");
289 return Ok(Some(retry_action));
290 }
291 }
292
293 Ok(None)
294 }
295
296 #[instrument]
297 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
298 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
299 Some(match &self.connection_info.mz_system_secret_name {
300 Some(mz_system_secret_name) => {
301 let http_client = reqwest::Client::builder()
302 .timeout(std::time::Duration::from_secs(10))
303 .cookie_store(true)
304 .danger_accept_invalid_certs(true)
306 .build()
307 .unwrap();
308
309 let secret = secret_api
310 .get(mz_system_secret_name)
311 .await
312 .map_err(|e| {
313 error!("Failed to get backend secret: {:?}", e);
314 e
315 })
316 .ok()?;
317 if let Some(data) = secret.data {
318 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
319 let password = String::from_utf8_lossy(&password.0).to_string();
320 let login_url = reqwest::Url::parse(&format!(
321 "{}/api/login",
322 self.connection_info.environmentd_url,
323 ))
324 .unwrap();
325 match http_client
326 .post(login_url)
327 .body(
328 serde_json::to_string(&LoginCredentials {
329 username: "mz_system".to_owned(),
330 password,
331 })
332 .expect(
333 "Serializing a simple struct with utf8 strings doesn't fail.",
334 ),
335 )
336 .header("Content-Type", "application/json")
337 .send()
338 .await
339 {
340 Ok(response) => {
341 if let Err(e) = response.error_for_status() {
342 trace!("failed to login to environmentd, retrying... ({e})");
343 return None;
344 }
345 }
346 Err(e) => {
347 trace!("failed to connect to environmentd, retrying... ({e})");
348 return None;
349 }
350 };
351 }
352 };
353 http_client
354 }
355 None => reqwest::Client::builder()
356 .timeout(std::time::Duration::from_secs(10))
357 .build()
358 .unwrap(),
359 })
360 }
361
362 #[instrument]
363 pub async fn promote_services(
364 &self,
365 client: &Client,
366 namespace: &str,
367 ) -> Result<Option<Action>, Error> {
368 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
369 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
370
371 let promote_url = reqwest::Url::parse(&format!(
372 "{}/api/leader/promote",
373 self.connection_info.environmentd_url,
374 ))
375 .unwrap();
376
377 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
378 return Ok(Some(retry_action));
379 };
380
381 trace!("promoting new environmentd to leader");
382 let response = http_client.post(promote_url).send().await?;
383 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
384 if let BecomeLeaderResult::Failure { message } = response.result {
385 return Err(Error::Anyhow(anyhow::anyhow!(
386 "failed to promote new environmentd: {message}"
387 )));
388 }
389
390 let status_url = reqwest::Url::parse(&format!(
402 "{}/api/leader/status",
403 self.connection_info.environmentd_url,
404 ))
405 .unwrap();
406 match http_client.get(status_url.clone()).send().await {
407 Ok(response) => {
408 let response: GetLeaderStatusResponse = response.json().await?;
409 if response.status != DeploymentStatus::IsLeader {
410 trace!(
411 "environmentd is still promoting (status: {:?}), retrying...",
412 response.status
413 );
414 return Ok(Some(retry_action));
415 } else {
416 trace!("environmentd is ready");
417 }
418 }
419 Err(e) => {
420 trace!("failed to connect to environmentd, retrying... ({e})");
421 return Ok(Some(retry_action));
422 }
423 }
424
425 trace!("applying environmentd public service");
426 apply_resource(&service_api, &*self.public_service).await?;
427
428 Ok(None)
429 }
430
431 #[instrument]
432 pub async fn teardown_generation(
433 &self,
434 client: &Client,
435 mz: &Materialize,
436 generation: u64,
437 ) -> Result<(), anyhow::Error> {
438 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
439 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
440 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
441
442 trace!("deleting environmentd statefulset for generation {generation}");
443 delete_resource(
444 &statefulset_api,
445 &mz.environmentd_statefulset_name(generation),
446 )
447 .await?;
448
449 trace!("deleting persist pubsub service for generation {generation}");
450 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
451
452 trace!("deleting environmentd per-generation service for generation {generation}");
453 delete_resource(
454 &service_api,
455 &mz.environmentd_generation_service_name(generation),
456 )
457 .await?;
458
459 trace!("deleting listeners configmap for generation {generation}");
460 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
461
462 Ok(())
463 }
464
465 pub fn generate_hash(&self) -> String {
468 let mut hasher = Sha256::new();
469 hasher.update(&serde_json::to_string(self).unwrap());
470 format!("{:x}", hasher.finalize())
471 }
472}
473
474fn create_environmentd_network_policies(
475 config: &super::Config,
476 mz: &Materialize,
477) -> Vec<NetworkPolicy> {
478 let mut network_policies = Vec::new();
479 if config.network_policies_internal_enabled {
480 let environmentd_label_selector = LabelSelector {
481 match_labels: Some(
482 mz.default_labels()
483 .into_iter()
484 .chain([(
485 "materialize.cloud/app".to_owned(),
486 mz.environmentd_app_name(),
487 )])
488 .collect(),
489 ),
490 ..Default::default()
491 };
492 let orchestratord_label_selector = LabelSelector {
493 match_labels: Some(
494 config
495 .orchestratord_pod_selector_labels
496 .iter()
497 .cloned()
498 .map(|kv| (kv.key, kv.value))
499 .collect(),
500 ),
501 ..Default::default()
502 };
503 let all_pods_label_selector = LabelSelector {
506 match_labels: Some(
512 [(
513 "materialize.cloud/mz-resource-id".to_owned(),
514 mz.resource_id().to_owned(),
515 )]
516 .into(),
517 ),
518 ..Default::default()
519 };
520 network_policies.extend([
521 NetworkPolicy {
524 metadata: mz
525 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
526 spec: Some(NetworkPolicySpec {
527 egress: Some(vec![NetworkPolicyEgressRule {
528 to: Some(vec![NetworkPolicyPeer {
529 pod_selector: Some(all_pods_label_selector.clone()),
530 ..Default::default()
531 }]),
532 ..Default::default()
533 }]),
534 ingress: Some(vec![NetworkPolicyIngressRule {
535 from: Some(vec![NetworkPolicyPeer {
536 pod_selector: Some(all_pods_label_selector.clone()),
537 ..Default::default()
538 }]),
539 ..Default::default()
540 }]),
541 pod_selector: Some(all_pods_label_selector.clone()),
542 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
543 ..Default::default()
544 }),
545 },
546 NetworkPolicy {
549 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
550 spec: Some(NetworkPolicySpec {
551 ingress: Some(vec![NetworkPolicyIngressRule {
552 from: Some(vec![NetworkPolicyPeer {
553 namespace_selector: Some(LabelSelector {
554 match_labels: Some(btreemap! {
555 "kubernetes.io/metadata.name".into()
556 => config.orchestratord_namespace.clone(),
557 }),
558 ..Default::default()
559 }),
560 pod_selector: Some(orchestratord_label_selector),
561 ..Default::default()
562 }]),
563 ports: Some(vec![
564 NetworkPolicyPort {
565 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
566 protocol: Some("TCP".to_string()),
567 ..Default::default()
568 },
569 NetworkPolicyPort {
570 port: Some(IntOrString::Int(
571 config.environmentd_internal_http_port.into(),
572 )),
573 protocol: Some("TCP".to_string()),
574 ..Default::default()
575 },
576 ]),
577 ..Default::default()
578 }]),
579 pod_selector: Some(environmentd_label_selector),
580 policy_types: Some(vec!["Ingress".to_owned()]),
581 ..Default::default()
582 }),
583 },
584 ]);
585 }
586 if config.network_policies_ingress_enabled {
587 let mut ingress_label_selector = mz.default_labels();
588 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
589 network_policies.extend([NetworkPolicy {
590 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
591 spec: Some(NetworkPolicySpec {
592 ingress: Some(vec![NetworkPolicyIngressRule {
593 from: Some(
594 config
595 .network_policies_ingress_cidrs
596 .iter()
597 .map(|cidr| NetworkPolicyPeer {
598 ip_block: Some(IPBlock {
599 cidr: cidr.to_owned(),
600 except: None,
601 }),
602 ..Default::default()
603 })
604 .collect(),
605 ),
606 ports: Some(vec![
607 NetworkPolicyPort {
608 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
609 protocol: Some("TCP".to_string()),
610 ..Default::default()
611 },
612 NetworkPolicyPort {
613 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
614 protocol: Some("TCP".to_string()),
615 ..Default::default()
616 },
617 ]),
618 ..Default::default()
619 }]),
620 pod_selector: Some(LabelSelector {
621 match_expressions: None,
622 match_labels: Some(ingress_label_selector),
623 }),
624 policy_types: Some(vec!["Ingress".to_owned()]),
625 ..Default::default()
626 }),
627 }]);
628 }
629 if config.network_policies_egress_enabled {
630 network_policies.extend([NetworkPolicy {
631 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
632 spec: Some(NetworkPolicySpec {
633 egress: Some(vec![NetworkPolicyEgressRule {
634 to: Some(
635 config
636 .network_policies_egress_cidrs
637 .iter()
638 .map(|cidr| NetworkPolicyPeer {
639 ip_block: Some(IPBlock {
640 cidr: cidr.to_owned(),
641 except: None,
642 }),
643 ..Default::default()
644 })
645 .collect(),
646 ),
647 ..Default::default()
648 }]),
649 pod_selector: Some(LabelSelector {
650 match_expressions: None,
651 match_labels: Some(mz.default_labels()),
652 }),
653 policy_types: Some(vec!["Egress".to_owned()]),
654 ..Default::default()
655 }),
656 }]);
657 }
658 network_policies
659}
660
661fn create_service_account_object(
662 config: &super::Config,
663 mz: &Materialize,
664) -> Option<ServiceAccount> {
665 if mz.create_service_account() {
666 let mut annotations: BTreeMap<String, String> = mz
667 .spec
668 .service_account_annotations
669 .clone()
670 .unwrap_or_default();
671 if let (CloudProvider::Aws, Some(role_arn)) = (
672 config.cloud_provider,
673 mz.spec
674 .environmentd_iam_role_arn
675 .as_deref()
676 .or(config.environmentd_iam_role_arn.as_deref()),
677 ) {
678 warn!(
679 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
680 );
681 annotations.insert(
682 "eks.amazonaws.com/role-arn".to_string(),
683 role_arn.to_string(),
684 );
685 };
686
687 let mut labels = mz.default_labels();
688 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
689
690 Some(ServiceAccount {
691 metadata: ObjectMeta {
692 annotations: Some(annotations),
693 labels: Some(labels),
694 ..mz.managed_resource_meta(mz.service_account_name())
695 },
696 ..Default::default()
697 })
698 } else {
699 None
700 }
701}
702
703fn create_role_object(mz: &Materialize) -> Role {
704 Role {
705 metadata: mz.managed_resource_meta(mz.role_name()),
706 rules: Some(vec![
707 PolicyRule {
708 api_groups: Some(vec!["apps".to_string()]),
709 resources: Some(vec!["statefulsets".to_string()]),
710 verbs: vec![
711 "get".to_string(),
712 "list".to_string(),
713 "watch".to_string(),
714 "create".to_string(),
715 "update".to_string(),
716 "patch".to_string(),
717 "delete".to_string(),
718 ],
719 ..Default::default()
720 },
721 PolicyRule {
722 api_groups: Some(vec!["".to_string()]),
723 resources: Some(vec![
724 "persistentvolumeclaims".to_string(),
725 "pods".to_string(),
726 "secrets".to_string(),
727 "services".to_string(),
728 ]),
729 verbs: vec![
730 "get".to_string(),
731 "list".to_string(),
732 "watch".to_string(),
733 "create".to_string(),
734 "update".to_string(),
735 "patch".to_string(),
736 "delete".to_string(),
737 ],
738 ..Default::default()
739 },
740 PolicyRule {
741 api_groups: Some(vec!["".to_string()]),
742 resources: Some(vec!["configmaps".to_string()]),
743 verbs: vec!["get".to_string()],
744 ..Default::default()
745 },
746 PolicyRule {
747 api_groups: Some(vec!["materialize.cloud".to_string()]),
748 resources: Some(vec!["vpcendpoints".to_string()]),
749 verbs: vec![
750 "get".to_string(),
751 "list".to_string(),
752 "watch".to_string(),
753 "create".to_string(),
754 "update".to_string(),
755 "patch".to_string(),
756 "delete".to_string(),
757 ],
758 ..Default::default()
759 },
760 PolicyRule {
761 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
762 resources: Some(vec!["pods".to_string()]),
763 verbs: vec!["get".to_string(), "list".to_string()],
764 ..Default::default()
765 },
766 PolicyRule {
767 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
768 resources: Some(vec![
769 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
770 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
771 ]),
772 verbs: vec!["get".to_string()],
773 ..Default::default()
774 },
775 ]),
776 }
777}
778
779fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
780 RoleBinding {
781 metadata: mz.managed_resource_meta(mz.role_binding_name()),
782 role_ref: RoleRef {
783 api_group: "".to_string(),
784 kind: "Role".to_string(),
785 name: mz.role_name(),
786 },
787 subjects: Some(vec![Subject {
788 api_group: Some("".to_string()),
789 kind: "ServiceAccount".to_string(),
790 name: mz.service_account_name(),
791 namespace: Some(mz.namespace()),
792 }]),
793 }
794}
795
796fn create_public_service_object(
797 config: &super::Config,
798 mz: &Materialize,
799 generation: u64,
800) -> Service {
801 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
802}
803
804fn create_generation_service_object(
805 config: &super::Config,
806 mz: &Materialize,
807 generation: u64,
808) -> Service {
809 create_base_service_object(
810 config,
811 mz,
812 generation,
813 &mz.environmentd_generation_service_name(generation),
814 )
815}
816
817fn create_base_service_object(
818 config: &super::Config,
819 mz: &Materialize,
820 generation: u64,
821 service_name: &str,
822) -> Service {
823 let ports = vec![
824 ServicePort {
825 port: config.environmentd_sql_port.into(),
826 protocol: Some("TCP".to_string()),
827 name: Some("sql".to_string()),
828 ..Default::default()
829 },
830 ServicePort {
831 port: config.environmentd_http_port.into(),
832 protocol: Some("TCP".to_string()),
833 name: Some("https".to_string()),
834 ..Default::default()
835 },
836 ServicePort {
837 port: config.environmentd_internal_sql_port.into(),
838 protocol: Some("TCP".to_string()),
839 name: Some("internal-sql".to_string()),
840 ..Default::default()
841 },
842 ServicePort {
843 port: config.environmentd_internal_http_port.into(),
844 protocol: Some("TCP".to_string()),
845 name: Some("internal-http".to_string()),
846 ..Default::default()
847 },
848 ];
849
850 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
851
852 let spec = ServiceSpec {
853 type_: Some("ClusterIP".to_string()),
854 cluster_ip: Some("None".to_string()),
855 selector: Some(selector),
856 ports: Some(ports),
857 ..Default::default()
858 };
859
860 Service {
861 metadata: mz.managed_resource_meta(service_name.to_string()),
862 spec: Some(spec),
863 status: None,
864 }
865}
866
867fn create_persist_pubsub_service(
868 config: &super::Config,
869 mz: &Materialize,
870 generation: u64,
871) -> Service {
872 Service {
873 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
874 spec: Some(ServiceSpec {
875 type_: Some("ClusterIP".to_string()),
876 cluster_ip: Some("None".to_string()),
877 selector: Some(btreemap! {
878 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
879 }),
880 ports: Some(vec![ServicePort {
881 name: Some("grpc".to_string()),
882 protocol: Some("TCP".to_string()),
883 port: config.environmentd_internal_persist_pubsub_port.into(),
884 ..Default::default()
885 }]),
886 ..Default::default()
887 }),
888 status: None,
889 }
890}
891
892fn create_environmentd_certificate(
893 config: &super::Config,
894 mz: &Materialize,
895) -> Option<Certificate> {
896 create_certificate(
897 config.default_certificate_specs.internal.clone(),
898 mz,
899 mz.spec.internal_certificate_spec.clone(),
900 mz.environmentd_certificate_name(),
901 mz.environmentd_certificate_secret_name(),
902 Some(vec![
903 mz.environmentd_service_name(),
904 mz.environmentd_service_internal_fqdn(),
905 ]),
906 CertificatePrivateKeyAlgorithm::Ed25519,
907 None,
908 )
909}
910
911fn create_environmentd_statefulset_object(
912 config: &super::Config,
913 mz: &Materialize,
914 generation: u64,
915) -> StatefulSet {
916 let mut env = vec![
926 EnvVar {
927 name: "MZ_METADATA_BACKEND_URL".to_string(),
928 value_from: Some(EnvVarSource {
929 secret_key_ref: Some(SecretKeySelector {
930 name: mz.backend_secret_name(),
931 key: "metadata_backend_url".to_string(),
932 optional: Some(false),
933 }),
934 ..Default::default()
935 }),
936 ..Default::default()
937 },
938 EnvVar {
939 name: "MZ_PERSIST_BLOB_URL".to_string(),
940 value_from: Some(EnvVarSource {
941 secret_key_ref: Some(SecretKeySelector {
942 name: mz.backend_secret_name(),
943 key: "persist_backend_url".to_string(),
944 optional: Some(false),
945 }),
946 ..Default::default()
947 }),
948 ..Default::default()
949 },
950 ];
951
952 env.push(EnvVar {
953 name: "AWS_REGION".to_string(),
954 value: Some(config.region.clone()),
955 ..Default::default()
956 });
957
958 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
959
960 let mut args = vec![];
961
962 if let Some(helm_chart_version) = &config.helm_chart_version {
963 args.push(format!("--helm-chart-version={helm_chart_version}"));
964 }
965
966 args.push(format!(
968 "--environment-id={}",
969 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
970 ));
971
972 args.push(format!(
974 "--clusterd-image={}",
975 matching_image_from_environmentd_image_ref(
976 &mz.spec.environmentd_image_ref,
977 "clusterd",
978 None
979 )
980 ));
981
982 args.extend(
984 [
985 config
986 .environmentd_cluster_replica_sizes
987 .as_ref()
988 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
989 config
990 .bootstrap_default_cluster_replica_size
991 .as_ref()
992 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
993 config
994 .bootstrap_builtin_system_cluster_replica_size
995 .as_ref()
996 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
997 config
998 .bootstrap_builtin_probe_cluster_replica_size
999 .as_ref()
1000 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
1001 config
1002 .bootstrap_builtin_support_cluster_replica_size
1003 .as_ref()
1004 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
1005 config
1006 .bootstrap_builtin_catalog_server_cluster_replica_size
1007 .as_ref()
1008 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
1009 config
1010 .bootstrap_builtin_analytics_cluster_replica_size
1011 .as_ref()
1012 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1013 config
1014 .bootstrap_builtin_system_cluster_replication_factor
1015 .as_ref()
1016 .map(|replication_factor| {
1017 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1018 }),
1019 config
1020 .bootstrap_builtin_probe_cluster_replication_factor
1021 .as_ref()
1022 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1023 config
1024 .bootstrap_builtin_support_cluster_replication_factor
1025 .as_ref()
1026 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1027 config
1028 .bootstrap_builtin_analytics_cluster_replication_factor
1029 .as_ref()
1030 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1031 ]
1032 .into_iter()
1033 .flatten(),
1034 );
1035
1036 args.extend(
1037 config
1038 .environmentd_allowed_origins
1039 .iter()
1040 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1041 );
1042
1043 args.push(format!(
1044 "--secrets-controller={}",
1045 config.secrets_controller
1046 ));
1047
1048 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1049 if let Ok(cluster_replica_sizes) =
1050 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1051 {
1052 let cluster_replica_sizes: Vec<_> =
1053 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1054 args.push(format!(
1055 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1056 cluster_replica_sizes.join("', '")
1057 ));
1058 }
1059 }
1060 if !config.cloud_provider.is_cloud() {
1061 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1062 }
1063
1064 if config.enable_internal_statement_logging {
1065 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1066 }
1067
1068 if config.disable_statement_logging {
1069 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1070 }
1071
1072 if !mz.spec.enable_rbac {
1073 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1074 }
1075
1076 args.push("--persist-isolated-runtime-threads=-1".to_string());
1080
1081 if config.cloud_provider == CloudProvider::Aws {
1083 if let Some(azs) = config.environmentd_availability_zones.as_ref() {
1084 for az in azs {
1085 args.push(format!("--availability-zone={az}"));
1086 }
1087 }
1088
1089 if let Some(environmentd_connection_role_arn) = mz
1090 .spec
1091 .environmentd_connection_role_arn
1092 .as_deref()
1093 .or(config.environmentd_connection_role_arn.as_deref())
1094 {
1095 args.push(format!(
1096 "--aws-connection-role-arn={}",
1097 environmentd_connection_role_arn
1098 ));
1099 }
1100 if let Some(account_id) = &config.aws_account_id {
1101 args.push(format!("--aws-account-id={account_id}"));
1102 }
1103
1104 args.extend([format!(
1105 "--aws-secrets-controller-tags=Environment={}",
1106 mz.name_unchecked()
1107 )]);
1108 args.extend_from_slice(&config.aws_secrets_controller_tags);
1109 }
1110
1111 args.extend([
1113 "--orchestrator=kubernetes".into(),
1114 format!(
1115 "--orchestrator-kubernetes-service-account={}",
1116 &mz.service_account_name()
1117 ),
1118 format!(
1119 "--orchestrator-kubernetes-image-pull-policy={}",
1120 config.image_pull_policy.as_kebab_case_str(),
1121 ),
1122 ]);
1123 for selector in &config.clusterd_node_selector {
1124 args.push(format!(
1125 "--orchestrator-kubernetes-service-node-selector={}={}",
1126 selector.key, selector.value,
1127 ));
1128 }
1129 if mz.meets_minimum_version(&V144) {
1130 if let Some(affinity) = &config.clusterd_affinity {
1131 let affinity = serde_json::to_string(affinity).unwrap();
1132 args.push(format!(
1133 "--orchestrator-kubernetes-service-affinity={affinity}"
1134 ))
1135 }
1136 if let Some(tolerations) = &config.clusterd_tolerations {
1137 let tolerations = serde_json::to_string(tolerations).unwrap();
1138 args.push(format!(
1139 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1140 ))
1141 }
1142 }
1143 if let Some(scheduler_name) = &config.scheduler_name {
1144 args.push(format!(
1145 "--orchestrator-kubernetes-scheduler-name={}",
1146 scheduler_name
1147 ));
1148 }
1149 if mz.meets_minimum_version(&V154_DEV0) {
1150 args.extend(
1151 mz.spec
1152 .pod_annotations
1153 .as_ref()
1154 .map(|annotations| annotations.iter())
1155 .unwrap_or_default()
1156 .map(|(key, val)| {
1157 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1158 }),
1159 );
1160 }
1161 args.extend(
1162 mz.default_labels()
1163 .iter()
1164 .chain(
1165 mz.spec
1166 .pod_labels
1167 .as_ref()
1168 .map(|labels| labels.iter())
1169 .unwrap_or_default(),
1170 )
1171 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1172 );
1173 if let Some(status) = &mz.status {
1174 args.push(format!(
1175 "--orchestrator-kubernetes-name-prefix=mz{}-",
1176 status.resource_id
1177 ));
1178 }
1179
1180 args.extend(["--log-format=json".into()]);
1182 if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
1183 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1184 }
1185 args.extend([
1187 format!(
1188 "--opentelemetry-resource=organization_id={}",
1189 mz.spec.environment_id
1190 ),
1191 format!(
1192 "--opentelemetry-resource=environment_name={}",
1193 mz.name_unchecked()
1194 ),
1195 ]);
1196
1197 if let Some(segment_api_key) = &config.segment_api_key {
1198 args.push(format!("--segment-api-key={}", segment_api_key));
1199 if config.segment_client_side {
1200 args.push("--segment-client-side".into());
1201 }
1202 }
1203
1204 let mut volumes = Vec::new();
1205 let mut volume_mounts = Vec::new();
1206 if issuer_ref_defined(
1207 &config.default_certificate_specs.internal,
1208 &mz.spec.internal_certificate_spec,
1209 ) {
1210 volumes.push(Volume {
1211 name: "certificate".to_owned(),
1212 secret: Some(SecretVolumeSource {
1213 default_mode: Some(0o400),
1214 secret_name: Some(mz.environmentd_certificate_secret_name()),
1215 items: None,
1216 optional: Some(false),
1217 }),
1218 ..Default::default()
1219 });
1220 volume_mounts.push(VolumeMount {
1221 name: "certificate".to_owned(),
1222 mount_path: "/etc/materialized".to_owned(),
1223 read_only: Some(true),
1224 ..Default::default()
1225 });
1226 args.extend([
1227 "--tls-mode=require".into(),
1228 "--tls-cert=/etc/materialized/tls.crt".into(),
1229 "--tls-key=/etc/materialized/tls.key".into(),
1230 ]);
1231 } else {
1232 args.push("--tls-mode=disable".to_string());
1233 }
1234 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1235 args.push(format!(
1236 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1237 ephemeral_volume_class
1238 ));
1239 }
1240 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1242
1243 if mz.meets_minimum_version(&V26_1_0) {
1247 if let Some(ref name) = mz.spec.system_parameter_configmap_name {
1248 volumes.push(Volume {
1249 name: "system-params".to_string(),
1250 config_map: Some(ConfigMapVolumeSource {
1251 default_mode: Some(0o400),
1252 name: name.to_owned(),
1253 items: None,
1254 optional: Some(true),
1255 }),
1256 ..Default::default()
1257 });
1258 volume_mounts.push(VolumeMount {
1259 name: "system-params".to_string(),
1260 mount_path: "/system-params".to_owned(),
1262 read_only: Some(true),
1263 ..Default::default()
1264 });
1265 args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
1266 args.push("--config-sync-loop-interval=1s".to_string());
1267 }
1268 }
1269
1270 if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
1272 args.push(format!("--sentry-dsn={}", sentry_dsn));
1273 if let Some(sentry_environment) = &config.tracing.sentry_environment {
1274 args.push(format!("--sentry-environment={}", sentry_environment));
1275 }
1276 args.push(format!("--sentry-tag=region={}", config.region));
1277 }
1278
1279 args.push(format!(
1281 "--persist-pubsub-url=http://{}:{}",
1282 mz.persist_pubsub_service_name(generation),
1283 config.environmentd_internal_persist_pubsub_port,
1284 ));
1285 args.push(format!(
1286 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1287 config.environmentd_internal_persist_pubsub_port
1288 ));
1289
1290 args.push(format!("--deploy-generation={}", generation));
1291
1292 args.push(format!(
1294 "--internal-console-redirect-url={}",
1295 &config.internal_console_proxy_url,
1296 ));
1297
1298 if !config.collect_pod_metrics {
1299 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1300 }
1301 if config.enable_prometheus_scrape_annotations {
1302 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1303 }
1304
1305 if config.disable_license_key_checks {
1308 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1309 args.push("--disable-license-key-checks".into());
1310 }
1311 }
1312
1313 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1316 || mz.meets_minimum_version(&V153)
1317 {
1318 volume_mounts.push(VolumeMount {
1319 name: "license-key".to_string(),
1320 mount_path: "/license_key".to_string(),
1321 ..Default::default()
1322 });
1323 volumes.push(Volume {
1324 name: "license-key".to_string(),
1325 secret: Some(SecretVolumeSource {
1326 default_mode: Some(256),
1327 optional: Some(false),
1328 secret_name: Some(mz.backend_secret_name()),
1329 items: Some(vec![KeyToPath {
1330 key: "license_key".to_string(),
1331 path: "license_key".to_string(),
1332 ..Default::default()
1333 }]),
1334 ..Default::default()
1335 }),
1336 ..Default::default()
1337 });
1338 env.push(EnvVar {
1339 name: "MZ_LICENSE_KEY".to_string(),
1340 value: Some("/license_key/license_key".to_string()),
1341 ..Default::default()
1342 });
1343 }
1344
1345 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1347 args.extend(extra_args.iter().cloned());
1348 }
1349
1350 let probe = Probe {
1351 initial_delay_seconds: Some(1),
1352 failure_threshold: Some(12),
1353 tcp_socket: Some(TCPSocketAction {
1354 host: None,
1355 port: IntOrString::Int(config.environmentd_sql_port.into()),
1356 }),
1357 ..Default::default()
1358 };
1359
1360 let security_context = if config.enable_security_context {
1361 Some(SecurityContext {
1365 run_as_non_root: Some(true),
1366 capabilities: Some(Capabilities {
1367 drop: Some(vec!["ALL".to_string()]),
1368 ..Default::default()
1369 }),
1370 seccomp_profile: Some(SeccompProfile {
1371 type_: "RuntimeDefault".to_string(),
1372 ..Default::default()
1373 }),
1374 allow_privilege_escalation: Some(false),
1375 ..Default::default()
1376 })
1377 } else {
1378 None
1379 };
1380
1381 let ports = vec![
1382 ContainerPort {
1383 container_port: config.environmentd_sql_port.into(),
1384 name: Some("sql".to_owned()),
1385 ..Default::default()
1386 },
1387 ContainerPort {
1388 container_port: config.environmentd_internal_sql_port.into(),
1389 name: Some("internal-sql".to_owned()),
1390 ..Default::default()
1391 },
1392 ContainerPort {
1393 container_port: config.environmentd_http_port.into(),
1394 name: Some("http".to_owned()),
1395 ..Default::default()
1396 },
1397 ContainerPort {
1398 container_port: config.environmentd_internal_http_port.into(),
1399 name: Some("internal-http".to_owned()),
1400 ..Default::default()
1401 },
1402 ContainerPort {
1403 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1404 name: Some("persist-pubsub".to_owned()),
1405 ..Default::default()
1406 },
1407 ];
1408
1409 if mz.meets_minimum_version(&V147_DEV0) {
1411 volume_mounts.push(VolumeMount {
1412 name: "listeners-configmap".to_string(),
1413 mount_path: "/listeners".to_string(),
1414 ..Default::default()
1415 });
1416 volumes.push(Volume {
1417 name: "listeners-configmap".to_string(),
1418 config_map: Some(ConfigMapVolumeSource {
1419 name: mz.listeners_configmap_name(generation),
1420 default_mode: Some(256),
1421 optional: Some(false),
1422 items: Some(vec![KeyToPath {
1423 key: "listeners.json".to_string(),
1424 path: "listeners.json".to_string(),
1425 ..Default::default()
1426 }]),
1427 }),
1428 ..Default::default()
1429 });
1430 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1431 if matches!(
1432 mz.spec.authenticator_kind,
1433 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1434 ) {
1435 args.push("--system-parameter-default=enable_password_auth=true".into());
1436 env.push(EnvVar {
1437 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1438 value_from: Some(EnvVarSource {
1439 secret_key_ref: Some(SecretKeySelector {
1440 name: mz.backend_secret_name(),
1441 key: "external_login_password_mz_system".to_string(),
1442 optional: Some(false),
1443 }),
1444 ..Default::default()
1445 }),
1446 ..Default::default()
1447 })
1448 }
1449 } else {
1450 args.extend([
1451 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1452 format!(
1453 "--http-listen-addr=0.0.0.0:{}",
1454 config.environmentd_http_port
1455 ),
1456 format!(
1457 "--internal-sql-listen-addr=0.0.0.0:{}",
1458 config.environmentd_internal_sql_port
1459 ),
1460 format!(
1461 "--internal-http-listen-addr=0.0.0.0:{}",
1462 config.environmentd_internal_http_port
1463 ),
1464 ]);
1465 }
1466
1467 let container = Container {
1468 name: "environmentd".to_owned(),
1469 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1470 image_pull_policy: Some(config.image_pull_policy.to_string()),
1471 ports: Some(ports),
1472 args: Some(args),
1473 env: Some(env),
1474 volume_mounts: Some(volume_mounts),
1475 liveness_probe: Some(probe.clone()),
1476 readiness_probe: Some(probe),
1477 resources: mz
1478 .spec
1479 .environmentd_resource_requirements
1480 .clone()
1481 .or_else(|| config.environmentd_default_resources.clone()),
1482 security_context: security_context.clone(),
1483 ..Default::default()
1484 };
1485
1486 let mut pod_template_labels = mz.default_labels();
1487 pod_template_labels.insert(
1488 "materialize.cloud/name".to_owned(),
1489 mz.environmentd_statefulset_name(generation),
1490 );
1491 pod_template_labels.insert(
1492 "materialize.cloud/app".to_owned(),
1493 mz.environmentd_app_name(),
1494 );
1495 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1496 pod_template_labels.extend(
1497 mz.spec
1498 .pod_labels
1499 .as_ref()
1500 .map(|labels| labels.iter())
1501 .unwrap_or_default()
1502 .map(|(key, value)| (key.clone(), value.clone())),
1503 );
1504
1505 let mut pod_template_annotations = btreemap! {
1506 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1508
1509 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1511 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1512 "materialize.cloud/generation".to_owned() => generation.to_string(),
1513 };
1514 if config.enable_prometheus_scrape_annotations {
1515 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1516 pod_template_annotations.insert(
1517 "prometheus.io/port".to_owned(),
1518 config.environmentd_internal_http_port.to_string(),
1519 );
1520 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1521 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1522 pod_template_annotations.insert(
1523 "materialize.prometheus.io/mz_usage_path".to_owned(),
1524 "/metrics/mz_usage".to_string(),
1525 );
1526 pod_template_annotations.insert(
1527 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1528 "/metrics/mz_frontier".to_string(),
1529 );
1530 pod_template_annotations.insert(
1531 "materialize.prometheus.io/mz_compute_path".to_owned(),
1532 "/metrics/mz_compute".to_string(),
1533 );
1534 pod_template_annotations.insert(
1535 "materialize.prometheus.io/mz_storage_path".to_owned(),
1536 "/metrics/mz_storage".to_string(),
1537 );
1538 }
1539 pod_template_annotations.extend(
1540 mz.spec
1541 .pod_annotations
1542 .as_ref()
1543 .map(|annotations| annotations.iter())
1544 .unwrap_or_default()
1545 .map(|(key, value)| (key.clone(), value.clone())),
1546 );
1547
1548 let mut tolerations = vec![
1549 Toleration {
1553 effect: Some("NoExecute".into()),
1554 key: Some("node.kubernetes.io/not-ready".into()),
1555 operator: Some("Exists".into()),
1556 toleration_seconds: Some(30),
1557 value: None,
1558 },
1559 Toleration {
1560 effect: Some("NoExecute".into()),
1561 key: Some("node.kubernetes.io/unreachable".into()),
1562 operator: Some("Exists".into()),
1563 toleration_seconds: Some(30),
1564 value: None,
1565 },
1566 ];
1567 if let Some(user_tolerations) = &config.environmentd_tolerations {
1568 tolerations.extend(user_tolerations.iter().cloned());
1569 }
1570 let tolerations = Some(tolerations);
1571
1572 let pod_template_spec = PodTemplateSpec {
1573 metadata: Some(ObjectMeta {
1576 labels: Some(pod_template_labels),
1577 annotations: Some(pod_template_annotations), ..Default::default()
1579 }),
1580 spec: Some(PodSpec {
1581 containers: vec![container],
1582 node_selector: Some(
1583 config
1584 .environmentd_node_selector
1585 .iter()
1586 .map(|selector| (selector.key.clone(), selector.value.clone()))
1587 .collect(),
1588 ),
1589 affinity: config.environmentd_affinity.clone(),
1590 scheduler_name: config.scheduler_name.clone(),
1591 service_account_name: Some(mz.service_account_name()),
1592 volumes: Some(volumes),
1593 security_context: Some(PodSecurityContext {
1594 fs_group: Some(999),
1595 run_as_user: Some(999),
1596 run_as_group: Some(999),
1597 ..Default::default()
1598 }),
1599 tolerations,
1600 termination_grace_period_seconds: Some(0),
1621 ..Default::default()
1622 }),
1623 };
1624
1625 let mut match_labels = BTreeMap::new();
1626 match_labels.insert(
1627 "materialize.cloud/name".to_owned(),
1628 mz.environmentd_statefulset_name(generation),
1629 );
1630
1631 let statefulset_spec = StatefulSetSpec {
1632 replicas: Some(1),
1633 template: pod_template_spec,
1634 service_name: Some(mz.environmentd_service_name()),
1635 selector: LabelSelector {
1636 match_expressions: None,
1637 match_labels: Some(match_labels),
1638 },
1639 ..Default::default()
1640 };
1641
1642 StatefulSet {
1643 metadata: ObjectMeta {
1644 annotations: Some(btreemap! {
1645 "materialize.cloud/generation".to_owned() => generation.to_string(),
1646 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1647 }),
1648 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1649 },
1650 spec: Some(statefulset_spec),
1651 status: None,
1652 }
1653}
1654
1655fn create_connection_info(
1656 config: &super::Config,
1657 mz: &Materialize,
1658 generation: u64,
1659) -> ConnectionInfo {
1660 let external_enable_tls = issuer_ref_defined(
1661 &config.default_certificate_specs.internal,
1662 &mz.spec.internal_certificate_spec,
1663 );
1664 let authenticator_kind = mz.spec.authenticator_kind;
1665
1666 let mut listeners_config = ListenersConfig {
1667 sql: btreemap! {
1668 "external".to_owned() => SqlListenerConfig{
1669 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1670 authenticator_kind,
1671 allowed_roles: AllowedRoles::Normal,
1672 enable_tls: external_enable_tls,
1673 },
1674 "internal".to_owned() => SqlListenerConfig{
1675 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1676 authenticator_kind: AuthenticatorKind::None,
1677 allowed_roles: AllowedRoles::NormalAndInternal,
1679 enable_tls: false,
1680 },
1681 },
1682 http: btreemap! {
1683 "external".to_owned() => HttpListenerConfig{
1684 base: BaseListenerConfig {
1685 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1686 authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1690 AuthenticatorKind::Password
1691 } else {
1692 authenticator_kind
1693 },
1694 allowed_roles: AllowedRoles::Normal,
1695 enable_tls: external_enable_tls,
1696 },
1697 routes: HttpRoutesEnabled{
1698 base: true,
1699 webhook: true,
1700 internal: false,
1701 metrics: false,
1702 profiling: false,
1703 }
1704 },
1705 "internal".to_owned() => HttpListenerConfig{
1706 base: BaseListenerConfig {
1707 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1708 authenticator_kind: AuthenticatorKind::None,
1709 allowed_roles: AllowedRoles::NormalAndInternal,
1711 enable_tls: false,
1712 },
1713 routes: HttpRoutesEnabled{
1714 base: true,
1715 webhook: true,
1716 internal: true,
1717 metrics: true,
1718 profiling: true,
1719 }
1720 },
1721 },
1722 };
1723
1724 if matches!(
1725 authenticator_kind,
1726 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1727 ) {
1728 listeners_config.sql.remove("internal");
1729 listeners_config.http.remove("internal");
1730
1731 listeners_config.sql.get_mut("external").map(|listener| {
1732 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1733 listener
1734 });
1735 listeners_config.http.get_mut("external").map(|listener| {
1736 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1737 listener.routes.internal = true;
1738 listener.routes.profiling = true;
1739 listener
1740 });
1741
1742 listeners_config.http.insert(
1743 "metrics".to_owned(),
1744 HttpListenerConfig {
1745 base: BaseListenerConfig {
1746 addr: SocketAddr::new(
1747 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1748 config.environmentd_internal_http_port,
1749 ),
1750 authenticator_kind: AuthenticatorKind::None,
1751 allowed_roles: AllowedRoles::NormalAndInternal,
1752 enable_tls: false,
1753 },
1754 routes: HttpRoutesEnabled {
1755 base: false,
1756 webhook: false,
1757 internal: false,
1758 metrics: true,
1759 profiling: false,
1760 },
1761 },
1762 );
1763 }
1764
1765 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1766 let listeners_configmap = ConfigMap {
1767 binary_data: None,
1768 data: Some(btreemap! {
1769 "listeners.json".to_owned() => listeners_json,
1770 }),
1771 immutable: None,
1772 metadata: ObjectMeta {
1773 annotations: Some(btreemap! {
1774 "materialize.cloud/generation".to_owned() => generation.to_string(),
1775 }),
1776 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1777 },
1778 };
1779
1780 let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1781 AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1782 let scheme = if external_enable_tls { "https" } else { "http" };
1783 (
1784 scheme,
1785 config.environmentd_http_port,
1786 Some(mz.spec.backend_secret_name.clone()),
1787 )
1788 }
1789 _ => ("http", config.environmentd_internal_http_port, None),
1790 };
1791 let environmentd_url = format!(
1792 "{}://{}.{}.svc.cluster.local:{}",
1793 scheme,
1794 mz.environmentd_generation_service_name(generation),
1795 mz.namespace(),
1796 leader_api_port,
1797 );
1798 ConnectionInfo {
1799 environmentd_url,
1800 listeners_configmap,
1801 mz_system_secret_name,
1802 }
1803}
1804
1805#[derive(Debug, Deserialize, PartialEq, Eq)]
1807struct BecomeLeaderResponse {
1808 result: BecomeLeaderResult,
1809}
1810
1811#[derive(Debug, Deserialize, PartialEq, Eq)]
1812enum BecomeLeaderResult {
1813 Success,
1814 Failure { message: String },
1815}
1816
1817#[derive(Debug, Deserialize, PartialEq, Eq)]
1818struct SkipCatchupError {
1819 message: String,
1820}