1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec,
23 Probe, SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServiceAccount, ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
25 },
26 networking::v1::{
27 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
28 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
29 },
30 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
31 },
32 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
33};
34use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
35use maplit::btreemap;
36use mz_server_core::listeners::{
37 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
38 ListenersConfig, SqlListenerConfig,
39};
40use rand::{Rng, thread_rng};
41use reqwest::{Client as HttpClient, StatusCode};
42use semver::{BuildMetadata, Prerelease, Version};
43use serde::{Deserialize, Serialize};
44use sha2::{Digest, Sha256};
45use tracing::{error, trace, warn};
46
47use super::Error;
48use super::matching_image_from_environmentd_image_ref;
49use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
50use crate::k8s::{apply_resource, delete_resource, get_resource};
51use mz_cloud_provider::CloudProvider;
52use mz_cloud_resources::crd::generated::cert_manager::certificates::{
53 Certificate, CertificatePrivateKeyAlgorithm,
54};
55use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
56use mz_orchestrator_tracing::TracingCliArgs;
57use mz_ore::instrument;
58
// Version thresholds. The ones used in the visible part of this file are passed to
// `Materialize::meets_minimum_version` to gate environmentd arguments.
//
// The `*_DEV0` values carry a `dev.0` pre-release component, which is built with the
// fallible `Prerelease::new`; they are therefore initialised lazily on first access.
// NOTE(review): presumably the `dev.0` suffix lets development builds of that release
// satisfy the gate — confirm against `meets_minimum_version`.

/// Version `0.140.0-dev.0`.
static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 140,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
/// Version `0.143.0`.
const V143: Version = Version::new(0, 143, 0);
/// Version `0.144.0`.
const V144: Version = Version::new(0, 144, 0);
/// Version `0.147.0-dev.0`.
static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 147,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
/// Version `0.153.0`.
const V153: Version = Version::new(0, 153, 0);
/// Version `0.154.0-dev.0`.
static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 154,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
/// Version `0.161.0`; the only threshold exported from this module.
pub const V161: Version = Version::new(0, 161, 0);
84
/// Deployment state reported in a [`GetLeaderStatusResponse`].
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// `Resources::apply` requeues while this status is reported.
    Initializing,
    /// Treated as "ready" by `Resources::apply`.
    ReadyToPromote,
    /// Treated as "ready" by `Resources::apply`; `Resources::promote_services` keeps
    /// retrying while it is reported.
    Promoting,
    /// The only status `Resources::promote_services` accepts before it applies the
    /// public service.
    IsLeader,
}
101
/// JSON body deserialized from the response to `GET {environmentd_url}/api/leader/status`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GetLeaderStatusResponse {
    /// Deployment state reported in the response.
    status: DeploymentStatus,
}
106
/// JSON body that `Resources::get_http_client` posts to `{environmentd_url}/api/login`.
///
/// `Debug` is not derived on this type.
/// NOTE(review): presumably deliberate, to keep the password out of logs — confirm.
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    /// Account to log in as; the only caller in this file sends `"mz_system"`.
    username: String,
    /// Password for `username`.
    password: String,
}
113
/// Information needed to talk to environmentd over HTTP.
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    /// Base URL; request paths such as `/api/leader/status` are appended to it.
    pub environmentd_url: String,
    /// Name of the `Secret` holding the `external_login_password_mz_system` key. When
    /// `None`, `Resources::get_http_client` builds a client without logging in.
    pub mz_system_secret_name: Option<String>,
    /// ConfigMap applied by `Resources::apply` before the statefulset.
    pub listeners_configmap: ConfigMap,
}
120
/// The Kubernetes objects built by [`Resources::new`] for one generation.
///
/// The struct derives `Serialize`, and `generate_hash` hashes its JSON form, so every
/// field (and the field order) feeds into that hash.
#[derive(Debug, Serialize)]
pub struct Resources {
    /// Generation number passed to [`Resources::new`].
    pub generation: u64,
    /// Network policies, applied first by `apply` in vector order.
    pub environmentd_network_policies: Vec<NetworkPolicy>,
    /// Service account; `None` when `Materialize::create_service_account` returns false.
    pub service_account: Box<Option<ServiceAccount>>,
    /// Role applied by `apply`.
    pub role: Box<Role>,
    /// Role binding applied by `apply`.
    pub role_binding: Box<RoleBinding>,
    /// Service applied only by `promote_services`.
    pub public_service: Box<Service>,
    /// Per-generation service applied by `apply`.
    pub generation_service: Box<Service>,
    /// Per-generation persist pubsub service applied by `apply`.
    pub persist_pubsub_service: Box<Service>,
    /// Optional certificate; applied by `apply` when present.
    pub environmentd_certificate: Box<Option<Certificate>>,
    /// The environmentd statefulset for this generation.
    pub environmentd_statefulset: Box<StatefulSet>,
    /// URL, credentials reference and listeners ConfigMap used to reach environmentd.
    pub connection_info: Box<ConnectionInfo>,
}
135
136impl Resources {
137 pub fn new(
138 config: &super::MaterializeControllerArgs,
139 tracing: &TracingCliArgs,
140 orchestratord_namespace: &str,
141 mz: &Materialize,
142 generation: u64,
143 ) -> Self {
144 let environmentd_network_policies =
145 create_environmentd_network_policies(config, mz, orchestratord_namespace);
146
147 let service_account = Box::new(create_service_account_object(config, mz));
148 let role = Box::new(create_role_object(mz));
149 let role_binding = Box::new(create_role_binding_object(mz));
150 let public_service = Box::new(create_public_service_object(config, mz, generation));
151 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
152 let persist_pubsub_service =
153 Box::new(create_persist_pubsub_service(config, mz, generation));
154 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
155 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
156 config, tracing, mz, generation,
157 ));
158 let connection_info = Box::new(create_connection_info(config, mz, generation));
159
160 Self {
161 generation,
162 environmentd_network_policies,
163 service_account,
164 role,
165 role_binding,
166 public_service,
167 generation_service,
168 persist_pubsub_service,
169 environmentd_certificate,
170 environmentd_statefulset,
171 connection_info,
172 }
173 }
174
    /// Applies this generation's resources to `namespace` and checks whether the new
    /// environmentd is ready.
    ///
    /// Resources are applied in this order: network policies, optional service account,
    /// role, role binding, per-generation service, persist pubsub service, optional
    /// certificate, listeners ConfigMap, statefulset. The public service is not applied
    /// here; `promote_services` does that.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(action))` — a requeue after a random 5–9 second delay — when the
    ///   statefulset has no ready replicas, no HTTP client could be obtained, the status
    ///   request fails or returns an error status, or (without `force_promote`) the
    ///   reported status is `Initializing`.
    /// * `Ok(None)` when environmentd reports any other status, or when `force_promote`
    ///   is set and the skip-catchup request did not return `400 Bad Request`.
    ///
    /// # Errors
    ///
    /// Fails if applying or fetching a resource fails, if a response body cannot be
    /// decoded, if the skip-catchup request cannot be sent, or if skip-catchup answers
    /// with `400 Bad Request`.
    #[instrument]
    pub async fn apply(
        &self,
        client: &Client,
        force_promote: bool,
        namespace: &str,
    ) -> Result<Option<Action>, Error> {
        let environmentd_network_policy_api: Api<NetworkPolicy> =
            Api::namespaced(client.clone(), namespace);
        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);

        for policy in &self.environmentd_network_policies {
            trace!("applying network policy {}", policy.name_unchecked());
            apply_resource(&environmentd_network_policy_api, policy).await?;
        }

        if let Some(service_account) = &*self.service_account {
            trace!("applying environmentd service account");
            apply_resource(&service_account_api, service_account).await?;
        }

        trace!("applying environmentd role");
        apply_resource(&role_api, &*self.role).await?;

        trace!("applying environmentd role binding");
        apply_resource(&role_binding_api, &*self.role_binding).await?;

        trace!("applying environmentd per-generation service");
        apply_resource(&service_api, &*self.generation_service).await?;

        trace!("creating persist pubsub service");
        apply_resource(&service_api, &*self.persist_pubsub_service).await?;

        if let Some(certificate) = &*self.environmentd_certificate {
            trace!("creating new environmentd certificate");
            apply_resource(&certificate_api, certificate).await?;
        }

        trace!("applying listeners configmap");
        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;

        trace!("creating new environmentd statefulset");
        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;

        // Every "not ready yet" exit below requeues with the same randomized delay.
        let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));

        // Re-read the statefulset to see its status; a missing object, missing status
        // or missing `ready_replicas` all count as zero ready replicas.
        let statefulset = get_resource(
            &statefulset_api,
            &self.environmentd_statefulset.name_unchecked(),
        )
        .await?;
        if statefulset
            .and_then(|statefulset| statefulset.status)
            .and_then(|status| status.ready_replicas)
            .unwrap_or(0)
            == 0
        {
            trace!("environmentd statefulset is not ready yet...");
            return Ok(Some(retry_action));
        }

        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
            return Ok(Some(retry_action));
        };
        let status_url = reqwest::Url::parse(&format!(
            "{}/api/leader/status",
            self.connection_info.environmentd_url,
        ))
        .unwrap();

        match http_client.get(status_url.clone()).send().await {
            Ok(response) => {
                // An HTTP error status is retried; a body that fails to decode is an error.
                let response: GetLeaderStatusResponse = match response.error_for_status() {
                    Ok(response) => response.json().await?,
                    Err(e) => {
                        trace!("failed to get status of environmentd, retrying... ({e})");
                        return Ok(Some(retry_action));
                    }
                };
                if force_promote {
                    // With `force_promote` the reported status is not inspected at all.
                    trace!("skipping cluster catchup");
                    let skip_catchup_url = reqwest::Url::parse(&format!(
                        "{}/api/leader/skip-catchup",
                        self.connection_info.environmentd_url,
                    ))
                    .unwrap();
                    let response = http_client.post(skip_catchup_url).send().await?;
                    // Only `400 Bad Request` is treated as a failure; any other status
                    // falls through to `Ok(None)`.
                    if response.status() == StatusCode::BAD_REQUEST {
                        let err: SkipCatchupError = response.json().await?;
                        return Err(
                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
                        );
                    }
                } else {
                    match response.status {
                        DeploymentStatus::Initializing => {
                            trace!("environmentd is still initializing, retrying...");
                            return Ok(Some(retry_action));
                        }
                        DeploymentStatus::ReadyToPromote
                        | DeploymentStatus::Promoting
                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
                    }
                }
            }
            Err(e) => {
                trace!("failed to connect to environmentd, retrying... ({e})");
                return Ok(Some(retry_action));
            }
        }

        Ok(None)
    }
294
295 #[instrument]
296 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
297 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
298 Some(match &self.connection_info.mz_system_secret_name {
299 Some(mz_system_secret_name) => {
300 let http_client = reqwest::Client::builder()
301 .timeout(std::time::Duration::from_secs(10))
302 .cookie_store(true)
303 .danger_accept_invalid_certs(true)
305 .build()
306 .unwrap();
307
308 let secret = secret_api
309 .get(mz_system_secret_name)
310 .await
311 .map_err(|e| {
312 error!("Failed to get backend secret: {:?}", e);
313 e
314 })
315 .ok()?;
316 if let Some(data) = secret.data {
317 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
318 let password = String::from_utf8_lossy(&password.0).to_string();
319 let login_url = reqwest::Url::parse(&format!(
320 "{}/api/login",
321 self.connection_info.environmentd_url,
322 ))
323 .unwrap();
324 match http_client
325 .post(login_url)
326 .body(
327 serde_json::to_string(&LoginCredentials {
328 username: "mz_system".to_owned(),
329 password,
330 })
331 .expect(
332 "Serializing a simple struct with utf8 strings doesn't fail.",
333 ),
334 )
335 .header("Content-Type", "application/json")
336 .send()
337 .await
338 {
339 Ok(response) => {
340 if let Err(e) = response.error_for_status() {
341 trace!("failed to login to environmentd, retrying... ({e})");
342 return None;
343 }
344 }
345 Err(e) => {
346 trace!("failed to connect to environmentd, retrying... ({e})");
347 return None;
348 }
349 };
350 }
351 };
352 http_client
353 }
354 None => reqwest::Client::builder()
355 .timeout(std::time::Duration::from_secs(10))
356 .build()
357 .unwrap(),
358 })
359 }
360
361 #[instrument]
362 pub async fn promote_services(
363 &self,
364 client: &Client,
365 namespace: &str,
366 ) -> Result<Option<Action>, Error> {
367 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
368 let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
369
370 let promote_url = reqwest::Url::parse(&format!(
371 "{}/api/leader/promote",
372 self.connection_info.environmentd_url,
373 ))
374 .unwrap();
375
376 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
377 return Ok(Some(retry_action));
378 };
379
380 trace!("promoting new environmentd to leader");
381 let response = http_client.post(promote_url).send().await?;
382 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
383 if let BecomeLeaderResult::Failure { message } = response.result {
384 return Err(Error::Anyhow(anyhow::anyhow!(
385 "failed to promote new environmentd: {message}"
386 )));
387 }
388
389 let status_url = reqwest::Url::parse(&format!(
401 "{}/api/leader/status",
402 self.connection_info.environmentd_url,
403 ))
404 .unwrap();
405 match http_client.get(status_url.clone()).send().await {
406 Ok(response) => {
407 let response: GetLeaderStatusResponse = response.json().await?;
408 if response.status != DeploymentStatus::IsLeader {
409 trace!(
410 "environmentd is still promoting (status: {:?}), retrying...",
411 response.status
412 );
413 return Ok(Some(retry_action));
414 } else {
415 trace!("environmentd is ready");
416 }
417 }
418 Err(e) => {
419 trace!("failed to connect to environmentd, retrying... ({e})");
420 return Ok(Some(retry_action));
421 }
422 }
423
424 trace!("applying environmentd public service");
425 apply_resource(&service_api, &*self.public_service).await?;
426
427 Ok(None)
428 }
429
430 #[instrument]
431 pub async fn teardown_generation(
432 &self,
433 client: &Client,
434 mz: &Materialize,
435 generation: u64,
436 ) -> Result<(), anyhow::Error> {
437 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
438 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
439 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
440
441 trace!("deleting environmentd statefulset for generation {generation}");
442 delete_resource(
443 &statefulset_api,
444 &mz.environmentd_statefulset_name(generation),
445 )
446 .await?;
447
448 trace!("deleting persist pubsub service for generation {generation}");
449 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
450
451 trace!("deleting environmentd per-generation service for generation {generation}");
452 delete_resource(
453 &service_api,
454 &mz.environmentd_generation_service_name(generation),
455 )
456 .await?;
457
458 trace!("deleting listeners configmap for generation {generation}");
459 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
460
461 Ok(())
462 }
463
464 pub fn generate_hash(&self) -> String {
467 let mut hasher = Sha256::new();
468 hasher.update(&serde_json::to_string(self).unwrap());
469 format!("{:x}", hasher.finalize())
470 }
471}
472
473fn create_environmentd_network_policies(
474 config: &super::MaterializeControllerArgs,
475 mz: &Materialize,
476 orchestratord_namespace: &str,
477) -> Vec<NetworkPolicy> {
478 let mut network_policies = Vec::new();
479 if config.network_policies.internal_enabled {
480 let environmentd_label_selector = LabelSelector {
481 match_labels: Some(
482 mz.default_labels()
483 .into_iter()
484 .chain([(
485 "materialize.cloud/app".to_owned(),
486 mz.environmentd_app_name(),
487 )])
488 .collect(),
489 ),
490 ..Default::default()
491 };
492 let orchestratord_label_selector = LabelSelector {
493 match_labels: Some(
494 config
495 .orchestratord_pod_selector_labels
496 .iter()
497 .cloned()
498 .map(|kv| (kv.key, kv.value))
499 .collect(),
500 ),
501 ..Default::default()
502 };
503 let all_pods_label_selector = LabelSelector {
506 match_labels: Some(mz.default_labels()),
507 ..Default::default()
508 };
509 network_policies.extend([
510 NetworkPolicy {
513 metadata: mz
514 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
515 spec: Some(NetworkPolicySpec {
516 egress: Some(vec![NetworkPolicyEgressRule {
517 to: Some(vec![NetworkPolicyPeer {
518 pod_selector: Some(all_pods_label_selector.clone()),
519 ..Default::default()
520 }]),
521 ..Default::default()
522 }]),
523 ingress: Some(vec![NetworkPolicyIngressRule {
524 from: Some(vec![NetworkPolicyPeer {
525 pod_selector: Some(all_pods_label_selector.clone()),
526 ..Default::default()
527 }]),
528 ..Default::default()
529 }]),
530 pod_selector: Some(all_pods_label_selector.clone()),
531 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
532 ..Default::default()
533 }),
534 },
535 NetworkPolicy {
538 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
539 spec: Some(NetworkPolicySpec {
540 ingress: Some(vec![NetworkPolicyIngressRule {
541 from: Some(vec![NetworkPolicyPeer {
542 namespace_selector: Some(LabelSelector {
543 match_labels: Some(btreemap! {
544 "kubernetes.io/metadata.name".into()
545 => orchestratord_namespace.into(),
546 }),
547 ..Default::default()
548 }),
549 pod_selector: Some(orchestratord_label_selector),
550 ..Default::default()
551 }]),
552 ports: Some(vec![
553 NetworkPolicyPort {
554 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
555 protocol: Some("TCP".to_string()),
556 ..Default::default()
557 },
558 NetworkPolicyPort {
559 port: Some(IntOrString::Int(
560 config.environmentd_internal_http_port.into(),
561 )),
562 protocol: Some("TCP".to_string()),
563 ..Default::default()
564 },
565 ]),
566 ..Default::default()
567 }]),
568 pod_selector: Some(environmentd_label_selector),
569 policy_types: Some(vec!["Ingress".to_owned()]),
570 ..Default::default()
571 }),
572 },
573 ]);
574 }
575 if config.network_policies.ingress_enabled {
576 let mut ingress_label_selector = mz.default_labels();
577 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
578 network_policies.extend([NetworkPolicy {
579 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
580 spec: Some(NetworkPolicySpec {
581 ingress: Some(vec![NetworkPolicyIngressRule {
582 from: Some(
583 config
584 .network_policies
585 .ingress_cidrs
586 .iter()
587 .map(|cidr| NetworkPolicyPeer {
588 ip_block: Some(IPBlock {
589 cidr: cidr.to_owned(),
590 except: None,
591 }),
592 ..Default::default()
593 })
594 .collect(),
595 ),
596 ports: Some(vec![
597 NetworkPolicyPort {
598 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
599 protocol: Some("TCP".to_string()),
600 ..Default::default()
601 },
602 NetworkPolicyPort {
603 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
604 protocol: Some("TCP".to_string()),
605 ..Default::default()
606 },
607 ]),
608 ..Default::default()
609 }]),
610 pod_selector: Some(LabelSelector {
611 match_expressions: None,
612 match_labels: Some(ingress_label_selector),
613 }),
614 policy_types: Some(vec!["Ingress".to_owned()]),
615 ..Default::default()
616 }),
617 }]);
618 }
619 if config.network_policies.egress_enabled {
620 network_policies.extend([NetworkPolicy {
621 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
622 spec: Some(NetworkPolicySpec {
623 egress: Some(vec![NetworkPolicyEgressRule {
624 to: Some(
625 config
626 .network_policies
627 .egress_cidrs
628 .iter()
629 .map(|cidr| NetworkPolicyPeer {
630 ip_block: Some(IPBlock {
631 cidr: cidr.to_owned(),
632 except: None,
633 }),
634 ..Default::default()
635 })
636 .collect(),
637 ),
638 ..Default::default()
639 }]),
640 pod_selector: Some(LabelSelector {
641 match_expressions: None,
642 match_labels: Some(mz.default_labels()),
643 }),
644 policy_types: Some(vec!["Egress".to_owned()]),
645 ..Default::default()
646 }),
647 }]);
648 }
649 network_policies
650}
651
652fn create_service_account_object(
653 config: &super::MaterializeControllerArgs,
654 mz: &Materialize,
655) -> Option<ServiceAccount> {
656 if mz.create_service_account() {
657 let mut annotations: BTreeMap<String, String> = mz
658 .spec
659 .service_account_annotations
660 .clone()
661 .unwrap_or_default();
662 if let (CloudProvider::Aws, Some(role_arn)) = (
663 config.cloud_provider,
664 mz.spec
665 .environmentd_iam_role_arn
666 .as_deref()
667 .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
668 ) {
669 warn!(
670 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
671 );
672 annotations.insert(
673 "eks.amazonaws.com/role-arn".to_string(),
674 role_arn.to_string(),
675 );
676 };
677
678 let mut labels = mz.default_labels();
679 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
680
681 Some(ServiceAccount {
682 metadata: ObjectMeta {
683 annotations: Some(annotations),
684 labels: Some(labels),
685 ..mz.managed_resource_meta(mz.service_account_name())
686 },
687 ..Default::default()
688 })
689 } else {
690 None
691 }
692}
693
694fn create_role_object(mz: &Materialize) -> Role {
695 Role {
696 metadata: mz.managed_resource_meta(mz.role_name()),
697 rules: Some(vec![
698 PolicyRule {
699 api_groups: Some(vec!["apps".to_string()]),
700 resources: Some(vec!["statefulsets".to_string()]),
701 verbs: vec![
702 "get".to_string(),
703 "list".to_string(),
704 "watch".to_string(),
705 "create".to_string(),
706 "update".to_string(),
707 "patch".to_string(),
708 "delete".to_string(),
709 ],
710 ..Default::default()
711 },
712 PolicyRule {
713 api_groups: Some(vec!["".to_string()]),
714 resources: Some(vec![
715 "persistentvolumeclaims".to_string(),
716 "pods".to_string(),
717 "secrets".to_string(),
718 "services".to_string(),
719 ]),
720 verbs: vec![
721 "get".to_string(),
722 "list".to_string(),
723 "watch".to_string(),
724 "create".to_string(),
725 "update".to_string(),
726 "patch".to_string(),
727 "delete".to_string(),
728 ],
729 ..Default::default()
730 },
731 PolicyRule {
732 api_groups: Some(vec!["".to_string()]),
733 resources: Some(vec!["configmaps".to_string()]),
734 verbs: vec!["get".to_string()],
735 ..Default::default()
736 },
737 PolicyRule {
738 api_groups: Some(vec!["materialize.cloud".to_string()]),
739 resources: Some(vec!["vpcendpoints".to_string()]),
740 verbs: vec![
741 "get".to_string(),
742 "list".to_string(),
743 "watch".to_string(),
744 "create".to_string(),
745 "update".to_string(),
746 "patch".to_string(),
747 "delete".to_string(),
748 ],
749 ..Default::default()
750 },
751 PolicyRule {
752 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
753 resources: Some(vec!["pods".to_string()]),
754 verbs: vec!["get".to_string(), "list".to_string()],
755 ..Default::default()
756 },
757 PolicyRule {
758 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
759 resources: Some(vec![
760 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
761 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
762 ]),
763 verbs: vec!["get".to_string()],
764 ..Default::default()
765 },
766 ]),
767 }
768}
769
770fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
771 RoleBinding {
772 metadata: mz.managed_resource_meta(mz.role_binding_name()),
773 role_ref: RoleRef {
774 api_group: "".to_string(),
775 kind: "Role".to_string(),
776 name: mz.role_name(),
777 },
778 subjects: Some(vec![Subject {
779 api_group: Some("".to_string()),
780 kind: "ServiceAccount".to_string(),
781 name: mz.service_account_name(),
782 namespace: Some(mz.namespace()),
783 }]),
784 }
785}
786
787fn create_public_service_object(
788 config: &super::MaterializeControllerArgs,
789 mz: &Materialize,
790 generation: u64,
791) -> Service {
792 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
793}
794
795fn create_generation_service_object(
796 config: &super::MaterializeControllerArgs,
797 mz: &Materialize,
798 generation: u64,
799) -> Service {
800 create_base_service_object(
801 config,
802 mz,
803 generation,
804 &mz.environmentd_generation_service_name(generation),
805 )
806}
807
808fn create_base_service_object(
809 config: &super::MaterializeControllerArgs,
810 mz: &Materialize,
811 generation: u64,
812 service_name: &str,
813) -> Service {
814 let ports = vec![
815 ServicePort {
816 port: config.environmentd_sql_port.into(),
817 protocol: Some("TCP".to_string()),
818 name: Some("sql".to_string()),
819 ..Default::default()
820 },
821 ServicePort {
822 port: config.environmentd_http_port.into(),
823 protocol: Some("TCP".to_string()),
824 name: Some("https".to_string()),
825 ..Default::default()
826 },
827 ServicePort {
828 port: config.environmentd_internal_sql_port.into(),
829 protocol: Some("TCP".to_string()),
830 name: Some("internal-sql".to_string()),
831 ..Default::default()
832 },
833 ServicePort {
834 port: config.environmentd_internal_http_port.into(),
835 protocol: Some("TCP".to_string()),
836 name: Some("internal-http".to_string()),
837 ..Default::default()
838 },
839 ];
840
841 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
842
843 let spec = ServiceSpec {
844 type_: Some("ClusterIP".to_string()),
845 cluster_ip: Some("None".to_string()),
846 selector: Some(selector),
847 ports: Some(ports),
848 ..Default::default()
849 };
850
851 Service {
852 metadata: mz.managed_resource_meta(service_name.to_string()),
853 spec: Some(spec),
854 status: None,
855 }
856}
857
858fn create_persist_pubsub_service(
859 config: &super::MaterializeControllerArgs,
860 mz: &Materialize,
861 generation: u64,
862) -> Service {
863 Service {
864 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
865 spec: Some(ServiceSpec {
866 type_: Some("ClusterIP".to_string()),
867 cluster_ip: Some("None".to_string()),
868 selector: Some(btreemap! {
869 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
870 }),
871 ports: Some(vec![ServicePort {
872 name: Some("grpc".to_string()),
873 protocol: Some("TCP".to_string()),
874 port: config.environmentd_internal_persist_pubsub_port.into(),
875 ..Default::default()
876 }]),
877 ..Default::default()
878 }),
879 status: None,
880 }
881}
882
883fn create_environmentd_certificate(
884 config: &super::MaterializeControllerArgs,
885 mz: &Materialize,
886) -> Option<Certificate> {
887 create_certificate(
888 config.default_certificate_specs.internal.clone(),
889 mz,
890 mz.spec.internal_certificate_spec.clone(),
891 mz.environmentd_certificate_name(),
892 mz.environmentd_certificate_secret_name(),
893 Some(vec![
894 mz.environmentd_service_name(),
895 mz.environmentd_service_internal_fqdn(),
896 ]),
897 CertificatePrivateKeyAlgorithm::Ed25519,
898 None,
899 )
900}
901
902fn create_environmentd_statefulset_object(
903 config: &super::MaterializeControllerArgs,
904 tracing: &TracingCliArgs,
905 mz: &Materialize,
906 generation: u64,
907) -> StatefulSet {
908 let mut env = vec![
918 EnvVar {
919 name: "MZ_METADATA_BACKEND_URL".to_string(),
920 value_from: Some(EnvVarSource {
921 secret_key_ref: Some(SecretKeySelector {
922 name: mz.backend_secret_name(),
923 key: "metadata_backend_url".to_string(),
924 optional: Some(false),
925 }),
926 ..Default::default()
927 }),
928 ..Default::default()
929 },
930 EnvVar {
931 name: "MZ_PERSIST_BLOB_URL".to_string(),
932 value_from: Some(EnvVarSource {
933 secret_key_ref: Some(SecretKeySelector {
934 name: mz.backend_secret_name(),
935 key: "persist_backend_url".to_string(),
936 optional: Some(false),
937 }),
938 ..Default::default()
939 }),
940 ..Default::default()
941 },
942 ];
943
944 env.push(EnvVar {
945 name: "AWS_REGION".to_string(),
946 value: Some(config.region.clone()),
947 ..Default::default()
948 });
949
950 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
951
952 let mut args = vec![];
953
954 if let Some(helm_chart_version) = &config.helm_chart_version {
955 args.push(format!("--helm-chart-version={helm_chart_version}"));
956 }
957
958 args.push(format!(
960 "--environment-id={}",
961 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
962 ));
963
964 args.push(format!(
966 "--clusterd-image={}",
967 matching_image_from_environmentd_image_ref(
968 &mz.spec.environmentd_image_ref,
969 "clusterd",
970 None
971 )
972 ));
973
974 args.extend(
976 [
977 config
978 .environmentd_cluster_replica_sizes
979 .as_ref()
980 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
981 config
982 .bootstrap_default_cluster_replica_size
983 .as_ref()
984 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
985 config
986 .bootstrap_builtin_system_cluster_replica_size
987 .as_ref()
988 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
989 config
990 .bootstrap_builtin_probe_cluster_replica_size
991 .as_ref()
992 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
993 config
994 .bootstrap_builtin_support_cluster_replica_size
995 .as_ref()
996 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
997 config
998 .bootstrap_builtin_catalog_server_cluster_replica_size
999 .as_ref()
1000 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
1001 config
1002 .bootstrap_builtin_analytics_cluster_replica_size
1003 .as_ref()
1004 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1005 config
1006 .bootstrap_builtin_system_cluster_replication_factor
1007 .as_ref()
1008 .map(|replication_factor| {
1009 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1010 }),
1011 config
1012 .bootstrap_builtin_probe_cluster_replication_factor
1013 .as_ref()
1014 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1015 config
1016 .bootstrap_builtin_support_cluster_replication_factor
1017 .as_ref()
1018 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1019 config
1020 .bootstrap_builtin_analytics_cluster_replication_factor
1021 .as_ref()
1022 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1023 ]
1024 .into_iter()
1025 .flatten(),
1026 );
1027
1028 args.extend(
1029 config
1030 .environmentd_allowed_origins
1031 .iter()
1032 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1033 );
1034
1035 args.push(format!(
1036 "--secrets-controller={}",
1037 config.secrets_controller
1038 ));
1039
1040 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1041 if let Ok(cluster_replica_sizes) =
1042 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1043 {
1044 let cluster_replica_sizes: Vec<_> =
1045 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1046 args.push(format!(
1047 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1048 cluster_replica_sizes.join("', '")
1049 ));
1050 }
1051 }
1052 if !config.cloud_provider.is_cloud() {
1053 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1054 }
1055
1056 if config.enable_internal_statement_logging {
1057 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1058 }
1059
1060 if config.disable_statement_logging {
1061 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1062 }
1063
1064 if !mz.spec.enable_rbac {
1065 args.push("--system-parameter-default=enable_rbac_checks=false".into());
1066 }
1067
1068 args.push("--persist-isolated-runtime-threads=-1".to_string());
1072
1073 if config.cloud_provider == CloudProvider::Aws {
1075 if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1076 for az in azs {
1077 args.push(format!("--availability-zone={az}"));
1078 }
1079 }
1080
1081 if let Some(environmentd_connection_role_arn) = mz
1082 .spec
1083 .environmentd_connection_role_arn
1084 .as_deref()
1085 .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1086 {
1087 args.push(format!(
1088 "--aws-connection-role-arn={}",
1089 environmentd_connection_role_arn
1090 ));
1091 }
1092 if let Some(account_id) = &config.aws_info.aws_account_id {
1093 args.push(format!("--aws-account-id={account_id}"));
1094 }
1095
1096 args.extend([format!(
1097 "--aws-secrets-controller-tags=Environment={}",
1098 mz.name_unchecked()
1099 )]);
1100 args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1101 }
1102
1103 args.extend([
1105 "--orchestrator=kubernetes".into(),
1106 format!(
1107 "--orchestrator-kubernetes-service-account={}",
1108 &mz.service_account_name()
1109 ),
1110 format!(
1111 "--orchestrator-kubernetes-image-pull-policy={}",
1112 config.image_pull_policy.as_kebab_case_str(),
1113 ),
1114 ]);
1115 for selector in &config.clusterd_node_selector {
1116 args.push(format!(
1117 "--orchestrator-kubernetes-service-node-selector={}={}",
1118 selector.key, selector.value,
1119 ));
1120 }
1121 if mz.meets_minimum_version(&V144) {
1122 if let Some(affinity) = &config.clusterd_affinity {
1123 let affinity = serde_json::to_string(affinity).unwrap();
1124 args.push(format!(
1125 "--orchestrator-kubernetes-service-affinity={affinity}"
1126 ))
1127 }
1128 if let Some(tolerations) = &config.clusterd_tolerations {
1129 let tolerations = serde_json::to_string(tolerations).unwrap();
1130 args.push(format!(
1131 "--orchestrator-kubernetes-service-tolerations={tolerations}"
1132 ))
1133 }
1134 }
1135 if let Some(scheduler_name) = &config.scheduler_name {
1136 args.push(format!(
1137 "--orchestrator-kubernetes-scheduler-name={}",
1138 scheduler_name
1139 ));
1140 }
1141 if mz.meets_minimum_version(&V154_DEV0) {
1142 args.extend(
1143 mz.spec
1144 .pod_annotations
1145 .as_ref()
1146 .map(|annotations| annotations.iter())
1147 .unwrap_or_default()
1148 .map(|(key, val)| {
1149 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1150 }),
1151 );
1152 }
1153 args.extend(
1154 mz.default_labels()
1155 .iter()
1156 .chain(
1157 mz.spec
1158 .pod_labels
1159 .as_ref()
1160 .map(|labels| labels.iter())
1161 .unwrap_or_default(),
1162 )
1163 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1164 );
1165 if let Some(status) = &mz.status {
1166 args.push(format!(
1167 "--orchestrator-kubernetes-name-prefix=mz{}-",
1168 status.resource_id
1169 ));
1170 }
1171
1172 args.extend(["--log-format=json".into()]);
1174 if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1175 args.push(format!("--opentelemetry-endpoint={}", endpoint));
1176 }
1177 args.extend([
1179 format!(
1180 "--opentelemetry-resource=organization_id={}",
1181 mz.spec.environment_id
1182 ),
1183 format!(
1184 "--opentelemetry-resource=environment_name={}",
1185 mz.name_unchecked()
1186 ),
1187 ]);
1188
1189 if let Some(segment_api_key) = &config.segment_api_key {
1190 args.push(format!("--segment-api-key={}", segment_api_key));
1191 if config.segment_client_side {
1192 args.push("--segment-client-side".into());
1193 }
1194 }
1195
1196 let mut volumes = Vec::new();
1197 let mut volume_mounts = Vec::new();
1198 if issuer_ref_defined(
1199 &config.default_certificate_specs.internal,
1200 &mz.spec.internal_certificate_spec,
1201 ) {
1202 volumes.push(Volume {
1203 name: "certificate".to_owned(),
1204 secret: Some(SecretVolumeSource {
1205 default_mode: Some(0o400),
1206 secret_name: Some(mz.environmentd_certificate_secret_name()),
1207 items: None,
1208 optional: Some(false),
1209 }),
1210 ..Default::default()
1211 });
1212 volume_mounts.push(VolumeMount {
1213 name: "certificate".to_owned(),
1214 mount_path: "/etc/materialized".to_owned(),
1215 read_only: Some(true),
1216 ..Default::default()
1217 });
1218 args.extend([
1219 "--tls-mode=require".into(),
1220 "--tls-cert=/etc/materialized/tls.crt".into(),
1221 "--tls-key=/etc/materialized/tls.key".into(),
1222 ]);
1223 } else {
1224 args.push("--tls-mode=disable".to_string());
1225 }
1226 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1227 args.push(format!(
1228 "--orchestrator-kubernetes-ephemeral-volume-class={}",
1229 ephemeral_volume_class
1230 ));
1231 }
1232 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1234
1235 if let Some(sentry_dsn) = &tracing.sentry_dsn {
1237 args.push(format!("--sentry-dsn={}", sentry_dsn));
1238 if let Some(sentry_environment) = &tracing.sentry_environment {
1239 args.push(format!("--sentry-environment={}", sentry_environment));
1240 }
1241 args.push(format!("--sentry-tag=region={}", config.region));
1242 }
1243
1244 args.push(format!(
1246 "--persist-pubsub-url=http://{}:{}",
1247 mz.persist_pubsub_service_name(generation),
1248 config.environmentd_internal_persist_pubsub_port,
1249 ));
1250 args.push(format!(
1251 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1252 config.environmentd_internal_persist_pubsub_port
1253 ));
1254
1255 args.push(format!("--deploy-generation={}", generation));
1256
1257 args.push(format!(
1259 "--internal-console-redirect-url={}",
1260 &config.internal_console_proxy_url,
1261 ));
1262
1263 if !config.collect_pod_metrics {
1264 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1265 }
1266 if config.enable_prometheus_scrape_annotations {
1267 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1268 }
1269
1270 if config.disable_license_key_checks {
1273 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1274 args.push("--disable-license-key-checks".into());
1275 }
1276 }
1277
1278 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1281 || mz.meets_minimum_version(&V153)
1282 {
1283 volume_mounts.push(VolumeMount {
1284 name: "license-key".to_string(),
1285 mount_path: "/license_key".to_string(),
1286 ..Default::default()
1287 });
1288 volumes.push(Volume {
1289 name: "license-key".to_string(),
1290 secret: Some(SecretVolumeSource {
1291 default_mode: Some(256),
1292 optional: Some(false),
1293 secret_name: Some(mz.backend_secret_name()),
1294 items: Some(vec![KeyToPath {
1295 key: "license_key".to_string(),
1296 path: "license_key".to_string(),
1297 ..Default::default()
1298 }]),
1299 ..Default::default()
1300 }),
1301 ..Default::default()
1302 });
1303 env.push(EnvVar {
1304 name: "MZ_LICENSE_KEY".to_string(),
1305 value: Some("/license_key/license_key".to_string()),
1306 ..Default::default()
1307 });
1308 }
1309
1310 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1312 args.extend(extra_args.iter().cloned());
1313 }
1314
1315 let probe = Probe {
1316 initial_delay_seconds: Some(1),
1317 failure_threshold: Some(12),
1318 http_get: Some(HTTPGetAction {
1319 path: Some("/api/readyz".to_string()),
1320 port: IntOrString::Int(config.environmentd_internal_http_port.into()),
1321 host: None,
1322 scheme: None,
1323 http_headers: None,
1324 }),
1325 ..Default::default()
1326 };
1327
1328 let security_context = if config.enable_security_context {
1329 Some(SecurityContext {
1333 run_as_non_root: Some(true),
1334 capabilities: Some(Capabilities {
1335 drop: Some(vec!["ALL".to_string()]),
1336 ..Default::default()
1337 }),
1338 seccomp_profile: Some(SeccompProfile {
1339 type_: "RuntimeDefault".to_string(),
1340 ..Default::default()
1341 }),
1342 allow_privilege_escalation: Some(false),
1343 ..Default::default()
1344 })
1345 } else {
1346 None
1347 };
1348
1349 let ports = vec![
1350 ContainerPort {
1351 container_port: config.environmentd_sql_port.into(),
1352 name: Some("sql".to_owned()),
1353 ..Default::default()
1354 },
1355 ContainerPort {
1356 container_port: config.environmentd_internal_sql_port.into(),
1357 name: Some("internal-sql".to_owned()),
1358 ..Default::default()
1359 },
1360 ContainerPort {
1361 container_port: config.environmentd_http_port.into(),
1362 name: Some("http".to_owned()),
1363 ..Default::default()
1364 },
1365 ContainerPort {
1366 container_port: config.environmentd_internal_http_port.into(),
1367 name: Some("internal-http".to_owned()),
1368 ..Default::default()
1369 },
1370 ContainerPort {
1371 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1372 name: Some("persist-pubsub".to_owned()),
1373 ..Default::default()
1374 },
1375 ];
1376
1377 if mz.meets_minimum_version(&V147_DEV0) {
1379 volume_mounts.push(VolumeMount {
1380 name: "listeners-configmap".to_string(),
1381 mount_path: "/listeners".to_string(),
1382 ..Default::default()
1383 });
1384 volumes.push(Volume {
1385 name: "listeners-configmap".to_string(),
1386 config_map: Some(ConfigMapVolumeSource {
1387 name: mz.listeners_configmap_name(generation),
1388 default_mode: Some(256),
1389 optional: Some(false),
1390 items: Some(vec![KeyToPath {
1391 key: "listeners.json".to_string(),
1392 path: "listeners.json".to_string(),
1393 ..Default::default()
1394 }]),
1395 }),
1396 ..Default::default()
1397 });
1398 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1399 if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1400 args.push("--system-parameter-default=enable_password_auth=true".into());
1401 env.push(EnvVar {
1402 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1403 value_from: Some(EnvVarSource {
1404 secret_key_ref: Some(SecretKeySelector {
1405 name: mz.backend_secret_name(),
1406 key: "external_login_password_mz_system".to_string(),
1407 optional: Some(false),
1408 }),
1409 ..Default::default()
1410 }),
1411 ..Default::default()
1412 })
1413 }
1414 } else {
1415 args.extend([
1416 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1417 format!(
1418 "--http-listen-addr=0.0.0.0:{}",
1419 config.environmentd_http_port
1420 ),
1421 format!(
1422 "--internal-sql-listen-addr=0.0.0.0:{}",
1423 config.environmentd_internal_sql_port
1424 ),
1425 format!(
1426 "--internal-http-listen-addr=0.0.0.0:{}",
1427 config.environmentd_internal_http_port
1428 ),
1429 ]);
1430 }
1431
1432 let container = Container {
1433 name: "environmentd".to_owned(),
1434 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1435 image_pull_policy: Some(config.image_pull_policy.to_string()),
1436 ports: Some(ports),
1437 args: Some(args),
1438 env: Some(env),
1439 volume_mounts: Some(volume_mounts),
1440 liveness_probe: Some(probe.clone()),
1441 readiness_probe: Some(probe),
1442 resources: mz
1443 .spec
1444 .environmentd_resource_requirements
1445 .clone()
1446 .or_else(|| config.environmentd_default_resources.clone()),
1447 security_context: security_context.clone(),
1448 ..Default::default()
1449 };
1450
1451 let mut pod_template_labels = mz.default_labels();
1452 pod_template_labels.insert(
1453 "materialize.cloud/name".to_owned(),
1454 mz.environmentd_statefulset_name(generation),
1455 );
1456 pod_template_labels.insert(
1457 "materialize.cloud/app".to_owned(),
1458 mz.environmentd_app_name(),
1459 );
1460 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1461 pod_template_labels.extend(
1462 mz.spec
1463 .pod_labels
1464 .as_ref()
1465 .map(|labels| labels.iter())
1466 .unwrap_or_default()
1467 .map(|(key, value)| (key.clone(), value.clone())),
1468 );
1469
1470 let mut pod_template_annotations = btreemap! {
1471 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1473
1474 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1476 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1477 "materialize.cloud/generation".to_owned() => generation.to_string(),
1478 };
1479 if config.enable_prometheus_scrape_annotations {
1480 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1481 pod_template_annotations.insert(
1482 "prometheus.io/port".to_owned(),
1483 config.environmentd_internal_http_port.to_string(),
1484 );
1485 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1486 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1487 pod_template_annotations.insert(
1488 "materialize.prometheus.io/mz_usage_path".to_owned(),
1489 "/metrics/mz_usage".to_string(),
1490 );
1491 pod_template_annotations.insert(
1492 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1493 "/metrics/mz_frontier".to_string(),
1494 );
1495 pod_template_annotations.insert(
1496 "materialize.prometheus.io/mz_compute_path".to_owned(),
1497 "/metrics/mz_compute".to_string(),
1498 );
1499 pod_template_annotations.insert(
1500 "materialize.prometheus.io/mz_storage_path".to_owned(),
1501 "/metrics/mz_storage".to_string(),
1502 );
1503 }
1504 pod_template_annotations.extend(
1505 mz.spec
1506 .pod_annotations
1507 .as_ref()
1508 .map(|annotations| annotations.iter())
1509 .unwrap_or_default()
1510 .map(|(key, value)| (key.clone(), value.clone())),
1511 );
1512
1513 let mut tolerations = vec![
1514 Toleration {
1518 effect: Some("NoExecute".into()),
1519 key: Some("node.kubernetes.io/not-ready".into()),
1520 operator: Some("Exists".into()),
1521 toleration_seconds: Some(30),
1522 value: None,
1523 },
1524 Toleration {
1525 effect: Some("NoExecute".into()),
1526 key: Some("node.kubernetes.io/unreachable".into()),
1527 operator: Some("Exists".into()),
1528 toleration_seconds: Some(30),
1529 value: None,
1530 },
1531 ];
1532 if let Some(user_tolerations) = &config.environmentd_tolerations {
1533 tolerations.extend(user_tolerations.iter().cloned());
1534 }
1535 let tolerations = Some(tolerations);
1536
1537 let pod_template_spec = PodTemplateSpec {
1538 metadata: Some(ObjectMeta {
1541 labels: Some(pod_template_labels),
1542 annotations: Some(pod_template_annotations), ..Default::default()
1544 }),
1545 spec: Some(PodSpec {
1546 containers: vec![container],
1547 node_selector: Some(
1548 config
1549 .environmentd_node_selector
1550 .iter()
1551 .map(|selector| (selector.key.clone(), selector.value.clone()))
1552 .collect(),
1553 ),
1554 affinity: config.environmentd_affinity.clone(),
1555 scheduler_name: config.scheduler_name.clone(),
1556 service_account_name: Some(mz.service_account_name()),
1557 volumes: Some(volumes),
1558 security_context: Some(PodSecurityContext {
1559 fs_group: Some(999),
1560 run_as_user: Some(999),
1561 run_as_group: Some(999),
1562 ..Default::default()
1563 }),
1564 tolerations,
1565 termination_grace_period_seconds: Some(0),
1586 ..Default::default()
1587 }),
1588 };
1589
1590 let mut match_labels = BTreeMap::new();
1591 match_labels.insert(
1592 "materialize.cloud/name".to_owned(),
1593 mz.environmentd_statefulset_name(generation),
1594 );
1595
1596 let statefulset_spec = StatefulSetSpec {
1597 replicas: Some(1),
1598 template: pod_template_spec,
1599 update_strategy: Some(StatefulSetUpdateStrategy {
1600 rolling_update: None,
1601 type_: Some("OnDelete".to_owned()),
1602 }),
1603 service_name: Some(mz.environmentd_service_name()),
1604 selector: LabelSelector {
1605 match_expressions: None,
1606 match_labels: Some(match_labels),
1607 },
1608 ..Default::default()
1609 };
1610
1611 StatefulSet {
1612 metadata: ObjectMeta {
1613 annotations: Some(btreemap! {
1614 "materialize.cloud/generation".to_owned() => generation.to_string(),
1615 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1616 }),
1617 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1618 },
1619 spec: Some(statefulset_spec),
1620 status: None,
1621 }
1622}
1623
1624fn create_connection_info(
1625 config: &super::MaterializeControllerArgs,
1626 mz: &Materialize,
1627 generation: u64,
1628) -> ConnectionInfo {
1629 let external_enable_tls = issuer_ref_defined(
1630 &config.default_certificate_specs.internal,
1631 &mz.spec.internal_certificate_spec,
1632 );
1633 let authenticator_kind = mz.spec.authenticator_kind;
1634 let mut listeners_config = ListenersConfig {
1635 sql: btreemap! {
1636 "external".to_owned() => SqlListenerConfig{
1637 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1638 authenticator_kind,
1639 allowed_roles: AllowedRoles::Normal,
1640 enable_tls: external_enable_tls,
1641 },
1642 "internal".to_owned() => SqlListenerConfig{
1643 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1644 authenticator_kind: AuthenticatorKind::None,
1645 allowed_roles: AllowedRoles::NormalAndInternal,
1647 enable_tls: false,
1648 },
1649 },
1650 http: btreemap! {
1651 "external".to_owned() => HttpListenerConfig{
1652 base: BaseListenerConfig {
1653 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1654 authenticator_kind,
1655 allowed_roles: AllowedRoles::Normal,
1656 enable_tls: external_enable_tls,
1657 },
1658 routes: HttpRoutesEnabled{
1659 base: true,
1660 webhook: true,
1661 internal: false,
1662 metrics: false,
1663 profiling: false,
1664 }
1665 },
1666 "internal".to_owned() => HttpListenerConfig{
1667 base: BaseListenerConfig {
1668 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1669 authenticator_kind: AuthenticatorKind::None,
1670 allowed_roles: AllowedRoles::NormalAndInternal,
1672 enable_tls: false,
1673 },
1674 routes: HttpRoutesEnabled{
1675 base: true,
1676 webhook: true,
1677 internal: true,
1678 metrics: true,
1679 profiling: true,
1680 }
1681 },
1682 },
1683 };
1684 if authenticator_kind == AuthenticatorKind::Password {
1685 listeners_config.sql.remove("internal");
1686 listeners_config.http.remove("internal");
1687
1688 listeners_config.sql.get_mut("external").map(|listener| {
1689 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1690 listener
1691 });
1692 listeners_config.http.get_mut("external").map(|listener| {
1693 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1694 listener.routes.internal = true;
1695 listener.routes.profiling = true;
1696 listener
1697 });
1698
1699 listeners_config.http.insert(
1700 "metrics".to_owned(),
1701 HttpListenerConfig {
1702 base: BaseListenerConfig {
1703 addr: SocketAddr::new(
1704 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1705 config.environmentd_internal_http_port,
1706 ),
1707 authenticator_kind: AuthenticatorKind::None,
1708 allowed_roles: AllowedRoles::NormalAndInternal,
1709 enable_tls: false,
1710 },
1711 routes: HttpRoutesEnabled {
1712 base: false,
1713 webhook: false,
1714 internal: false,
1715 metrics: true,
1716 profiling: false,
1717 },
1718 },
1719 );
1720 }
1721
1722 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1723 let listeners_configmap = ConfigMap {
1724 binary_data: None,
1725 data: Some(btreemap! {
1726 "listeners.json".to_owned() => listeners_json,
1727 }),
1728 immutable: None,
1729 metadata: ObjectMeta {
1730 annotations: Some(btreemap! {
1731 "materialize.cloud/generation".to_owned() => generation.to_string(),
1732 }),
1733 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1734 },
1735 };
1736
1737 let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1738 AuthenticatorKind::Password => {
1739 let scheme = if external_enable_tls { "https" } else { "http" };
1740 (
1741 scheme,
1742 config.environmentd_http_port,
1743 Some(mz.spec.backend_secret_name.clone()),
1744 )
1745 }
1746 _ => ("http", config.environmentd_internal_http_port, None),
1747 };
1748 let environmentd_url = format!(
1749 "{}://{}.{}.svc.cluster.local:{}",
1750 scheme,
1751 mz.environmentd_generation_service_name(generation),
1752 mz.namespace(),
1753 leader_api_port,
1754 );
1755 ConnectionInfo {
1756 environmentd_url,
1757 listeners_configmap,
1758 mz_system_secret_name,
1759 }
1760}
1761
1762#[derive(Debug, Deserialize, PartialEq, Eq)]
1764struct BecomeLeaderResponse {
1765 result: BecomeLeaderResult,
1766}
1767
1768#[derive(Debug, Deserialize, PartialEq, Eq)]
1769enum BecomeLeaderResult {
1770 Success,
1771 Failure { message: String },
1772}
1773
1774#[derive(Debug, Deserialize, PartialEq, Eq)]
1775struct SkipCatchupError {
1776 message: String,
1777}