1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
25 VolumeMount,
26 },
27 networking::v1::{
28 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
29 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
30 },
31 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
32 },
33 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
34};
35use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
36use maplit::btreemap;
37use mz_server_core::listeners::{
38 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
39 ListenersConfig, SqlListenerConfig,
40};
41use reqwest::{Client as HttpClient, StatusCode};
42use semver::{BuildMetadata, Prerelease, Version};
43use serde::{Deserialize, Serialize};
44use sha2::{Digest, Sha256};
45use tracing::{error, trace, warn};
46
47use super::Error;
48use super::matching_image_from_environmentd_image_ref;
49use crate::k8s::{apply_resource, delete_resource, get_resource};
50use crate::tls::{create_certificate, issuer_ref_defined};
51use mz_cloud_provider::CloudProvider;
52use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
53use mz_cloud_resources::crd::{
54 ManagedResource,
55 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
56};
57use mz_ore::instrument;
58
59static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60 major: 0,
61 minor: 140,
62 patch: 0,
63 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V143: Version = Version::new(0, 143, 0);
67const V144: Version = Version::new(0, 144, 0);
68static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
69 major: 0,
70 minor: 147,
71 patch: 0,
72 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
73 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
74});
75const V153: Version = Version::new(0, 153, 0);
76static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
77 major: 0,
78 minor: 154,
79 patch: 0,
80 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83pub const V161: Version = Version::new(0, 161, 0);
84
85#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
90pub enum DeploymentStatus {
91 Initializing,
94 ReadyToPromote,
96 Promoting,
98 IsLeader,
100}
101
102#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
103pub struct GetLeaderStatusResponse {
104 status: DeploymentStatus,
105}
106
107#[derive(Deserialize, Serialize)]
108pub struct LoginCredentials {
109 username: String,
110 password: String,
112}
113
114#[derive(Debug, Serialize)]
115pub struct ConnectionInfo {
116 pub environmentd_url: String,
117 pub mz_system_secret_name: Option<String>,
118 pub listeners_configmap: ConfigMap,
119}
120
121#[derive(Debug, Serialize)]
122pub struct Resources {
123 pub generation: u64,
124 pub environmentd_network_policies: Vec<NetworkPolicy>,
125 pub service_account: Box<Option<ServiceAccount>>,
126 pub role: Box<Role>,
127 pub role_binding: Box<RoleBinding>,
128 pub public_service: Box<Service>,
129 pub generation_service: Box<Service>,
130 pub persist_pubsub_service: Box<Service>,
131 pub environmentd_certificate: Box<Option<Certificate>>,
132 pub environmentd_statefulset: Box<StatefulSet>,
133 pub connection_info: Box<ConnectionInfo>,
134}
135
136impl Resources {
137 pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
138 let environmentd_network_policies = create_environmentd_network_policies(config, mz);
139
140 let service_account = Box::new(create_service_account_object(config, mz));
141 let role = Box::new(create_role_object(mz));
142 let role_binding = Box::new(create_role_binding_object(mz));
143 let public_service = Box::new(create_public_service_object(config, mz, generation));
144 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
145 let persist_pubsub_service =
146 Box::new(create_persist_pubsub_service(config, mz, generation));
147 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
148 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
149 config, mz, generation,
150 ));
151 let connection_info = Box::new(create_connection_info(config, mz, generation));
152
153 Self {
154 generation,
155 environmentd_network_policies,
156 service_account,
157 role,
158 role_binding,
159 public_service,
160 generation_service,
161 persist_pubsub_service,
162 environmentd_certificate,
163 environmentd_statefulset,
164 connection_info,
165 }
166 }
167
168 #[instrument]
169 pub async fn apply(
170 &self,
171 client: &Client,
172 force_promote: bool,
173 namespace: &str,
174 ) -> Result<Option<Action>, Error> {
175 let environmentd_network_policy_api: Api<NetworkPolicy> =
176 Api::namespaced(client.clone(), namespace);
177 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
178 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
179 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
180 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
181 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
182 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
183 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
184
185 for policy in &self.environmentd_network_policies {
186 trace!("applying network policy {}", policy.name_unchecked());
187 apply_resource(&environmentd_network_policy_api, policy).await?;
188 }
189
190 if let Some(service_account) = &*self.service_account {
191 trace!("applying environmentd service account");
192 apply_resource(&service_account_api, service_account).await?;
193 }
194
195 trace!("applying environmentd role");
196 apply_resource(&role_api, &*self.role).await?;
197
198 trace!("applying environmentd role binding");
199 apply_resource(&role_binding_api, &*self.role_binding).await?;
200
201 trace!("applying environmentd per-generation service");
202 apply_resource(&service_api, &*self.generation_service).await?;
203
204 trace!("creating persist pubsub service");
205 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
206
207 if let Some(certificate) = &*self.environmentd_certificate {
208 trace!("creating new environmentd certificate");
209 apply_resource(&certificate_api, certificate).await?;
210 }
211
212 trace!("applying listeners configmap");
213 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
214
215 trace!("creating new environmentd statefulset");
216 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
217
218 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
219
220 let statefulset = get_resource(
221 &statefulset_api,
222 &self.environmentd_statefulset.name_unchecked(),
223 )
224 .await?;
225 if statefulset
226 .and_then(|statefulset| statefulset.status)
227 .and_then(|status| status.ready_replicas)
228 .unwrap_or(0)
229 == 0
230 {
231 trace!("environmentd statefulset is not ready yet...");
232 return Ok(Some(retry_action));
233 }
234
235 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
236 return Ok(Some(retry_action));
237 };
238 let status_url = reqwest::Url::parse(&format!(
239 "{}/api/leader/status",
240 self.connection_info.environmentd_url,
241 ))
242 .unwrap();
243
244 match http_client.get(status_url.clone()).send().await {
245 Ok(response) => {
246 let response: GetLeaderStatusResponse = match response.error_for_status() {
247 Ok(response) => response.json().await?,
248 Err(e) => {
249 trace!("failed to get status of environmentd, retrying... ({e})");
250 return Ok(Some(retry_action));
251 }
252 };
253 if force_promote {
254 trace!("skipping cluster catchup");
255 let skip_catchup_url = reqwest::Url::parse(&format!(
256 "{}/api/leader/skip-catchup",
257 self.connection_info.environmentd_url,
258 ))
259 .unwrap();
260 let response = http_client.post(skip_catchup_url).send().await?;
261 if response.status() == StatusCode::BAD_REQUEST {
262 let err: SkipCatchupError = response.json().await?;
263 return Err(
264 anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
265 );
266 }
267 } else {
268 match response.status {
269 DeploymentStatus::Initializing => {
270 trace!("environmentd is still initializing, retrying...");
271 return Ok(Some(retry_action));
272 }
273 DeploymentStatus::ReadyToPromote
274 | DeploymentStatus::Promoting
275 | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
276 }
277 }
278 }
279 Err(e) => {
280 trace!("failed to connect to environmentd, retrying... ({e})");
281 return Ok(Some(retry_action));
282 }
283 }
284
285 Ok(None)
286 }
287
288 #[instrument]
289 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
290 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
291 Some(match &self.connection_info.mz_system_secret_name {
292 Some(mz_system_secret_name) => {
293 let http_client = reqwest::Client::builder()
294 .timeout(std::time::Duration::from_secs(10))
295 .cookie_store(true)
296 .danger_accept_invalid_certs(true)
298 .build()
299 .unwrap();
300
301 let secret = secret_api
302 .get(mz_system_secret_name)
303 .await
304 .map_err(|e| {
305 error!("Failed to get backend secret: {:?}", e);
306 e
307 })
308 .ok()?;
309 if let Some(data) = secret.data {
310 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
311 let password = String::from_utf8_lossy(&password.0).to_string();
312 let login_url = reqwest::Url::parse(&format!(
313 "{}/api/login",
314 self.connection_info.environmentd_url,
315 ))
316 .unwrap();
317 match http_client
318 .post(login_url)
319 .body(
320 serde_json::to_string(&LoginCredentials {
321 username: "mz_system".to_owned(),
322 password,
323 })
324 .expect(
325 "Serializing a simple struct with utf8 strings doesn't fail.",
326 ),
327 )
328 .header("Content-Type", "application/json")
329 .send()
330 .await
331 {
332 Ok(response) => {
333 if let Err(e) = response.error_for_status() {
334 trace!("failed to login to environmentd, retrying... ({e})");
335 return None;
336 }
337 }
338 Err(e) => {
339 trace!("failed to connect to environmentd, retrying... ({e})");
340 return None;
341 }
342 };
343 }
344 };
345 http_client
346 }
347 None => reqwest::Client::builder()
348 .timeout(std::time::Duration::from_secs(10))
349 .build()
350 .unwrap(),
351 })
352 }
353
354 #[instrument]
355 pub async fn promote_services(
356 &self,
357 client: &Client,
358 namespace: &str,
359 ) -> Result<Option<Action>, Error> {
360 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
361 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
362
363 let promote_url = reqwest::Url::parse(&format!(
364 "{}/api/leader/promote",
365 self.connection_info.environmentd_url,
366 ))
367 .unwrap();
368
369 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
370 return Ok(Some(retry_action));
371 };
372
373 trace!("promoting new environmentd to leader");
374 let response = http_client.post(promote_url).send().await?;
375 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
376 if let BecomeLeaderResult::Failure { message } = response.result {
377 return Err(Error::Anyhow(anyhow::anyhow!(
378 "failed to promote new environmentd: {message}"
379 )));
380 }
381
382 let status_url = reqwest::Url::parse(&format!(
394 "{}/api/leader/status",
395 self.connection_info.environmentd_url,
396 ))
397 .unwrap();
398 match http_client.get(status_url.clone()).send().await {
399 Ok(response) => {
400 let response: GetLeaderStatusResponse = response.json().await?;
401 if response.status != DeploymentStatus::IsLeader {
402 trace!(
403 "environmentd is still promoting (status: {:?}), retrying...",
404 response.status
405 );
406 return Ok(Some(retry_action));
407 } else {
408 trace!("environmentd is ready");
409 }
410 }
411 Err(e) => {
412 trace!("failed to connect to environmentd, retrying... ({e})");
413 return Ok(Some(retry_action));
414 }
415 }
416
417 trace!("applying environmentd public service");
418 apply_resource(&service_api, &*self.public_service).await?;
419
420 Ok(None)
421 }
422
423 #[instrument]
424 pub async fn teardown_generation(
425 &self,
426 client: &Client,
427 mz: &Materialize,
428 generation: u64,
429 ) -> Result<(), anyhow::Error> {
430 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
431 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
432 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
433
434 trace!("deleting environmentd statefulset for generation {generation}");
435 delete_resource(
436 &statefulset_api,
437 &mz.environmentd_statefulset_name(generation),
438 )
439 .await?;
440
441 trace!("deleting persist pubsub service for generation {generation}");
442 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
443
444 trace!("deleting environmentd per-generation service for generation {generation}");
445 delete_resource(
446 &service_api,
447 &mz.environmentd_generation_service_name(generation),
448 )
449 .await?;
450
451 trace!("deleting listeners configmap for generation {generation}");
452 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
453
454 Ok(())
455 }
456
457 pub fn generate_hash(&self) -> String {
460 let mut hasher = Sha256::new();
461 hasher.update(&serde_json::to_string(self).unwrap());
462 format!("{:x}", hasher.finalize())
463 }
464}
465
466fn create_environmentd_network_policies(
467 config: &super::Config,
468 mz: &Materialize,
469) -> Vec<NetworkPolicy> {
470 let mut network_policies = Vec::new();
471 if config.network_policies_internal_enabled {
472 let environmentd_label_selector = LabelSelector {
473 match_labels: Some(
474 mz.default_labels()
475 .into_iter()
476 .chain([(
477 "materialize.cloud/app".to_owned(),
478 mz.environmentd_app_name(),
479 )])
480 .collect(),
481 ),
482 ..Default::default()
483 };
484 let orchestratord_label_selector = LabelSelector {
485 match_labels: Some(
486 config
487 .orchestratord_pod_selector_labels
488 .iter()
489 .cloned()
490 .map(|kv| (kv.key, kv.value))
491 .collect(),
492 ),
493 ..Default::default()
494 };
495 let all_pods_label_selector = LabelSelector {
498 match_labels: Some(mz.default_labels()),
499 ..Default::default()
500 };
501 network_policies.extend([
502 NetworkPolicy {
505 metadata: mz
506 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
507 spec: Some(NetworkPolicySpec {
508 egress: Some(vec![NetworkPolicyEgressRule {
509 to: Some(vec![NetworkPolicyPeer {
510 pod_selector: Some(all_pods_label_selector.clone()),
511 ..Default::default()
512 }]),
513 ..Default::default()
514 }]),
515 ingress: Some(vec![NetworkPolicyIngressRule {
516 from: Some(vec![NetworkPolicyPeer {
517 pod_selector: Some(all_pods_label_selector.clone()),
518 ..Default::default()
519 }]),
520 ..Default::default()
521 }]),
522 pod_selector: Some(all_pods_label_selector.clone()),
523 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
524 ..Default::default()
525 }),
526 },
527 NetworkPolicy {
530 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
531 spec: Some(NetworkPolicySpec {
532 ingress: Some(vec![NetworkPolicyIngressRule {
533 from: Some(vec![NetworkPolicyPeer {
534 namespace_selector: Some(LabelSelector {
535 match_labels: Some(btreemap! {
536 "kubernetes.io/metadata.name".into()
537 => config.orchestratord_namespace.clone(),
538 }),
539 ..Default::default()
540 }),
541 pod_selector: Some(orchestratord_label_selector),
542 ..Default::default()
543 }]),
544 ports: Some(vec![
545 NetworkPolicyPort {
546 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
547 protocol: Some("TCP".to_string()),
548 ..Default::default()
549 },
550 NetworkPolicyPort {
551 port: Some(IntOrString::Int(
552 config.environmentd_internal_http_port.into(),
553 )),
554 protocol: Some("TCP".to_string()),
555 ..Default::default()
556 },
557 ]),
558 ..Default::default()
559 }]),
560 pod_selector: Some(environmentd_label_selector),
561 policy_types: Some(vec!["Ingress".to_owned()]),
562 ..Default::default()
563 }),
564 },
565 ]);
566 }
567 if config.network_policies_ingress_enabled {
568 let mut ingress_label_selector = mz.default_labels();
569 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
570 network_policies.extend([NetworkPolicy {
571 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
572 spec: Some(NetworkPolicySpec {
573 ingress: Some(vec![NetworkPolicyIngressRule {
574 from: Some(
575 config
576 .network_policies_ingress_cidrs
577 .iter()
578 .map(|cidr| NetworkPolicyPeer {
579 ip_block: Some(IPBlock {
580 cidr: cidr.to_owned(),
581 except: None,
582 }),
583 ..Default::default()
584 })
585 .collect(),
586 ),
587 ports: Some(vec![
588 NetworkPolicyPort {
589 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
590 protocol: Some("TCP".to_string()),
591 ..Default::default()
592 },
593 NetworkPolicyPort {
594 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
595 protocol: Some("TCP".to_string()),
596 ..Default::default()
597 },
598 ]),
599 ..Default::default()
600 }]),
601 pod_selector: Some(LabelSelector {
602 match_expressions: None,
603 match_labels: Some(ingress_label_selector),
604 }),
605 policy_types: Some(vec!["Ingress".to_owned()]),
606 ..Default::default()
607 }),
608 }]);
609 }
610 if config.network_policies_egress_enabled {
611 network_policies.extend([NetworkPolicy {
612 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
613 spec: Some(NetworkPolicySpec {
614 egress: Some(vec![NetworkPolicyEgressRule {
615 to: Some(
616 config
617 .network_policies_egress_cidrs
618 .iter()
619 .map(|cidr| NetworkPolicyPeer {
620 ip_block: Some(IPBlock {
621 cidr: cidr.to_owned(),
622 except: None,
623 }),
624 ..Default::default()
625 })
626 .collect(),
627 ),
628 ..Default::default()
629 }]),
630 pod_selector: Some(LabelSelector {
631 match_expressions: None,
632 match_labels: Some(mz.default_labels()),
633 }),
634 policy_types: Some(vec!["Egress".to_owned()]),
635 ..Default::default()
636 }),
637 }]);
638 }
639 network_policies
640}
641
642fn create_service_account_object(
643 config: &super::Config,
644 mz: &Materialize,
645) -> Option<ServiceAccount> {
646 if mz.create_service_account() {
647 let mut annotations: BTreeMap<String, String> = mz
648 .spec
649 .service_account_annotations
650 .clone()
651 .unwrap_or_default();
652 if let (CloudProvider::Aws, Some(role_arn)) = (
653 config.cloud_provider,
654 mz.spec
655 .environmentd_iam_role_arn
656 .as_deref()
657 .or(config.environmentd_iam_role_arn.as_deref()),
658 ) {
659 warn!(
660 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
661 );
662 annotations.insert(
663 "eks.amazonaws.com/role-arn".to_string(),
664 role_arn.to_string(),
665 );
666 };
667
668 let mut labels = mz.default_labels();
669 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
670
671 Some(ServiceAccount {
672 metadata: ObjectMeta {
673 annotations: Some(annotations),
674 labels: Some(labels),
675 ..mz.managed_resource_meta(mz.service_account_name())
676 },
677 ..Default::default()
678 })
679 } else {
680 None
681 }
682}
683
684fn create_role_object(mz: &Materialize) -> Role {
685 Role {
686 metadata: mz.managed_resource_meta(mz.role_name()),
687 rules: Some(vec![
688 PolicyRule {
689 api_groups: Some(vec!["apps".to_string()]),
690 resources: Some(vec!["statefulsets".to_string()]),
691 verbs: vec![
692 "get".to_string(),
693 "list".to_string(),
694 "watch".to_string(),
695 "create".to_string(),
696 "update".to_string(),
697 "patch".to_string(),
698 "delete".to_string(),
699 ],
700 ..Default::default()
701 },
702 PolicyRule {
703 api_groups: Some(vec!["".to_string()]),
704 resources: Some(vec![
705 "persistentvolumeclaims".to_string(),
706 "pods".to_string(),
707 "secrets".to_string(),
708 "services".to_string(),
709 ]),
710 verbs: vec![
711 "get".to_string(),
712 "list".to_string(),
713 "watch".to_string(),
714 "create".to_string(),
715 "update".to_string(),
716 "patch".to_string(),
717 "delete".to_string(),
718 ],
719 ..Default::default()
720 },
721 PolicyRule {
722 api_groups: Some(vec!["".to_string()]),
723 resources: Some(vec!["configmaps".to_string()]),
724 verbs: vec!["get".to_string()],
725 ..Default::default()
726 },
727 PolicyRule {
728 api_groups: Some(vec!["materialize.cloud".to_string()]),
729 resources: Some(vec!["vpcendpoints".to_string()]),
730 verbs: vec![
731 "get".to_string(),
732 "list".to_string(),
733 "watch".to_string(),
734 "create".to_string(),
735 "update".to_string(),
736 "patch".to_string(),
737 "delete".to_string(),
738 ],
739 ..Default::default()
740 },
741 PolicyRule {
742 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
743 resources: Some(vec!["pods".to_string()]),
744 verbs: vec!["get".to_string(), "list".to_string()],
745 ..Default::default()
746 },
747 PolicyRule {
748 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
749 resources: Some(vec![
750 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
751 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
752 ]),
753 verbs: vec!["get".to_string()],
754 ..Default::default()
755 },
756 ]),
757 }
758}
759
760fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
761 RoleBinding {
762 metadata: mz.managed_resource_meta(mz.role_binding_name()),
763 role_ref: RoleRef {
764 api_group: "".to_string(),
765 kind: "Role".to_string(),
766 name: mz.role_name(),
767 },
768 subjects: Some(vec![Subject {
769 api_group: Some("".to_string()),
770 kind: "ServiceAccount".to_string(),
771 name: mz.service_account_name(),
772 namespace: Some(mz.namespace()),
773 }]),
774 }
775}
776
777fn create_public_service_object(
778 config: &super::Config,
779 mz: &Materialize,
780 generation: u64,
781) -> Service {
782 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
783}
784
785fn create_generation_service_object(
786 config: &super::Config,
787 mz: &Materialize,
788 generation: u64,
789) -> Service {
790 create_base_service_object(
791 config,
792 mz,
793 generation,
794 &mz.environmentd_generation_service_name(generation),
795 )
796}
797
798fn create_base_service_object(
799 config: &super::Config,
800 mz: &Materialize,
801 generation: u64,
802 service_name: &str,
803) -> Service {
804 let ports = vec![
805 ServicePort {
806 port: config.environmentd_sql_port.into(),
807 protocol: Some("TCP".to_string()),
808 name: Some("sql".to_string()),
809 ..Default::default()
810 },
811 ServicePort {
812 port: config.environmentd_http_port.into(),
813 protocol: Some("TCP".to_string()),
814 name: Some("https".to_string()),
815 ..Default::default()
816 },
817 ServicePort {
818 port: config.environmentd_internal_sql_port.into(),
819 protocol: Some("TCP".to_string()),
820 name: Some("internal-sql".to_string()),
821 ..Default::default()
822 },
823 ServicePort {
824 port: config.environmentd_internal_http_port.into(),
825 protocol: Some("TCP".to_string()),
826 name: Some("internal-http".to_string()),
827 ..Default::default()
828 },
829 ];
830
831 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
832
833 let spec = ServiceSpec {
834 type_: Some("ClusterIP".to_string()),
835 cluster_ip: Some("None".to_string()),
836 selector: Some(selector),
837 ports: Some(ports),
838 ..Default::default()
839 };
840
841 Service {
842 metadata: mz.managed_resource_meta(service_name.to_string()),
843 spec: Some(spec),
844 status: None,
845 }
846}
847
848fn create_persist_pubsub_service(
849 config: &super::Config,
850 mz: &Materialize,
851 generation: u64,
852) -> Service {
853 Service {
854 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
855 spec: Some(ServiceSpec {
856 type_: Some("ClusterIP".to_string()),
857 cluster_ip: Some("None".to_string()),
858 selector: Some(btreemap! {
859 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
860 }),
861 ports: Some(vec![ServicePort {
862 name: Some("grpc".to_string()),
863 protocol: Some("TCP".to_string()),
864 port: config.environmentd_internal_persist_pubsub_port.into(),
865 ..Default::default()
866 }]),
867 ..Default::default()
868 }),
869 status: None,
870 }
871}
872
873fn create_environmentd_certificate(
874 config: &super::Config,
875 mz: &Materialize,
876) -> Option<Certificate> {
877 create_certificate(
878 config.default_certificate_specs.internal.clone(),
879 mz,
880 mz.spec.internal_certificate_spec.clone(),
881 mz.environmentd_certificate_name(),
882 mz.environmentd_certificate_secret_name(),
883 Some(vec![
884 mz.environmentd_service_name(),
885 mz.environmentd_service_internal_fqdn(),
886 ]),
887 CertificatePrivateKeyAlgorithm::Ed25519,
888 None,
889 )
890}
891
892fn create_environmentd_statefulset_object(
893 config: &super::Config,
894 mz: &Materialize,
895 generation: u64,
896) -> StatefulSet {
897 let mut env = vec![
907 EnvVar {
908 name: "MZ_METADATA_BACKEND_URL".to_string(),
909 value_from: Some(EnvVarSource {
910 secret_key_ref: Some(SecretKeySelector {
911 name: mz.backend_secret_name(),
912 key: "metadata_backend_url".to_string(),
913 optional: Some(false),
914 }),
915 ..Default::default()
916 }),
917 ..Default::default()
918 },
919 EnvVar {
920 name: "MZ_PERSIST_BLOB_URL".to_string(),
921 value_from: Some(EnvVarSource {
922 secret_key_ref: Some(SecretKeySelector {
923 name: mz.backend_secret_name(),
924 key: "persist_backend_url".to_string(),
925 optional: Some(false),
926 }),
927 ..Default::default()
928 }),
929 ..Default::default()
930 },
931 ];
932
933 env.push(EnvVar {
934 name: "AWS_REGION".to_string(),
935 value: Some(config.region.clone()),
936 ..Default::default()
937 });
938
939 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
940
941 let mut args = vec![];
942
943 if let Some(helm_chart_version) = &config.helm_chart_version {
944 args.push(format!("--helm-chart-version={helm_chart_version}"));
945 }
946
947 args.push(format!(
949 "--environment-id={}",
950 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
951 ));
952
953 args.push(format!(
955 "--clusterd-image={}",
956 matching_image_from_environmentd_image_ref(
957 &mz.spec.environmentd_image_ref,
958 "clusterd",
959 None
960 )
961 ));
962
963 args.extend(
965 [
966 config
967 .environmentd_cluster_replica_sizes
968 .as_ref()
969 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
970 config
971 .bootstrap_default_cluster_replica_size
972 .as_ref()
973 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
974 config
975 .bootstrap_builtin_system_cluster_replica_size
976 .as_ref()
977 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
978 config
979 .bootstrap_builtin_probe_cluster_replica_size
980 .as_ref()
981 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
982 config
983 .bootstrap_builtin_support_cluster_replica_size
984 .as_ref()
985 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
986 config
987 .bootstrap_builtin_catalog_server_cluster_replica_size
988 .as_ref()
989 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
990 config
991 .bootstrap_builtin_analytics_cluster_replica_size
992 .as_ref()
993 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
994 config
995 .bootstrap_builtin_system_cluster_replication_factor
996 .as_ref()
997 .map(|replication_factor| {
998 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
999 }),
1000 config
1001 .bootstrap_builtin_probe_cluster_replication_factor
1002 .as_ref()
1003 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1004 config
1005 .bootstrap_builtin_support_cluster_replication_factor
1006 .as_ref()
1007 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1008 config
1009 .bootstrap_builtin_analytics_cluster_replication_factor
1010 .as_ref()
1011 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1012 ]
1013 .into_iter()
1014 .flatten(),
1015 );
1016
1017 args.extend(
1018 config
1019 .environmentd_allowed_origins
1020 .iter()
1021 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1022 );
1023
1024 args.push(format!(
1025 "--secrets-controller={}",
1026 config.secrets_controller
1027 ));
1028
1029 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1030 if let Ok(cluster_replica_sizes) =
1031 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1032 {
1033 let cluster_replica_sizes: Vec<_> =
1034 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1035 args.push(format!(
1036 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1037 cluster_replica_sizes.join("', '")
1038 ));
1039 }
1040 }
1041 if !config.cloud_provider.is_cloud() {
1042 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1043 }
1044
1045 if config.enable_internal_statement_logging {
1046 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1047 }
1048
1049 if config.disable_statement_logging {
1050 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1051 }
1052
1053 if !mz.spec.enable_rbac {
1054 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1055 }
1056
1057 args.push("--persist-isolated-runtime-threads=-1".to_string());
1061
1062 if config.cloud_provider == CloudProvider::Aws {
1064 if let Some(azs) = config.environmentd_availability_zones.as_ref() {
1065 for az in azs {
1066 args.push(format!("--availability-zone={az}"));
1067 }
1068 }
1069
1070 if let Some(environmentd_connection_role_arn) = mz
1071 .spec
1072 .environmentd_connection_role_arn
1073 .as_deref()
1074 .or(config.environmentd_connection_role_arn.as_deref())
1075 {
1076 args.push(format!(
1077 "--aws-connection-role-arn={}",
1078 environmentd_connection_role_arn
1079 ));
1080 }
1081 if let Some(account_id) = &config.aws_account_id {
1082 args.push(format!("--aws-account-id={account_id}"));
1083 }
1084
1085 args.extend([format!(
1086 "--aws-secrets-controller-tags=Environment={}",
1087 mz.name_unchecked()
1088 )]);
1089 args.extend_from_slice(&config.aws_secrets_controller_tags);
1090 }
1091
1092 args.extend([
1094 "--orchestrator=kubernetes".into(),
1095 format!(
1096 "--orchestrator-kubernetes-service-account={}",
1097 &mz.service_account_name()
1098 ),
1099 format!(
1100 "--orchestrator-kubernetes-image-pull-policy={}",
1101 config.image_pull_policy.as_kebab_case_str(),
1102 ),
1103 ]);
1104 for selector in &config.clusterd_node_selector {
1105 args.push(format!(
1106 "--orchestrator-kubernetes-service-node-selector={}={}",
1107 selector.key, selector.value,
1108 ));
1109 }
1110 if mz.meets_minimum_version(&V144) {
1111 if let Some(affinity) = &config.clusterd_affinity {
1112 let affinity = serde_json::to_string(affinity).unwrap();
1113 args.push(format!(
1114 "--orchestrator-kubernetes-service-affinity={affinity}"
1115 ))
1116 }
1117 if let Some(tolerations) = &config.clusterd_tolerations {
1118 let tolerations = serde_json::to_string(tolerations).unwrap();
1119 args.push(format!(
1120 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1121 ))
1122 }
1123 }
1124 if let Some(scheduler_name) = &config.scheduler_name {
1125 args.push(format!(
1126 "--orchestrator-kubernetes-scheduler-name={}",
1127 scheduler_name
1128 ));
1129 }
1130 if mz.meets_minimum_version(&V154_DEV0) {
1131 args.extend(
1132 mz.spec
1133 .pod_annotations
1134 .as_ref()
1135 .map(|annotations| annotations.iter())
1136 .unwrap_or_default()
1137 .map(|(key, val)| {
1138 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1139 }),
1140 );
1141 }
1142 args.extend(
1143 mz.default_labels()
1144 .iter()
1145 .chain(
1146 mz.spec
1147 .pod_labels
1148 .as_ref()
1149 .map(|labels| labels.iter())
1150 .unwrap_or_default(),
1151 )
1152 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1153 );
1154 if let Some(status) = &mz.status {
1155 args.push(format!(
1156 "--orchestrator-kubernetes-name-prefix=mz{}-",
1157 status.resource_id
1158 ));
1159 }
1160
1161 args.extend(["--log-format=json".into()]);
1163 if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
1164 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1165 }
1166 args.extend([
1168 format!(
1169 "--opentelemetry-resource=organization_id={}",
1170 mz.spec.environment_id
1171 ),
1172 format!(
1173 "--opentelemetry-resource=environment_name={}",
1174 mz.name_unchecked()
1175 ),
1176 ]);
1177
1178 if let Some(segment_api_key) = &config.segment_api_key {
1179 args.push(format!("--segment-api-key={}", segment_api_key));
1180 if config.segment_client_side {
1181 args.push("--segment-client-side".into());
1182 }
1183 }
1184
1185 let mut volumes = Vec::new();
1186 let mut volume_mounts = Vec::new();
1187 if issuer_ref_defined(
1188 &config.default_certificate_specs.internal,
1189 &mz.spec.internal_certificate_spec,
1190 ) {
1191 volumes.push(Volume {
1192 name: "certificate".to_owned(),
1193 secret: Some(SecretVolumeSource {
1194 default_mode: Some(0o400),
1195 secret_name: Some(mz.environmentd_certificate_secret_name()),
1196 items: None,
1197 optional: Some(false),
1198 }),
1199 ..Default::default()
1200 });
1201 volume_mounts.push(VolumeMount {
1202 name: "certificate".to_owned(),
1203 mount_path: "/etc/materialized".to_owned(),
1204 read_only: Some(true),
1205 ..Default::default()
1206 });
1207 args.extend([
1208 "--tls-mode=require".into(),
1209 "--tls-cert=/etc/materialized/tls.crt".into(),
1210 "--tls-key=/etc/materialized/tls.key".into(),
1211 ]);
1212 } else {
1213 args.push("--tls-mode=disable".to_string());
1214 }
1215 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1216 args.push(format!(
1217 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1218 ephemeral_volume_class
1219 ));
1220 }
1221 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1223
1224 if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
1226 args.push(format!("--sentry-dsn={}", sentry_dsn));
1227 if let Some(sentry_environment) = &config.tracing.sentry_environment {
1228 args.push(format!("--sentry-environment={}", sentry_environment));
1229 }
1230 args.push(format!("--sentry-tag=region={}", config.region));
1231 }
1232
1233 args.push(format!(
1235 "--persist-pubsub-url=http://{}:{}",
1236 mz.persist_pubsub_service_name(generation),
1237 config.environmentd_internal_persist_pubsub_port,
1238 ));
1239 args.push(format!(
1240 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1241 config.environmentd_internal_persist_pubsub_port
1242 ));
1243
1244 args.push(format!("--deploy-generation={}", generation));
1245
1246 args.push(format!(
1248 "--internal-console-redirect-url={}",
1249 &config.internal_console_proxy_url,
1250 ));
1251
1252 if !config.collect_pod_metrics {
1253 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1254 }
1255 if config.enable_prometheus_scrape_annotations {
1256 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1257 }
1258
1259 if config.disable_license_key_checks {
1262 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1263 args.push("--disable-license-key-checks".into());
1264 }
1265 }
1266
1267 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1270 || mz.meets_minimum_version(&V153)
1271 {
1272 volume_mounts.push(VolumeMount {
1273 name: "license-key".to_string(),
1274 mount_path: "/license_key".to_string(),
1275 ..Default::default()
1276 });
1277 volumes.push(Volume {
1278 name: "license-key".to_string(),
1279 secret: Some(SecretVolumeSource {
1280 default_mode: Some(256),
1281 optional: Some(false),
1282 secret_name: Some(mz.backend_secret_name()),
1283 items: Some(vec![KeyToPath {
1284 key: "license_key".to_string(),
1285 path: "license_key".to_string(),
1286 ..Default::default()
1287 }]),
1288 ..Default::default()
1289 }),
1290 ..Default::default()
1291 });
1292 env.push(EnvVar {
1293 name: "MZ_LICENSE_KEY".to_string(),
1294 value: Some("/license_key/license_key".to_string()),
1295 ..Default::default()
1296 });
1297 }
1298
1299 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1301 args.extend(extra_args.iter().cloned());
1302 }
1303
1304 let probe = Probe {
1305 initial_delay_seconds: Some(1),
1306 failure_threshold: Some(12),
1307 tcp_socket: Some(TCPSocketAction {
1308 host: None,
1309 port: IntOrString::Int(config.environmentd_sql_port.into()),
1310 }),
1311 ..Default::default()
1312 };
1313
1314 let security_context = if config.enable_security_context {
1315 Some(SecurityContext {
1319 run_as_non_root: Some(true),
1320 capabilities: Some(Capabilities {
1321 drop: Some(vec!["ALL".to_string()]),
1322 ..Default::default()
1323 }),
1324 seccomp_profile: Some(SeccompProfile {
1325 type_: "RuntimeDefault".to_string(),
1326 ..Default::default()
1327 }),
1328 allow_privilege_escalation: Some(false),
1329 ..Default::default()
1330 })
1331 } else {
1332 None
1333 };
1334
1335 let ports = vec![
1336 ContainerPort {
1337 container_port: config.environmentd_sql_port.into(),
1338 name: Some("sql".to_owned()),
1339 ..Default::default()
1340 },
1341 ContainerPort {
1342 container_port: config.environmentd_internal_sql_port.into(),
1343 name: Some("internal-sql".to_owned()),
1344 ..Default::default()
1345 },
1346 ContainerPort {
1347 container_port: config.environmentd_http_port.into(),
1348 name: Some("http".to_owned()),
1349 ..Default::default()
1350 },
1351 ContainerPort {
1352 container_port: config.environmentd_internal_http_port.into(),
1353 name: Some("internal-http".to_owned()),
1354 ..Default::default()
1355 },
1356 ContainerPort {
1357 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1358 name: Some("persist-pubsub".to_owned()),
1359 ..Default::default()
1360 },
1361 ];
1362
1363 if mz.meets_minimum_version(&V147_DEV0) {
1365 volume_mounts.push(VolumeMount {
1366 name: "listeners-configmap".to_string(),
1367 mount_path: "/listeners".to_string(),
1368 ..Default::default()
1369 });
1370 volumes.push(Volume {
1371 name: "listeners-configmap".to_string(),
1372 config_map: Some(ConfigMapVolumeSource {
1373 name: mz.listeners_configmap_name(generation),
1374 default_mode: Some(256),
1375 optional: Some(false),
1376 items: Some(vec![KeyToPath {
1377 key: "listeners.json".to_string(),
1378 path: "listeners.json".to_string(),
1379 ..Default::default()
1380 }]),
1381 }),
1382 ..Default::default()
1383 });
1384 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1385 if matches!(
1386 mz.spec.authenticator_kind,
1387 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1388 ) {
1389 args.push("--system-parameter-default=enable_password_auth=true".into());
1390 env.push(EnvVar {
1391 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1392 value_from: Some(EnvVarSource {
1393 secret_key_ref: Some(SecretKeySelector {
1394 name: mz.backend_secret_name(),
1395 key: "external_login_password_mz_system".to_string(),
1396 optional: Some(false),
1397 }),
1398 ..Default::default()
1399 }),
1400 ..Default::default()
1401 })
1402 }
1403 } else {
1404 args.extend([
1405 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1406 format!(
1407 "--http-listen-addr=0.0.0.0:{}",
1408 config.environmentd_http_port
1409 ),
1410 format!(
1411 "--internal-sql-listen-addr=0.0.0.0:{}",
1412 config.environmentd_internal_sql_port
1413 ),
1414 format!(
1415 "--internal-http-listen-addr=0.0.0.0:{}",
1416 config.environmentd_internal_http_port
1417 ),
1418 ]);
1419 }
1420
1421 let container = Container {
1422 name: "environmentd".to_owned(),
1423 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1424 image_pull_policy: Some(config.image_pull_policy.to_string()),
1425 ports: Some(ports),
1426 args: Some(args),
1427 env: Some(env),
1428 volume_mounts: Some(volume_mounts),
1429 liveness_probe: Some(probe.clone()),
1430 readiness_probe: Some(probe),
1431 resources: mz
1432 .spec
1433 .environmentd_resource_requirements
1434 .clone()
1435 .or_else(|| config.environmentd_default_resources.clone()),
1436 security_context: security_context.clone(),
1437 ..Default::default()
1438 };
1439
1440 let mut pod_template_labels = mz.default_labels();
1441 pod_template_labels.insert(
1442 "materialize.cloud/name".to_owned(),
1443 mz.environmentd_statefulset_name(generation),
1444 );
1445 pod_template_labels.insert(
1446 "materialize.cloud/app".to_owned(),
1447 mz.environmentd_app_name(),
1448 );
1449 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1450 pod_template_labels.extend(
1451 mz.spec
1452 .pod_labels
1453 .as_ref()
1454 .map(|labels| labels.iter())
1455 .unwrap_or_default()
1456 .map(|(key, value)| (key.clone(), value.clone())),
1457 );
1458
1459 let mut pod_template_annotations = btreemap! {
1460 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1462
1463 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1465 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1466 "materialize.cloud/generation".to_owned() => generation.to_string(),
1467 };
1468 if config.enable_prometheus_scrape_annotations {
1469 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1470 pod_template_annotations.insert(
1471 "prometheus.io/port".to_owned(),
1472 config.environmentd_internal_http_port.to_string(),
1473 );
1474 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1475 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1476 pod_template_annotations.insert(
1477 "materialize.prometheus.io/mz_usage_path".to_owned(),
1478 "/metrics/mz_usage".to_string(),
1479 );
1480 pod_template_annotations.insert(
1481 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1482 "/metrics/mz_frontier".to_string(),
1483 );
1484 pod_template_annotations.insert(
1485 "materialize.prometheus.io/mz_compute_path".to_owned(),
1486 "/metrics/mz_compute".to_string(),
1487 );
1488 pod_template_annotations.insert(
1489 "materialize.prometheus.io/mz_storage_path".to_owned(),
1490 "/metrics/mz_storage".to_string(),
1491 );
1492 }
1493 pod_template_annotations.extend(
1494 mz.spec
1495 .pod_annotations
1496 .as_ref()
1497 .map(|annotations| annotations.iter())
1498 .unwrap_or_default()
1499 .map(|(key, value)| (key.clone(), value.clone())),
1500 );
1501
1502 let mut tolerations = vec![
1503 Toleration {
1507 effect: Some("NoExecute".into()),
1508 key: Some("node.kubernetes.io/not-ready".into()),
1509 operator: Some("Exists".into()),
1510 toleration_seconds: Some(30),
1511 value: None,
1512 },
1513 Toleration {
1514 effect: Some("NoExecute".into()),
1515 key: Some("node.kubernetes.io/unreachable".into()),
1516 operator: Some("Exists".into()),
1517 toleration_seconds: Some(30),
1518 value: None,
1519 },
1520 ];
1521 if let Some(user_tolerations) = &config.environmentd_tolerations {
1522 tolerations.extend(user_tolerations.iter().cloned());
1523 }
1524 let tolerations = Some(tolerations);
1525
1526 let pod_template_spec = PodTemplateSpec {
1527 metadata: Some(ObjectMeta {
1530 labels: Some(pod_template_labels),
1531 annotations: Some(pod_template_annotations), ..Default::default()
1533 }),
1534 spec: Some(PodSpec {
1535 containers: vec![container],
1536 node_selector: Some(
1537 config
1538 .environmentd_node_selector
1539 .iter()
1540 .map(|selector| (selector.key.clone(), selector.value.clone()))
1541 .collect(),
1542 ),
1543 affinity: config.environmentd_affinity.clone(),
1544 scheduler_name: config.scheduler_name.clone(),
1545 service_account_name: Some(mz.service_account_name()),
1546 volumes: Some(volumes),
1547 security_context: Some(PodSecurityContext {
1548 fs_group: Some(999),
1549 run_as_user: Some(999),
1550 run_as_group: Some(999),
1551 ..Default::default()
1552 }),
1553 tolerations,
1554 termination_grace_period_seconds: Some(0),
1575 ..Default::default()
1576 }),
1577 };
1578
1579 let mut match_labels = BTreeMap::new();
1580 match_labels.insert(
1581 "materialize.cloud/name".to_owned(),
1582 mz.environmentd_statefulset_name(generation),
1583 );
1584
1585 let statefulset_spec = StatefulSetSpec {
1586 replicas: Some(1),
1587 template: pod_template_spec,
1588 service_name: Some(mz.environmentd_service_name()),
1589 selector: LabelSelector {
1590 match_expressions: None,
1591 match_labels: Some(match_labels),
1592 },
1593 ..Default::default()
1594 };
1595
1596 StatefulSet {
1597 metadata: ObjectMeta {
1598 annotations: Some(btreemap! {
1599 "materialize.cloud/generation".to_owned() => generation.to_string(),
1600 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1601 }),
1602 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1603 },
1604 spec: Some(statefulset_spec),
1605 status: None,
1606 }
1607}
1608
1609fn create_connection_info(
1610 config: &super::Config,
1611 mz: &Materialize,
1612 generation: u64,
1613) -> ConnectionInfo {
1614 let external_enable_tls = issuer_ref_defined(
1615 &config.default_certificate_specs.internal,
1616 &mz.spec.internal_certificate_spec,
1617 );
1618 let authenticator_kind = mz.spec.authenticator_kind;
1619
1620 let mut listeners_config = ListenersConfig {
1621 sql: btreemap! {
1622 "external".to_owned() => SqlListenerConfig{
1623 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1624 authenticator_kind,
1625 allowed_roles: AllowedRoles::Normal,
1626 enable_tls: external_enable_tls,
1627 },
1628 "internal".to_owned() => SqlListenerConfig{
1629 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1630 authenticator_kind: AuthenticatorKind::None,
1631 allowed_roles: AllowedRoles::NormalAndInternal,
1633 enable_tls: false,
1634 },
1635 },
1636 http: btreemap! {
1637 "external".to_owned() => HttpListenerConfig{
1638 base: BaseListenerConfig {
1639 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1640 authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1644 AuthenticatorKind::Password
1645 } else {
1646 authenticator_kind
1647 },
1648 allowed_roles: AllowedRoles::Normal,
1649 enable_tls: external_enable_tls,
1650 },
1651 routes: HttpRoutesEnabled{
1652 base: true,
1653 webhook: true,
1654 internal: false,
1655 metrics: false,
1656 profiling: false,
1657 }
1658 },
1659 "internal".to_owned() => HttpListenerConfig{
1660 base: BaseListenerConfig {
1661 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1662 authenticator_kind: AuthenticatorKind::None,
1663 allowed_roles: AllowedRoles::NormalAndInternal,
1665 enable_tls: false,
1666 },
1667 routes: HttpRoutesEnabled{
1668 base: true,
1669 webhook: true,
1670 internal: true,
1671 metrics: true,
1672 profiling: true,
1673 }
1674 },
1675 },
1676 };
1677
1678 if matches!(
1679 authenticator_kind,
1680 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1681 ) {
1682 listeners_config.sql.remove("internal");
1683 listeners_config.http.remove("internal");
1684
1685 listeners_config.sql.get_mut("external").map(|listener| {
1686 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1687 listener
1688 });
1689 listeners_config.http.get_mut("external").map(|listener| {
1690 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1691 listener.routes.internal = true;
1692 listener.routes.profiling = true;
1693 listener
1694 });
1695
1696 listeners_config.http.insert(
1697 "metrics".to_owned(),
1698 HttpListenerConfig {
1699 base: BaseListenerConfig {
1700 addr: SocketAddr::new(
1701 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1702 config.environmentd_internal_http_port,
1703 ),
1704 authenticator_kind: AuthenticatorKind::None,
1705 allowed_roles: AllowedRoles::NormalAndInternal,
1706 enable_tls: false,
1707 },
1708 routes: HttpRoutesEnabled {
1709 base: false,
1710 webhook: false,
1711 internal: false,
1712 metrics: true,
1713 profiling: false,
1714 },
1715 },
1716 );
1717 }
1718
1719 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1720 let listeners_configmap = ConfigMap {
1721 binary_data: None,
1722 data: Some(btreemap! {
1723 "listeners.json".to_owned() => listeners_json,
1724 }),
1725 immutable: None,
1726 metadata: ObjectMeta {
1727 annotations: Some(btreemap! {
1728 "materialize.cloud/generation".to_owned() => generation.to_string(),
1729 }),
1730 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1731 },
1732 };
1733
1734 let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1735 AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1736 let scheme = if external_enable_tls { "https" } else { "http" };
1737 (
1738 scheme,
1739 config.environmentd_http_port,
1740 Some(mz.spec.backend_secret_name.clone()),
1741 )
1742 }
1743 _ => ("http", config.environmentd_internal_http_port, None),
1744 };
1745 let environmentd_url = format!(
1746 "{}://{}.{}.svc.cluster.local:{}",
1747 scheme,
1748 mz.environmentd_generation_service_name(generation),
1749 mz.namespace(),
1750 leader_api_port,
1751 );
1752 ConnectionInfo {
1753 environmentd_url,
1754 listeners_configmap,
1755 mz_system_secret_name,
1756 }
1757}
1758
1759#[derive(Debug, Deserialize, PartialEq, Eq)]
1761struct BecomeLeaderResponse {
1762 result: BecomeLeaderResult,
1763}
1764
1765#[derive(Debug, Deserialize, PartialEq, Eq)]
1766enum BecomeLeaderResult {
1767 Success,
1768 Failure { message: String },
1769}
1770
1771#[derive(Debug, Deserialize, PartialEq, Eq)]
1772struct SkipCatchupError {
1773 message: String,
1774}