mz_orchestratord/controller/materialize/
environmentd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19    api::{
20        apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21        core::v1::{
22            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23            EnvVarSource, KeyToPath, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25            Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26            VolumeMount,
27        },
28        networking::v1::{
29            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31        },
32        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33    },
34    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40    ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
54use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
55use mz_orchestrator_tracing::TracingCliArgs;
56use mz_ore::instrument;
57
58static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
59    major: 0,
60    minor: 140,
61    patch: 0,
62    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
63    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
64});
65const V143: Version = Version::new(0, 143, 0);
66const V144: Version = Version::new(0, 144, 0);
67static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68    major: 0,
69    minor: 147,
70    patch: 0,
71    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
72    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74const V153: Version = Version::new(0, 153, 0);
75static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
76    major: 0,
77    minor: 154,
78    patch: 0,
79    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
80    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
81});
82
83/// Describes the status of a deployment.
84///
85/// This is a simplified representation of `DeploymentState`, suitable for
86/// announcement to the external orchestrator.
87#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
88pub enum DeploymentStatus {
89    /// This deployment is not the leader. It is initializing and is not yet
90    /// ready to become the leader.
91    Initializing,
92    /// This deployment is not the leader, but it is ready to become the leader.
93    ReadyToPromote,
94    /// This deployment is in the process of becoming the leader.
95    Promoting,
96    /// This deployment is the leader.
97    IsLeader,
98}
99
100#[derive(Deserialize, Serialize)]
101pub struct LoginCredentials {
102    username: String,
103    // TODO Password type?
104    password: String,
105}
106
107#[derive(Debug, Serialize)]
108pub struct ConnectionInfo {
109    pub environmentd_url: String,
110    pub mz_system_secret_name: Option<String>,
111    pub listeners_configmap: ConfigMap,
112}
113
114#[derive(Debug, Serialize)]
115pub struct Resources {
116    pub generation: u64,
117    pub environmentd_network_policies: Vec<NetworkPolicy>,
118    pub service_account: Box<Option<ServiceAccount>>,
119    pub role: Box<Role>,
120    pub role_binding: Box<RoleBinding>,
121    pub public_service: Box<Service>,
122    pub generation_service: Box<Service>,
123    pub persist_pubsub_service: Box<Service>,
124    pub environmentd_certificate: Box<Option<Certificate>>,
125    pub environmentd_statefulset: Box<StatefulSet>,
126    pub connection_info: Box<ConnectionInfo>,
127}
128
129impl Resources {
130    pub fn new(
131        config: &super::MaterializeControllerArgs,
132        tracing: &TracingCliArgs,
133        orchestratord_namespace: &str,
134        mz: &Materialize,
135        generation: u64,
136    ) -> Self {
137        let environmentd_network_policies =
138            create_environmentd_network_policies(config, mz, orchestratord_namespace);
139
140        let service_account = Box::new(create_service_account_object(config, mz));
141        let role = Box::new(create_role_object(mz));
142        let role_binding = Box::new(create_role_binding_object(mz));
143        let public_service = Box::new(create_public_service_object(config, mz, generation));
144        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
145        let persist_pubsub_service =
146            Box::new(create_persist_pubsub_service(config, mz, generation));
147        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
148        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
149            config, tracing, mz, generation,
150        ));
151        let connection_info = Box::new(create_connection_info(config, mz, generation));
152
153        Self {
154            generation,
155            environmentd_network_policies,
156            service_account,
157            role,
158            role_binding,
159            public_service,
160            generation_service,
161            persist_pubsub_service,
162            environmentd_certificate,
163            environmentd_statefulset,
164            connection_info,
165        }
166    }
167
168    #[instrument]
169    pub async fn apply(
170        &self,
171        client: &Client,
172        increment_generation: bool,
173        force_promote: bool,
174        namespace: &str,
175    ) -> Result<Option<Action>, anyhow::Error> {
176        let environmentd_network_policy_api: Api<NetworkPolicy> =
177            Api::namespaced(client.clone(), namespace);
178        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
179        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
180        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
181        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
182        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
183        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
184        let pod_api: Api<Pod> = Api::namespaced(client.clone(), namespace);
185        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
186        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
187
188        for policy in &self.environmentd_network_policies {
189            trace!("applying network policy {}", policy.name_unchecked());
190            apply_resource(&environmentd_network_policy_api, policy).await?;
191        }
192
193        if let Some(service_account) = &*self.service_account {
194            trace!("applying environmentd service account");
195            apply_resource(&service_account_api, service_account).await?;
196        }
197
198        trace!("applying environmentd role");
199        apply_resource(&role_api, &*self.role).await?;
200
201        trace!("applying environmentd role binding");
202        apply_resource(&role_binding_api, &*self.role_binding).await?;
203
204        trace!("applying environmentd per-generation service");
205        apply_resource(&service_api, &*self.generation_service).await?;
206
207        trace!("creating persist pubsub service");
208        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
209
210        if let Some(certificate) = &*self.environmentd_certificate {
211            trace!("creating new environmentd certificate");
212            apply_resource(&certificate_api, certificate).await?;
213        }
214
215        trace!("applying listeners configmap");
216        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
217
218        trace!("creating new environmentd statefulset");
219        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
220
221        // until we have full zero downtime upgrades, we have a tradeoff: if
222        // we use the graceful upgrade mechanism, we minimize environmentd
223        // unavailability but require a full clusterd rehydration every time,
224        // and if we use the in-place upgrade mechanism, we cause a few
225        // minutes of environmentd downtime, but as long as the environmentd
226        // version didn't change, the existing clusterds will remain running
227        // and won't need to rehydrate. during a version bump, the tradeoff
228        // here is obvious (we need to rehydrate either way, so minimizing
229        // environmentd downtime in the meantime is strictly better), but if
230        // we need to force a rollout some other time (for instance, to
231        // increase the environmentd memory request, or something like that),
232        // it is often better to accept the environmentd unavailability in
233        // order to get the environment as a whole back to a working state
234        // sooner. once clusterd rehydration gets moved ahead of the leader
235        // promotion step, this will no longer make a difference and we can
236        // remove the extra codepath.
237        if increment_generation {
238            let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
239
240            let statefulset = get_resource(
241                &statefulset_api,
242                &self.environmentd_statefulset.name_unchecked(),
243            )
244            .await?;
245            if statefulset
246                .and_then(|statefulset| statefulset.status)
247                .and_then(|status| status.ready_replicas)
248                .unwrap_or(0)
249                == 0
250            {
251                trace!("environmentd statefulset is not ready yet...");
252                return Ok(Some(retry_action));
253            }
254
255            let http_client = match &self.connection_info.mz_system_secret_name {
256                Some(mz_system_secret_name) => {
257                    let http_client = reqwest::Client::builder()
258                        .timeout(std::time::Duration::from_secs(10))
259                        .cookie_store(true)
260                        // TODO add_root_certificate instead
261                        .danger_accept_invalid_certs(true)
262                        .build()
263                        .unwrap();
264                    if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
265                        if let Some(password) =
266                            data.get("external_login_password_mz_system").cloned()
267                        {
268                            let password = String::from_utf8_lossy(&password.0).to_string();
269                            let login_url = reqwest::Url::parse(&format!(
270                                "{}/api/login",
271                                self.connection_info.environmentd_url,
272                            ))
273                            .unwrap();
274                            match http_client
275                                .post(login_url)
276                                .body(serde_json::to_string(&LoginCredentials {
277                                    username: "mz_system".to_owned(),
278                                    password,
279                                })?)
280                                .header("Content-Type", "application/json")
281                                .send()
282                                .await
283                            {
284                                Ok(response) => {
285                                    if let Err(e) = response.error_for_status() {
286                                        trace!(
287                                            "failed to login to environmentd, retrying... ({e})"
288                                        );
289                                        return Ok(Some(retry_action));
290                                    }
291                                }
292                                Err(e) => {
293                                    trace!("failed to connect to environmentd, retrying... ({e})");
294                                    return Ok(Some(retry_action));
295                                }
296                            };
297                        }
298                    };
299                    http_client
300                }
301                None => reqwest::Client::builder()
302                    .timeout(std::time::Duration::from_secs(10))
303                    .build()
304                    .unwrap(),
305            };
306            let status_url = reqwest::Url::parse(&format!(
307                "{}/api/leader/status",
308                self.connection_info.environmentd_url,
309            ))
310            .unwrap();
311
312            match http_client.get(status_url.clone()).send().await {
313                Ok(response) => {
314                    let response: BTreeMap<String, DeploymentStatus> =
315                        match response.error_for_status() {
316                            Ok(response) => response.json().await?,
317                            Err(e) => {
318                                trace!("failed to get status of environmentd, retrying... ({e})");
319                                return Ok(Some(retry_action));
320                            }
321                        };
322                    if force_promote {
323                        trace!("skipping cluster catchup");
324                        let skip_catchup_url = reqwest::Url::parse(&format!(
325                            "{}/api/leader/skip-catchup",
326                            self.connection_info.environmentd_url,
327                        ))
328                        .unwrap();
329                        let response = http_client.post(skip_catchup_url).send().await?;
330                        if response.status() == StatusCode::BAD_REQUEST {
331                            let err: SkipCatchupError = response.json().await?;
332                            bail!("failed to skip catchup: {}", err.message);
333                        }
334                    } else if response["status"] == DeploymentStatus::Initializing {
335                        trace!("environmentd is still initializing, retrying...");
336                        return Ok(Some(retry_action));
337                    } else {
338                        trace!("environmentd is ready");
339                    }
340                }
341                Err(e) => {
342                    trace!("failed to connect to environmentd, retrying... ({e})");
343                    return Ok(Some(retry_action));
344                }
345            }
346
347            let promote_url = reqwest::Url::parse(&format!(
348                "{}/api/leader/promote",
349                self.connection_info.environmentd_url,
350            ))
351            .unwrap();
352
353            // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
354            // It is absolutely critical that this promotion is done last!
355            //
356            // If there are any failures in this method, the error handler in
357            // the caller will attempt to revert and delete the new environmentd.
358            // After promotion, the new environmentd is active, so that would
359            // cause an outage!
360            // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
361            trace!("promoting new environmentd to leader");
362            let response = http_client.post(promote_url).send().await?;
363            let response: BecomeLeaderResponse = match response.error_for_status() {
364                Ok(response) => response.json().await?,
365                Err(e) => {
366                    trace!("failed to promote environmentd, retrying... ({e})");
367                    return Ok(Some(retry_action));
368                }
369            };
370            if let BecomeLeaderResult::Failure { message } = response.result {
371                bail!("failed to promote new environmentd: {message}");
372            }
373
374            // A successful POST to the promotion endpoint only indicates
375            // that the promotion process was kicked off. It does not
376            // guarantee that the environment will be successfully promoted
377            // (e.g., if the environment crashes immediately after responding
378            // to the request, but before executing the takeover, the
379            // promotion will be lost).
380            //
381            // To guarantee the environment has been promoted successfully,
382            // we must wait to see at least one `IsLeader` status returned
383            // from the environment.
384
385            match http_client.get(status_url.clone()).send().await {
386                Ok(response) => {
387                    let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
388                    if response["status"] != DeploymentStatus::IsLeader {
389                        trace!(
390                            "environmentd is still promoting (status: {:?}), retrying...",
391                            response["status"]
392                        );
393                        return Ok(Some(retry_action));
394                    } else {
395                        trace!("environmentd is ready");
396                    }
397                }
398                Err(e) => {
399                    trace!("failed to connect to environmentd, retrying... ({e})");
400                    return Ok(Some(retry_action));
401                }
402            }
403        } else {
404            trace!("restarting environmentd pod to pick up statefulset changes");
405            delete_resource(
406                &pod_api,
407                &statefulset_pod_name(&*self.environmentd_statefulset, 0),
408            )
409            .await?;
410        }
411
412        Ok(None)
413    }
414
415    #[instrument]
416    pub async fn promote_services(
417        &self,
418        client: &Client,
419        namespace: &str,
420    ) -> Result<(), anyhow::Error> {
421        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
422
423        trace!("applying environmentd public service");
424        apply_resource(&service_api, &*self.public_service).await?;
425
426        Ok(())
427    }
428
429    #[instrument]
430    pub async fn teardown_generation(
431        &self,
432        client: &Client,
433        mz: &Materialize,
434        generation: u64,
435    ) -> Result<(), anyhow::Error> {
436        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
437        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
438        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
439
440        trace!("deleting environmentd statefulset for generation {generation}");
441        delete_resource(
442            &statefulset_api,
443            &mz.environmentd_statefulset_name(generation),
444        )
445        .await?;
446
447        trace!("deleting persist pubsub service for generation {generation}");
448        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
449
450        trace!("deleting environmentd per-generation service for generation {generation}");
451        delete_resource(
452            &service_api,
453            &mz.environmentd_generation_service_name(generation),
454        )
455        .await?;
456
457        trace!("deleting listeners configmap for generation {generation}");
458        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
459
460        Ok(())
461    }
462
463    // ideally we would just be able to hash the objects directly, but the
464    // generated kubernetes objects don't implement the Hash trait
465    pub fn generate_hash(&self) -> String {
466        let mut hasher = Sha256::new();
467        hasher.update(&serde_json::to_string(self).unwrap());
468        format!("{:x}", hasher.finalize())
469    }
470}
471
472fn create_environmentd_network_policies(
473    config: &super::MaterializeControllerArgs,
474    mz: &Materialize,
475    orchestratord_namespace: &str,
476) -> Vec<NetworkPolicy> {
477    let mut network_policies = Vec::new();
478    if config.network_policies.internal_enabled {
479        let environmentd_label_selector = LabelSelector {
480            match_labels: Some(
481                mz.default_labels()
482                    .into_iter()
483                    .chain([(
484                        "materialize.cloud/app".to_owned(),
485                        mz.environmentd_app_name(),
486                    )])
487                    .collect(),
488            ),
489            ..Default::default()
490        };
491        let orchestratord_label_selector = LabelSelector {
492            match_labels: Some(
493                config
494                    .orchestratord_pod_selector_labels
495                    .iter()
496                    .cloned()
497                    .map(|kv| (kv.key, kv.value))
498                    .collect(),
499            ),
500            ..Default::default()
501        };
502        // TODO (Alex) filter to just clusterd and environmentd,
503        // once we get a consistent set of labels for both.
504        let all_pods_label_selector = LabelSelector {
505            match_labels: Some(mz.default_labels()),
506            ..Default::default()
507        };
508        network_policies.extend([
509            // Allow all clusterd/environmentd traffic (between pods in the
510            // same environment)
511            NetworkPolicy {
512                metadata: mz
513                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
514                spec: Some(NetworkPolicySpec {
515                    egress: Some(vec![NetworkPolicyEgressRule {
516                        to: Some(vec![NetworkPolicyPeer {
517                            pod_selector: Some(all_pods_label_selector.clone()),
518                            ..Default::default()
519                        }]),
520                        ..Default::default()
521                    }]),
522                    ingress: Some(vec![NetworkPolicyIngressRule {
523                        from: Some(vec![NetworkPolicyPeer {
524                            pod_selector: Some(all_pods_label_selector.clone()),
525                            ..Default::default()
526                        }]),
527                        ..Default::default()
528                    }]),
529                    pod_selector: all_pods_label_selector.clone(),
530                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
531                    ..Default::default()
532                }),
533            },
534            // Allow traffic from orchestratord to environmentd in order to hit
535            // the promotion endpoints during upgrades
536            NetworkPolicy {
537                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
538                spec: Some(NetworkPolicySpec {
539                    ingress: Some(vec![NetworkPolicyIngressRule {
540                        from: Some(vec![NetworkPolicyPeer {
541                            namespace_selector: Some(LabelSelector {
542                                match_labels: Some(btreemap! {
543                                    "kubernetes.io/metadata.name".into()
544                                        => orchestratord_namespace.into(),
545                                }),
546                                ..Default::default()
547                            }),
548                            pod_selector: Some(orchestratord_label_selector),
549                            ..Default::default()
550                        }]),
551                        ports: Some(vec![
552                            NetworkPolicyPort {
553                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
554                                protocol: Some("TCP".to_string()),
555                                ..Default::default()
556                            },
557                            NetworkPolicyPort {
558                                port: Some(IntOrString::Int(
559                                    config.environmentd_internal_http_port.into(),
560                                )),
561                                protocol: Some("TCP".to_string()),
562                                ..Default::default()
563                            },
564                        ]),
565                        ..Default::default()
566                    }]),
567                    pod_selector: environmentd_label_selector,
568                    policy_types: Some(vec!["Ingress".to_owned()]),
569                    ..Default::default()
570                }),
571            },
572        ]);
573    }
574    if config.network_policies.ingress_enabled {
575        let mut ingress_label_selector = mz.default_labels();
576        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
577        network_policies.extend([NetworkPolicy {
578            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
579            spec: Some(NetworkPolicySpec {
580                ingress: Some(vec![NetworkPolicyIngressRule {
581                    from: Some(
582                        config
583                            .network_policies
584                            .ingress_cidrs
585                            .iter()
586                            .map(|cidr| NetworkPolicyPeer {
587                                ip_block: Some(IPBlock {
588                                    cidr: cidr.to_owned(),
589                                    except: None,
590                                }),
591                                ..Default::default()
592                            })
593                            .collect(),
594                    ),
595                    ports: Some(vec![
596                        NetworkPolicyPort {
597                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
598                            protocol: Some("TCP".to_string()),
599                            ..Default::default()
600                        },
601                        NetworkPolicyPort {
602                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
603                            protocol: Some("TCP".to_string()),
604                            ..Default::default()
605                        },
606                    ]),
607                    ..Default::default()
608                }]),
609                pod_selector: LabelSelector {
610                    match_expressions: None,
611                    match_labels: Some(ingress_label_selector),
612                },
613                policy_types: Some(vec!["Ingress".to_owned()]),
614                ..Default::default()
615            }),
616        }]);
617    }
618    if config.network_policies.egress_enabled {
619        network_policies.extend([NetworkPolicy {
620            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
621            spec: Some(NetworkPolicySpec {
622                egress: Some(vec![NetworkPolicyEgressRule {
623                    to: Some(
624                        config
625                            .network_policies
626                            .egress_cidrs
627                            .iter()
628                            .map(|cidr| NetworkPolicyPeer {
629                                ip_block: Some(IPBlock {
630                                    cidr: cidr.to_owned(),
631                                    except: None,
632                                }),
633                                ..Default::default()
634                            })
635                            .collect(),
636                    ),
637                    ..Default::default()
638                }]),
639                pod_selector: LabelSelector {
640                    match_expressions: None,
641                    match_labels: Some(mz.default_labels()),
642                },
643                policy_types: Some(vec!["Egress".to_owned()]),
644                ..Default::default()
645            }),
646        }]);
647    }
648    network_policies
649}
650
651fn create_service_account_object(
652    config: &super::MaterializeControllerArgs,
653    mz: &Materialize,
654) -> Option<ServiceAccount> {
655    if mz.create_service_account() {
656        let mut annotations: BTreeMap<String, String> = mz
657            .spec
658            .service_account_annotations
659            .clone()
660            .unwrap_or_default();
661        if let (CloudProvider::Aws, Some(role_arn)) = (
662            config.cloud_provider,
663            mz.spec
664                .environmentd_iam_role_arn
665                .as_deref()
666                .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
667        ) {
668            warn!(
669                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
670            );
671            annotations.insert(
672                "eks.amazonaws.com/role-arn".to_string(),
673                role_arn.to_string(),
674            );
675        };
676
677        let mut labels = mz.default_labels();
678        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
679
680        Some(ServiceAccount {
681            metadata: ObjectMeta {
682                annotations: Some(annotations),
683                labels: Some(labels),
684                ..mz.managed_resource_meta(mz.service_account_name())
685            },
686            ..Default::default()
687        })
688    } else {
689        None
690    }
691}
692
693fn create_role_object(mz: &Materialize) -> Role {
694    Role {
695        metadata: mz.managed_resource_meta(mz.role_name()),
696        rules: Some(vec![
697            PolicyRule {
698                api_groups: Some(vec!["apps".to_string()]),
699                resources: Some(vec!["statefulsets".to_string()]),
700                verbs: vec![
701                    "get".to_string(),
702                    "list".to_string(),
703                    "watch".to_string(),
704                    "create".to_string(),
705                    "update".to_string(),
706                    "patch".to_string(),
707                    "delete".to_string(),
708                ],
709                ..Default::default()
710            },
711            PolicyRule {
712                api_groups: Some(vec!["".to_string()]),
713                resources: Some(vec![
714                    "persistentvolumeclaims".to_string(),
715                    "pods".to_string(),
716                    "secrets".to_string(),
717                    "services".to_string(),
718                ]),
719                verbs: vec![
720                    "get".to_string(),
721                    "list".to_string(),
722                    "watch".to_string(),
723                    "create".to_string(),
724                    "update".to_string(),
725                    "patch".to_string(),
726                    "delete".to_string(),
727                ],
728                ..Default::default()
729            },
730            PolicyRule {
731                api_groups: Some(vec!["".to_string()]),
732                resources: Some(vec!["configmaps".to_string()]),
733                verbs: vec!["get".to_string()],
734                ..Default::default()
735            },
736            PolicyRule {
737                api_groups: Some(vec!["materialize.cloud".to_string()]),
738                resources: Some(vec!["vpcendpoints".to_string()]),
739                verbs: vec![
740                    "get".to_string(),
741                    "list".to_string(),
742                    "watch".to_string(),
743                    "create".to_string(),
744                    "update".to_string(),
745                    "patch".to_string(),
746                    "delete".to_string(),
747                ],
748                ..Default::default()
749            },
750            PolicyRule {
751                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
752                resources: Some(vec!["pods".to_string()]),
753                verbs: vec!["get".to_string(), "list".to_string()],
754                ..Default::default()
755            },
756            PolicyRule {
757                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
758                resources: Some(vec![
759                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
760                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
761                ]),
762                verbs: vec!["get".to_string()],
763                ..Default::default()
764            },
765        ]),
766    }
767}
768
769fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
770    RoleBinding {
771        metadata: mz.managed_resource_meta(mz.role_binding_name()),
772        role_ref: RoleRef {
773            api_group: "".to_string(),
774            kind: "Role".to_string(),
775            name: mz.role_name(),
776        },
777        subjects: Some(vec![Subject {
778            api_group: Some("".to_string()),
779            kind: "ServiceAccount".to_string(),
780            name: mz.service_account_name(),
781            namespace: Some(mz.namespace()),
782        }]),
783    }
784}
785
786fn create_public_service_object(
787    config: &super::MaterializeControllerArgs,
788    mz: &Materialize,
789    generation: u64,
790) -> Service {
791    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
792}
793
794fn create_generation_service_object(
795    config: &super::MaterializeControllerArgs,
796    mz: &Materialize,
797    generation: u64,
798) -> Service {
799    create_base_service_object(
800        config,
801        mz,
802        generation,
803        &mz.environmentd_generation_service_name(generation),
804    )
805}
806
807fn create_base_service_object(
808    config: &super::MaterializeControllerArgs,
809    mz: &Materialize,
810    generation: u64,
811    service_name: &str,
812) -> Service {
813    let ports = vec![
814        ServicePort {
815            port: config.environmentd_sql_port.into(),
816            protocol: Some("TCP".to_string()),
817            name: Some("sql".to_string()),
818            ..Default::default()
819        },
820        ServicePort {
821            port: config.environmentd_http_port.into(),
822            protocol: Some("TCP".to_string()),
823            name: Some("https".to_string()),
824            ..Default::default()
825        },
826        ServicePort {
827            port: config.environmentd_internal_sql_port.into(),
828            protocol: Some("TCP".to_string()),
829            name: Some("internal-sql".to_string()),
830            ..Default::default()
831        },
832        ServicePort {
833            port: config.environmentd_internal_http_port.into(),
834            protocol: Some("TCP".to_string()),
835            name: Some("internal-http".to_string()),
836            ..Default::default()
837        },
838    ];
839
840    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
841
842    let spec = ServiceSpec {
843        type_: Some("ClusterIP".to_string()),
844        cluster_ip: Some("None".to_string()),
845        selector: Some(selector),
846        ports: Some(ports),
847        ..Default::default()
848    };
849
850    Service {
851        metadata: mz.managed_resource_meta(service_name.to_string()),
852        spec: Some(spec),
853        status: None,
854    }
855}
856
857fn create_persist_pubsub_service(
858    config: &super::MaterializeControllerArgs,
859    mz: &Materialize,
860    generation: u64,
861) -> Service {
862    Service {
863        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
864        spec: Some(ServiceSpec {
865            type_: Some("ClusterIP".to_string()),
866            cluster_ip: Some("None".to_string()),
867            selector: Some(btreemap! {
868                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
869            }),
870            ports: Some(vec![ServicePort {
871                name: Some("grpc".to_string()),
872                protocol: Some("TCP".to_string()),
873                port: config.environmentd_internal_persist_pubsub_port.into(),
874                ..Default::default()
875            }]),
876            ..Default::default()
877        }),
878        status: None,
879    }
880}
881
882fn create_environmentd_certificate(
883    config: &super::MaterializeControllerArgs,
884    mz: &Materialize,
885) -> Option<Certificate> {
886    create_certificate(
887        config.default_certificate_specs.internal.clone(),
888        mz,
889        mz.spec.internal_certificate_spec.clone(),
890        mz.environmentd_certificate_name(),
891        mz.environmentd_certificate_secret_name(),
892        Some(vec![
893            mz.environmentd_service_name(),
894            mz.environmentd_service_internal_fqdn(),
895        ]),
896    )
897}
898
899fn create_environmentd_statefulset_object(
900    config: &super::MaterializeControllerArgs,
901    tracing: &TracingCliArgs,
902    mz: &Materialize,
903    generation: u64,
904) -> StatefulSet {
905    // IMPORTANT: Only pass secrets via environment variables. All other
906    // parameters should be passed as command line arguments, possibly gated
907    // with a `meets_minimum_version` call. This ensures typos cause
908    // `environmentd` to fail to start, rather than silently ignoring the
909    // configuration.
910    //
911    // When passing a secret, use a `SecretKeySelector` to forward a secret into
912    // the pod. Do *not* hardcode a secret as the value directly, as doing so
913    // will leak the secret to anyone with permission to describe the pod.
914    let mut env = vec![
915        EnvVar {
916            name: "MZ_METADATA_BACKEND_URL".to_string(),
917            value_from: Some(EnvVarSource {
918                secret_key_ref: Some(SecretKeySelector {
919                    name: mz.backend_secret_name(),
920                    key: "metadata_backend_url".to_string(),
921                    optional: Some(false),
922                }),
923                ..Default::default()
924            }),
925            ..Default::default()
926        },
927        EnvVar {
928            name: "MZ_PERSIST_BLOB_URL".to_string(),
929            value_from: Some(EnvVarSource {
930                secret_key_ref: Some(SecretKeySelector {
931                    name: mz.backend_secret_name(),
932                    key: "persist_backend_url".to_string(),
933                    optional: Some(false),
934                }),
935                ..Default::default()
936            }),
937            ..Default::default()
938        },
939    ];
940
941    env.push(EnvVar {
942        name: "AWS_REGION".to_string(),
943        value: Some(config.region.clone()),
944        ..Default::default()
945    });
946
947    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
948
949    let mut args = vec![];
950
951    if let Some(helm_chart_version) = &config.helm_chart_version {
952        args.push(format!("--helm-chart-version={helm_chart_version}"));
953    }
954
955    // Add environment ID argument.
956    args.push(format!(
957        "--environment-id={}",
958        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
959    ));
960
961    // Add clusterd image argument based on environmentd tag.
962    args.push(format!(
963        "--clusterd-image={}",
964        matching_image_from_environmentd_image_ref(
965            &mz.spec.environmentd_image_ref,
966            "clusterd",
967            None
968        )
969    ));
970
971    // Add cluster and storage host size arguments.
972    args.extend(
973        [
974            config
975                .environmentd_cluster_replica_sizes
976                .as_ref()
977                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
978            config
979                .bootstrap_default_cluster_replica_size
980                .as_ref()
981                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
982            config
983                .bootstrap_builtin_system_cluster_replica_size
984                .as_ref()
985                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
986            config
987                .bootstrap_builtin_probe_cluster_replica_size
988                .as_ref()
989                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
990            config
991                .bootstrap_builtin_support_cluster_replica_size
992                .as_ref()
993                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
994            config
995                .bootstrap_builtin_catalog_server_cluster_replica_size
996                .as_ref()
997                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
998            config
999                .bootstrap_builtin_analytics_cluster_replica_size
1000                .as_ref()
1001                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1002            config
1003                .bootstrap_builtin_system_cluster_replication_factor
1004                .as_ref()
1005                .map(|replication_factor| {
1006                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1007                }),
1008            config
1009                .bootstrap_builtin_probe_cluster_replication_factor
1010                .as_ref()
1011                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1012            config
1013                .bootstrap_builtin_support_cluster_replication_factor
1014                .as_ref()
1015                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1016            config
1017                .bootstrap_builtin_analytics_cluster_replication_factor
1018                .as_ref()
1019                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1020        ]
1021        .into_iter()
1022        .flatten(),
1023    );
1024
1025    args.extend(
1026        config
1027            .environmentd_allowed_origins
1028            .iter()
1029            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1030    );
1031
1032    args.push(format!(
1033        "--secrets-controller={}",
1034        config.secrets_controller
1035    ));
1036
1037    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1038        if let Ok(cluster_replica_sizes) =
1039            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1040        {
1041            let cluster_replica_sizes: Vec<_> =
1042                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1043            args.push(format!(
1044                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1045                cluster_replica_sizes.join("', '")
1046            ));
1047        }
1048    }
1049    if !config.cloud_provider.is_cloud() {
1050        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1051    }
1052
1053    if config.enable_internal_statement_logging {
1054        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1055    }
1056
1057    if config.disable_statement_logging {
1058        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1059    }
1060
1061    if !mz.spec.enable_rbac {
1062        args.push("--system-parameter-default=enable_rbac_checks=false".into());
1063    }
1064
1065    // Add persist arguments.
1066
1067    // Configure the Persist Isolated Runtime to use one less thread than the total available.
1068    args.push("--persist-isolated-runtime-threads=-1".to_string());
1069
1070    // Add AWS arguments.
1071    if config.cloud_provider == CloudProvider::Aws {
1072        if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1073            for az in azs {
1074                args.push(format!("--availability-zone={az}"));
1075            }
1076        }
1077
1078        if let Some(environmentd_connection_role_arn) = mz
1079            .spec
1080            .environmentd_connection_role_arn
1081            .as_deref()
1082            .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1083        {
1084            args.push(format!(
1085                "--aws-connection-role-arn={}",
1086                environmentd_connection_role_arn
1087            ));
1088        }
1089        if let Some(account_id) = &config.aws_info.aws_account_id {
1090            args.push(format!("--aws-account-id={account_id}"));
1091        }
1092
1093        args.extend([format!(
1094            "--aws-secrets-controller-tags=Environment={}",
1095            mz.name_unchecked()
1096        )]);
1097        args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1098    }
1099
1100    // Add Kubernetes arguments.
1101    args.extend([
1102        "--orchestrator=kubernetes".into(),
1103        format!(
1104            "--orchestrator-kubernetes-service-account={}",
1105            &mz.service_account_name()
1106        ),
1107        format!(
1108            "--orchestrator-kubernetes-image-pull-policy={}",
1109            config.image_pull_policy.as_kebab_case_str(),
1110        ),
1111    ]);
1112    for selector in &config.clusterd_node_selector {
1113        args.push(format!(
1114            "--orchestrator-kubernetes-service-node-selector={}={}",
1115            selector.key, selector.value,
1116        ));
1117    }
1118    if mz.meets_minimum_version(&V144) {
1119        if let Some(affinity) = &config.clusterd_affinity {
1120            let affinity = serde_json::to_string(affinity).unwrap();
1121            args.push(format!(
1122                "--orchestrator-kubernetes-service-affinity={affinity}"
1123            ))
1124        }
1125        if let Some(tolerations) = &config.clusterd_tolerations {
1126            let tolerations = serde_json::to_string(tolerations).unwrap();
1127            args.push(format!(
1128                "--orchestrator-kubernetes-service-tolerations={tolerations}"
1129            ))
1130        }
1131    }
1132    if let Some(scheduler_name) = &config.scheduler_name {
1133        args.push(format!(
1134            "--orchestrator-kubernetes-scheduler-name={}",
1135            scheduler_name
1136        ));
1137    }
1138    if mz.meets_minimum_version(&V154_DEV0) {
1139        args.extend(
1140            mz.spec
1141                .pod_annotations
1142                .as_ref()
1143                .map(|annotations| annotations.iter())
1144                .unwrap_or_default()
1145                .map(|(key, val)| {
1146                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1147                }),
1148        );
1149    }
1150    args.extend(
1151        mz.default_labels()
1152            .iter()
1153            .chain(
1154                mz.spec
1155                    .pod_labels
1156                    .as_ref()
1157                    .map(|labels| labels.iter())
1158                    .unwrap_or_default(),
1159            )
1160            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1161    );
1162    if let Some(status) = &mz.status {
1163        args.push(format!(
1164            "--orchestrator-kubernetes-name-prefix=mz{}-",
1165            status.resource_id
1166        ));
1167    }
1168
1169    // Add logging and tracing arguments.
1170    args.extend(["--log-format=json".into()]);
1171    if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1172        args.push(format!("--opentelemetry-endpoint={}", endpoint));
1173    }
1174    // --opentelemetry-resource also configures sentry tags
1175    args.extend([
1176        format!(
1177            "--opentelemetry-resource=organization_id={}",
1178            mz.spec.environment_id
1179        ),
1180        format!(
1181            "--opentelemetry-resource=environment_name={}",
1182            mz.name_unchecked()
1183        ),
1184    ]);
1185
1186    if let Some(segment_api_key) = &config.segment_api_key {
1187        args.push(format!("--segment-api-key={}", segment_api_key));
1188        if config.segment_client_side {
1189            args.push("--segment-client-side".into());
1190        }
1191    }
1192
1193    let mut volumes = Vec::new();
1194    let mut volume_mounts = Vec::new();
1195    if issuer_ref_defined(
1196        &config.default_certificate_specs.internal,
1197        &mz.spec.internal_certificate_spec,
1198    ) {
1199        volumes.push(Volume {
1200            name: "certificate".to_owned(),
1201            secret: Some(SecretVolumeSource {
1202                default_mode: Some(0o400),
1203                secret_name: Some(mz.environmentd_certificate_secret_name()),
1204                items: None,
1205                optional: Some(false),
1206            }),
1207            ..Default::default()
1208        });
1209        volume_mounts.push(VolumeMount {
1210            name: "certificate".to_owned(),
1211            mount_path: "/etc/materialized".to_owned(),
1212            read_only: Some(true),
1213            ..Default::default()
1214        });
1215        args.extend([
1216            "--tls-mode=require".into(),
1217            "--tls-cert=/etc/materialized/tls.crt".into(),
1218            "--tls-key=/etc/materialized/tls.key".into(),
1219        ]);
1220    } else {
1221        args.push("--tls-mode=disable".to_string());
1222    }
1223    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1224        args.push(format!(
1225            "--orchestrator-kubernetes-ephemeral-volume-class={}",
1226            ephemeral_volume_class
1227        ));
1228    }
1229    // The `materialize` user used by clusterd always has gid 999.
1230    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1231
1232    // Add Sentry arguments.
1233    if let Some(sentry_dsn) = &tracing.sentry_dsn {
1234        args.push(format!("--sentry-dsn={}", sentry_dsn));
1235        if let Some(sentry_environment) = &tracing.sentry_environment {
1236            args.push(format!("--sentry-environment={}", sentry_environment));
1237        }
1238        args.push(format!("--sentry-tag=region={}", config.region));
1239    }
1240
1241    // Add Persist PubSub arguments
1242    args.push(format!(
1243        "--persist-pubsub-url=http://{}:{}",
1244        mz.persist_pubsub_service_name(generation),
1245        config.environmentd_internal_persist_pubsub_port,
1246    ));
1247    args.push(format!(
1248        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1249        config.environmentd_internal_persist_pubsub_port
1250    ));
1251
1252    args.push(format!("--deploy-generation={}", generation));
1253
1254    // Add URL for internal user impersonation endpoint
1255    args.push(format!(
1256        "--internal-console-redirect-url={}",
1257        &config.internal_console_proxy_url,
1258    ));
1259
1260    if !config.collect_pod_metrics {
1261        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1262    }
1263    if config.enable_prometheus_scrape_annotations {
1264        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1265    }
1266
1267    if config.disable_license_key_checks {
1268        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1269            args.push("--disable-license-key-checks".into());
1270        }
1271    } else if mz.meets_minimum_version(&V140_DEV0) {
1272        volume_mounts.push(VolumeMount {
1273            name: "license-key".to_string(),
1274            mount_path: "/license_key".to_string(),
1275            ..Default::default()
1276        });
1277        volumes.push(Volume {
1278            name: "license-key".to_string(),
1279            secret: Some(SecretVolumeSource {
1280                default_mode: Some(256),
1281                optional: Some(false),
1282                secret_name: Some(mz.backend_secret_name()),
1283                items: Some(vec![KeyToPath {
1284                    key: "license_key".to_string(),
1285                    path: "license_key".to_string(),
1286                    ..Default::default()
1287                }]),
1288                ..Default::default()
1289            }),
1290            ..Default::default()
1291        });
1292        env.push(EnvVar {
1293            name: "MZ_LICENSE_KEY".to_string(),
1294            value: Some("/license_key/license_key".to_string()),
1295            ..Default::default()
1296        });
1297    }
1298
1299    // Add user-specified extra arguments.
1300    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1301        args.extend(extra_args.iter().cloned());
1302    }
1303
1304    let probe = Probe {
1305        initial_delay_seconds: Some(1),
1306        failure_threshold: Some(12),
1307        tcp_socket: Some(TCPSocketAction {
1308            host: None,
1309            port: IntOrString::Int(config.environmentd_sql_port.into()),
1310        }),
1311        ..Default::default()
1312    };
1313
1314    let security_context = if config.enable_security_context {
1315        // Since we want to adhere to the most restrictive security context, all
1316        // of these fields have to be set how they are.
1317        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1318        Some(SecurityContext {
1319            run_as_non_root: Some(true),
1320            capabilities: Some(Capabilities {
1321                drop: Some(vec!["ALL".to_string()]),
1322                ..Default::default()
1323            }),
1324            seccomp_profile: Some(SeccompProfile {
1325                type_: "RuntimeDefault".to_string(),
1326                ..Default::default()
1327            }),
1328            allow_privilege_escalation: Some(false),
1329            ..Default::default()
1330        })
1331    } else {
1332        None
1333    };
1334
1335    let ports = vec![
1336        ContainerPort {
1337            container_port: config.environmentd_sql_port.into(),
1338            name: Some("sql".to_owned()),
1339            ..Default::default()
1340        },
1341        ContainerPort {
1342            container_port: config.environmentd_internal_sql_port.into(),
1343            name: Some("internal-sql".to_owned()),
1344            ..Default::default()
1345        },
1346        ContainerPort {
1347            container_port: config.environmentd_http_port.into(),
1348            name: Some("http".to_owned()),
1349            ..Default::default()
1350        },
1351        ContainerPort {
1352            container_port: config.environmentd_internal_http_port.into(),
1353            name: Some("internal-http".to_owned()),
1354            ..Default::default()
1355        },
1356        ContainerPort {
1357            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1358            name: Some("persist-pubsub".to_owned()),
1359            ..Default::default()
1360        },
1361    ];
1362
1363    // Add networking arguments.
1364    if mz.meets_minimum_version(&V147_DEV0) {
1365        volume_mounts.push(VolumeMount {
1366            name: "listeners-configmap".to_string(),
1367            mount_path: "/listeners".to_string(),
1368            ..Default::default()
1369        });
1370        volumes.push(Volume {
1371            name: "listeners-configmap".to_string(),
1372            config_map: Some(ConfigMapVolumeSource {
1373                name: mz.listeners_configmap_name(generation),
1374                default_mode: Some(256),
1375                optional: Some(false),
1376                items: Some(vec![KeyToPath {
1377                    key: "listeners.json".to_string(),
1378                    path: "listeners.json".to_string(),
1379                    ..Default::default()
1380                }]),
1381            }),
1382            ..Default::default()
1383        });
1384        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1385        if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1386            args.push("--system-parameter-default=enable_password_auth=true".into());
1387            env.push(EnvVar {
1388                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1389                value_from: Some(EnvVarSource {
1390                    secret_key_ref: Some(SecretKeySelector {
1391                        name: mz.backend_secret_name(),
1392                        key: "external_login_password_mz_system".to_string(),
1393                        optional: Some(false),
1394                    }),
1395                    ..Default::default()
1396                }),
1397                ..Default::default()
1398            })
1399        }
1400    } else {
1401        args.extend([
1402            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1403            format!(
1404                "--http-listen-addr=0.0.0.0:{}",
1405                config.environmentd_http_port
1406            ),
1407            format!(
1408                "--internal-sql-listen-addr=0.0.0.0:{}",
1409                config.environmentd_internal_sql_port
1410            ),
1411            format!(
1412                "--internal-http-listen-addr=0.0.0.0:{}",
1413                config.environmentd_internal_http_port
1414            ),
1415        ]);
1416    }
1417
1418    let container = Container {
1419        name: "environmentd".to_owned(),
1420        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1421        image_pull_policy: Some(config.image_pull_policy.to_string()),
1422        ports: Some(ports),
1423        args: Some(args),
1424        env: Some(env),
1425        volume_mounts: Some(volume_mounts),
1426        liveness_probe: Some(probe.clone()),
1427        readiness_probe: Some(probe),
1428        resources: mz.spec.environmentd_resource_requirements.clone(),
1429        security_context: security_context.clone(),
1430        ..Default::default()
1431    };
1432
1433    let mut pod_template_labels = mz.default_labels();
1434    pod_template_labels.insert(
1435        "materialize.cloud/name".to_owned(),
1436        mz.environmentd_statefulset_name(generation),
1437    );
1438    pod_template_labels.insert(
1439        "materialize.cloud/app".to_owned(),
1440        mz.environmentd_app_name(),
1441    );
1442    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1443    pod_template_labels.extend(
1444        mz.spec
1445            .pod_labels
1446            .as_ref()
1447            .map(|labels| labels.iter())
1448            .unwrap_or_default()
1449            .map(|(key, value)| (key.clone(), value.clone())),
1450    );
1451
1452    let mut pod_template_annotations = btreemap! {
1453        // We can re-enable eviction once we have HA
1454        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1455
1456        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1457        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1458        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1459        "materialize.cloud/generation".to_owned() => generation.to_string(),
1460    };
1461    if config.enable_prometheus_scrape_annotations {
1462        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1463        pod_template_annotations.insert(
1464            "prometheus.io/port".to_owned(),
1465            config.environmentd_internal_http_port.to_string(),
1466        );
1467        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1468        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1469        pod_template_annotations.insert(
1470            "materialize.prometheus.io/mz_usage_path".to_owned(),
1471            "/metrics/mz_usage".to_string(),
1472        );
1473        pod_template_annotations.insert(
1474            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1475            "/metrics/mz_frontier".to_string(),
1476        );
1477        pod_template_annotations.insert(
1478            "materialize.prometheus.io/mz_compute_path".to_owned(),
1479            "/metrics/mz_compute".to_string(),
1480        );
1481        pod_template_annotations.insert(
1482            "materialize.prometheus.io/mz_storage_path".to_owned(),
1483            "/metrics/mz_storage".to_string(),
1484        );
1485    }
1486    pod_template_annotations.extend(
1487        mz.spec
1488            .pod_annotations
1489            .as_ref()
1490            .map(|annotations| annotations.iter())
1491            .unwrap_or_default()
1492            .map(|(key, value)| (key.clone(), value.clone())),
1493    );
1494
1495    let mut tolerations = vec![
1496        // When the node becomes `NotReady` it indicates there is a problem with the node,
1497        // By default kubernetes waits 300s (5 minutes) before doing anything in this case,
1498        // But we want to limit this to 30s for faster recovery
1499        Toleration {
1500            effect: Some("NoExecute".into()),
1501            key: Some("node.kubernetes.io/not-ready".into()),
1502            operator: Some("Exists".into()),
1503            toleration_seconds: Some(30),
1504            value: None,
1505        },
1506        Toleration {
1507            effect: Some("NoExecute".into()),
1508            key: Some("node.kubernetes.io/unreachable".into()),
1509            operator: Some("Exists".into()),
1510            toleration_seconds: Some(30),
1511            value: None,
1512        },
1513    ];
1514    if let Some(user_tolerations) = &config.environmentd_tolerations {
1515        tolerations.extend(user_tolerations.iter().cloned());
1516    }
1517    let tolerations = Some(tolerations);
1518
1519    let pod_template_spec = PodTemplateSpec {
1520        // not using managed_resource_meta because the pod should be owned
1521        // by the statefulset, not the materialize instance
1522        metadata: Some(ObjectMeta {
1523            labels: Some(pod_template_labels),
1524            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1525            ..Default::default()
1526        }),
1527        spec: Some(PodSpec {
1528            containers: vec![container],
1529            node_selector: Some(
1530                config
1531                    .environmentd_node_selector
1532                    .iter()
1533                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1534                    .collect(),
1535            ),
1536            affinity: config.environmentd_affinity.clone(),
1537            scheduler_name: config.scheduler_name.clone(),
1538            service_account_name: Some(mz.service_account_name()),
1539            volumes: Some(volumes),
1540            security_context: Some(PodSecurityContext {
1541                fs_group: Some(999),
1542                run_as_user: Some(999),
1543                run_as_group: Some(999),
1544                ..Default::default()
1545            }),
1546            tolerations,
1547            // This (apparently) has the side effect of automatically starting a new pod
1548            // when the previous pod is currently terminating. This side steps the statefulset fencing
1549            // but allows for quicker recovery from a failed node
1550            // The Kubernetes documentation strongly advises against this
1551            // setting, as StatefulSets attempt to provide "at most once"
1552            // semantics [0]-- that is, the guarantee that for a given pod in a
1553            // StatefulSet there is *at most* one pod with that identity running
1554            // in the cluster
1555            //
1556            // Materialize, however, has been carefully designed to *not* rely
1557            // on this guarantee. (In fact, we do not believe that correct
1558            // distributed systems can meaningfully rely on Kubernetes's
1559            // guarantee--network packets from a pod can be arbitrarily delayed,
1560            // long past that pod's termination.) Running two `environmentd`
1561            // processes is safe: the newer process will safely and correctly
1562            // fence out the older process via CockroachDB. In the future,
1563            // we'll support running multiple `environmentd` processes in
1564            // parallel for high availability.
1565            //
1566            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1567            termination_grace_period_seconds: Some(0),
1568            ..Default::default()
1569        }),
1570    };
1571
1572    let mut match_labels = BTreeMap::new();
1573    match_labels.insert(
1574        "materialize.cloud/name".to_owned(),
1575        mz.environmentd_statefulset_name(generation),
1576    );
1577
1578    let statefulset_spec = StatefulSetSpec {
1579        replicas: Some(1),
1580        template: pod_template_spec,
1581        update_strategy: Some(StatefulSetUpdateStrategy {
1582            rolling_update: None,
1583            type_: Some("OnDelete".to_owned()),
1584        }),
1585        service_name: Some(mz.environmentd_service_name()),
1586        selector: LabelSelector {
1587            match_expressions: None,
1588            match_labels: Some(match_labels),
1589        },
1590        ..Default::default()
1591    };
1592
1593    StatefulSet {
1594        metadata: ObjectMeta {
1595            annotations: Some(btreemap! {
1596                "materialize.cloud/generation".to_owned() => generation.to_string(),
1597                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1598            }),
1599            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1600        },
1601        spec: Some(statefulset_spec),
1602        status: None,
1603    }
1604}
1605
1606fn create_connection_info(
1607    config: &super::MaterializeControllerArgs,
1608    mz: &Materialize,
1609    generation: u64,
1610) -> ConnectionInfo {
1611    let external_enable_tls = issuer_ref_defined(
1612        &config.default_certificate_specs.internal,
1613        &mz.spec.internal_certificate_spec,
1614    );
1615    let authenticator_kind = mz.spec.authenticator_kind;
1616    let mut listeners_config = ListenersConfig {
1617        sql: btreemap! {
1618            "external".to_owned() => SqlListenerConfig{
1619                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1620                authenticator_kind,
1621                allowed_roles: AllowedRoles::Normal,
1622                enable_tls: external_enable_tls,
1623            },
1624            "internal".to_owned() => SqlListenerConfig{
1625                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1626                authenticator_kind: AuthenticatorKind::None,
1627                // Should this just be Internal?
1628                allowed_roles: AllowedRoles::NormalAndInternal,
1629                enable_tls: false,
1630            },
1631        },
1632        http: btreemap! {
1633            "external".to_owned() => HttpListenerConfig{
1634                base: BaseListenerConfig {
1635                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1636                    authenticator_kind,
1637                    allowed_roles: AllowedRoles::Normal,
1638                    enable_tls: external_enable_tls,
1639                },
1640                routes: HttpRoutesEnabled{
1641                    base: true,
1642                    webhook: true,
1643                    internal: false,
1644                    metrics: false,
1645                    profiling: false,
1646                }
1647            },
1648            "internal".to_owned() => HttpListenerConfig{
1649                base: BaseListenerConfig {
1650                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1651                    authenticator_kind: AuthenticatorKind::None,
1652                    // Should this just be Internal?
1653                    allowed_roles: AllowedRoles::NormalAndInternal,
1654                    enable_tls: false,
1655                },
1656                routes: HttpRoutesEnabled{
1657                    base: true,
1658                    webhook: true,
1659                    internal: true,
1660                    metrics: true,
1661                    profiling: true,
1662                }
1663            },
1664        },
1665    };
1666    if authenticator_kind == AuthenticatorKind::Password {
1667        listeners_config.sql.remove("internal");
1668        listeners_config.http.remove("internal");
1669
1670        listeners_config.sql.get_mut("external").map(|listener| {
1671            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1672            listener
1673        });
1674        listeners_config.http.get_mut("external").map(|listener| {
1675            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1676            listener.routes.internal = true;
1677            listener.routes.profiling = true;
1678            listener
1679        });
1680
1681        listeners_config.http.insert(
1682            "metrics".to_owned(),
1683            HttpListenerConfig {
1684                base: BaseListenerConfig {
1685                    addr: SocketAddr::new(
1686                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1687                        config.environmentd_internal_http_port,
1688                    ),
1689                    authenticator_kind: AuthenticatorKind::None,
1690                    allowed_roles: AllowedRoles::NormalAndInternal,
1691                    enable_tls: false,
1692                },
1693                routes: HttpRoutesEnabled {
1694                    base: false,
1695                    webhook: false,
1696                    internal: false,
1697                    metrics: true,
1698                    profiling: false,
1699                },
1700            },
1701        );
1702    }
1703
1704    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1705    let listeners_configmap = ConfigMap {
1706        binary_data: None,
1707        data: Some(btreemap! {
1708            "listeners.json".to_owned() => listeners_json,
1709        }),
1710        immutable: None,
1711        metadata: ObjectMeta {
1712            annotations: Some(btreemap! {
1713                "materialize.cloud/generation".to_owned() => generation.to_string(),
1714            }),
1715            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1716        },
1717    };
1718
1719    let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1720        AuthenticatorKind::Password => {
1721            let scheme = if external_enable_tls { "https" } else { "http" };
1722            (
1723                scheme,
1724                config.environmentd_http_port,
1725                Some(mz.spec.backend_secret_name.clone()),
1726            )
1727        }
1728        _ => ("http", config.environmentd_internal_http_port, None),
1729    };
1730    let environmentd_url = format!(
1731        "{}://{}.{}.svc.cluster.local:{}",
1732        scheme,
1733        mz.environmentd_generation_service_name(generation),
1734        mz.namespace(),
1735        leader_api_port,
1736    );
1737    ConnectionInfo {
1738        environmentd_url,
1739        listeners_configmap,
1740        mz_system_secret_name,
1741    }
1742}
1743
1744// see materialize/src/environmentd/src/http.rs
1745#[derive(Debug, Deserialize, PartialEq, Eq)]
1746struct BecomeLeaderResponse {
1747    result: BecomeLeaderResult,
1748}
1749
1750#[derive(Debug, Deserialize, PartialEq, Eq)]
1751enum BecomeLeaderResult {
1752    Success,
1753    Failure { message: String },
1754}
1755
1756#[derive(Debug, Deserialize, PartialEq, Eq)]
1757struct SkipCatchupError {
1758    message: String,
1759}
1760
1761fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String {
1762    format!("{}-{}", statefulset.name_unchecked(), idx)
1763}