mz_orchestratord/controller/materialize/
environmentd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19    api::{
20        apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21        core::v1::{
22            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23            EnvVarSource, KeyToPath, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25            Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26            VolumeMount,
27        },
28        networking::v1::{
29            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31        },
32        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33    },
34    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40    ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::{
54    Certificate, CertificatePrivateKeyAlgorithm,
55};
56use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
57use mz_orchestrator_tracing::TracingCliArgs;
58use mz_ore::instrument;
59
60static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
61    major: 0,
62    minor: 140,
63    patch: 0,
64    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
65    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
66});
67const V143: Version = Version::new(0, 143, 0);
68const V144: Version = Version::new(0, 144, 0);
69static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
70    major: 0,
71    minor: 147,
72    patch: 0,
73    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
74    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
75});
76const V153: Version = Version::new(0, 153, 0);
77static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
78    major: 0,
79    minor: 154,
80    patch: 0,
81    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
82    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
83});
84
85/// Describes the status of a deployment.
86///
87/// This is a simplified representation of `DeploymentState`, suitable for
88/// announcement to the external orchestrator.
89#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
90pub enum DeploymentStatus {
91    /// This deployment is not the leader. It is initializing and is not yet
92    /// ready to become the leader.
93    Initializing,
94    /// This deployment is not the leader, but it is ready to become the leader.
95    ReadyToPromote,
96    /// This deployment is in the process of becoming the leader.
97    Promoting,
98    /// This deployment is the leader.
99    IsLeader,
100}
101
102#[derive(Deserialize, Serialize)]
103pub struct LoginCredentials {
104    username: String,
105    // TODO Password type?
106    password: String,
107}
108
109#[derive(Debug, Serialize)]
110pub struct ConnectionInfo {
111    pub environmentd_url: String,
112    pub mz_system_secret_name: Option<String>,
113    pub listeners_configmap: ConfigMap,
114}
115
116#[derive(Debug, Serialize)]
117pub struct Resources {
118    pub generation: u64,
119    pub environmentd_network_policies: Vec<NetworkPolicy>,
120    pub service_account: Box<Option<ServiceAccount>>,
121    pub role: Box<Role>,
122    pub role_binding: Box<RoleBinding>,
123    pub public_service: Box<Service>,
124    pub generation_service: Box<Service>,
125    pub persist_pubsub_service: Box<Service>,
126    pub environmentd_certificate: Box<Option<Certificate>>,
127    pub environmentd_statefulset: Box<StatefulSet>,
128    pub connection_info: Box<ConnectionInfo>,
129}
130
131impl Resources {
132    pub fn new(
133        config: &super::MaterializeControllerArgs,
134        tracing: &TracingCliArgs,
135        orchestratord_namespace: &str,
136        mz: &Materialize,
137        generation: u64,
138    ) -> Self {
139        let environmentd_network_policies =
140            create_environmentd_network_policies(config, mz, orchestratord_namespace);
141
142        let service_account = Box::new(create_service_account_object(config, mz));
143        let role = Box::new(create_role_object(mz));
144        let role_binding = Box::new(create_role_binding_object(mz));
145        let public_service = Box::new(create_public_service_object(config, mz, generation));
146        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
147        let persist_pubsub_service =
148            Box::new(create_persist_pubsub_service(config, mz, generation));
149        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
150        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
151            config, tracing, mz, generation,
152        ));
153        let connection_info = Box::new(create_connection_info(config, mz, generation));
154
155        Self {
156            generation,
157            environmentd_network_policies,
158            service_account,
159            role,
160            role_binding,
161            public_service,
162            generation_service,
163            persist_pubsub_service,
164            environmentd_certificate,
165            environmentd_statefulset,
166            connection_info,
167        }
168    }
169
170    #[instrument]
171    pub async fn apply(
172        &self,
173        client: &Client,
174        increment_generation: bool,
175        force_promote: bool,
176        namespace: &str,
177    ) -> Result<Option<Action>, anyhow::Error> {
178        let environmentd_network_policy_api: Api<NetworkPolicy> =
179            Api::namespaced(client.clone(), namespace);
180        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
181        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
182        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
183        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
184        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
185        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
186        let pod_api: Api<Pod> = Api::namespaced(client.clone(), namespace);
187        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
188        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
189
190        for policy in &self.environmentd_network_policies {
191            trace!("applying network policy {}", policy.name_unchecked());
192            apply_resource(&environmentd_network_policy_api, policy).await?;
193        }
194
195        if let Some(service_account) = &*self.service_account {
196            trace!("applying environmentd service account");
197            apply_resource(&service_account_api, service_account).await?;
198        }
199
200        trace!("applying environmentd role");
201        apply_resource(&role_api, &*self.role).await?;
202
203        trace!("applying environmentd role binding");
204        apply_resource(&role_binding_api, &*self.role_binding).await?;
205
206        trace!("applying environmentd per-generation service");
207        apply_resource(&service_api, &*self.generation_service).await?;
208
209        trace!("creating persist pubsub service");
210        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
211
212        if let Some(certificate) = &*self.environmentd_certificate {
213            trace!("creating new environmentd certificate");
214            apply_resource(&certificate_api, certificate).await?;
215        }
216
217        trace!("applying listeners configmap");
218        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
219
220        trace!("creating new environmentd statefulset");
221        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
222
223        // until we have full zero downtime upgrades, we have a tradeoff: if
224        // we use the graceful upgrade mechanism, we minimize environmentd
225        // unavailability but require a full clusterd rehydration every time,
226        // and if we use the in-place upgrade mechanism, we cause a few
227        // minutes of environmentd downtime, but as long as the environmentd
228        // version didn't change, the existing clusterds will remain running
229        // and won't need to rehydrate. during a version bump, the tradeoff
230        // here is obvious (we need to rehydrate either way, so minimizing
231        // environmentd downtime in the meantime is strictly better), but if
232        // we need to force a rollout some other time (for instance, to
233        // increase the environmentd memory request, or something like that),
234        // it is often better to accept the environmentd unavailability in
235        // order to get the environment as a whole back to a working state
236        // sooner. once clusterd rehydration gets moved ahead of the leader
237        // promotion step, this will no longer make a difference and we can
238        // remove the extra codepath.
239        if increment_generation {
240            let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
241
242            let statefulset = get_resource(
243                &statefulset_api,
244                &self.environmentd_statefulset.name_unchecked(),
245            )
246            .await?;
247            if statefulset
248                .and_then(|statefulset| statefulset.status)
249                .and_then(|status| status.ready_replicas)
250                .unwrap_or(0)
251                == 0
252            {
253                trace!("environmentd statefulset is not ready yet...");
254                return Ok(Some(retry_action));
255            }
256
257            let http_client = match &self.connection_info.mz_system_secret_name {
258                Some(mz_system_secret_name) => {
259                    let http_client = reqwest::Client::builder()
260                        .timeout(std::time::Duration::from_secs(10))
261                        .cookie_store(true)
262                        // TODO add_root_certificate instead
263                        .danger_accept_invalid_certs(true)
264                        .build()
265                        .unwrap();
266                    if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
267                        if let Some(password) =
268                            data.get("external_login_password_mz_system").cloned()
269                        {
270                            let password = String::from_utf8_lossy(&password.0).to_string();
271                            let login_url = reqwest::Url::parse(&format!(
272                                "{}/api/login",
273                                self.connection_info.environmentd_url,
274                            ))
275                            .unwrap();
276                            match http_client
277                                .post(login_url)
278                                .body(serde_json::to_string(&LoginCredentials {
279                                    username: "mz_system".to_owned(),
280                                    password,
281                                })?)
282                                .header("Content-Type", "application/json")
283                                .send()
284                                .await
285                            {
286                                Ok(response) => {
287                                    if let Err(e) = response.error_for_status() {
288                                        trace!(
289                                            "failed to login to environmentd, retrying... ({e})"
290                                        );
291                                        return Ok(Some(retry_action));
292                                    }
293                                }
294                                Err(e) => {
295                                    trace!("failed to connect to environmentd, retrying... ({e})");
296                                    return Ok(Some(retry_action));
297                                }
298                            };
299                        }
300                    };
301                    http_client
302                }
303                None => reqwest::Client::builder()
304                    .timeout(std::time::Duration::from_secs(10))
305                    .build()
306                    .unwrap(),
307            };
308            let status_url = reqwest::Url::parse(&format!(
309                "{}/api/leader/status",
310                self.connection_info.environmentd_url,
311            ))
312            .unwrap();
313
314            match http_client.get(status_url.clone()).send().await {
315                Ok(response) => {
316                    let response: BTreeMap<String, DeploymentStatus> =
317                        match response.error_for_status() {
318                            Ok(response) => response.json().await?,
319                            Err(e) => {
320                                trace!("failed to get status of environmentd, retrying... ({e})");
321                                return Ok(Some(retry_action));
322                            }
323                        };
324                    if force_promote {
325                        trace!("skipping cluster catchup");
326                        let skip_catchup_url = reqwest::Url::parse(&format!(
327                            "{}/api/leader/skip-catchup",
328                            self.connection_info.environmentd_url,
329                        ))
330                        .unwrap();
331                        let response = http_client.post(skip_catchup_url).send().await?;
332                        if response.status() == StatusCode::BAD_REQUEST {
333                            let err: SkipCatchupError = response.json().await?;
334                            bail!("failed to skip catchup: {}", err.message);
335                        }
336                    } else if response["status"] == DeploymentStatus::Initializing {
337                        trace!("environmentd is still initializing, retrying...");
338                        return Ok(Some(retry_action));
339                    } else {
340                        trace!("environmentd is ready");
341                    }
342                }
343                Err(e) => {
344                    trace!("failed to connect to environmentd, retrying... ({e})");
345                    return Ok(Some(retry_action));
346                }
347            }
348
349            let promote_url = reqwest::Url::parse(&format!(
350                "{}/api/leader/promote",
351                self.connection_info.environmentd_url,
352            ))
353            .unwrap();
354
355            // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
356            // It is absolutely critical that this promotion is done last!
357            //
358            // If there are any failures in this method, the error handler in
359            // the caller will attempt to revert and delete the new environmentd.
360            // After promotion, the new environmentd is active, so that would
361            // cause an outage!
362            // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
363            trace!("promoting new environmentd to leader");
364            let response = http_client.post(promote_url).send().await?;
365            let response: BecomeLeaderResponse = match response.error_for_status() {
366                Ok(response) => response.json().await?,
367                Err(e) => {
368                    trace!("failed to promote environmentd, retrying... ({e})");
369                    return Ok(Some(retry_action));
370                }
371            };
372            if let BecomeLeaderResult::Failure { message } = response.result {
373                bail!("failed to promote new environmentd: {message}");
374            }
375
376            // A successful POST to the promotion endpoint only indicates
377            // that the promotion process was kicked off. It does not
378            // guarantee that the environment will be successfully promoted
379            // (e.g., if the environment crashes immediately after responding
380            // to the request, but before executing the takeover, the
381            // promotion will be lost).
382            //
383            // To guarantee the environment has been promoted successfully,
384            // we must wait to see at least one `IsLeader` status returned
385            // from the environment.
386
387            match http_client.get(status_url.clone()).send().await {
388                Ok(response) => {
389                    let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
390                    if response["status"] != DeploymentStatus::IsLeader {
391                        trace!(
392                            "environmentd is still promoting (status: {:?}), retrying...",
393                            response["status"]
394                        );
395                        return Ok(Some(retry_action));
396                    } else {
397                        trace!("environmentd is ready");
398                    }
399                }
400                Err(e) => {
401                    trace!("failed to connect to environmentd, retrying... ({e})");
402                    return Ok(Some(retry_action));
403                }
404            }
405        } else {
406            trace!("restarting environmentd pod to pick up statefulset changes");
407            delete_resource(
408                &pod_api,
409                &statefulset_pod_name(&*self.environmentd_statefulset, 0),
410            )
411            .await?;
412        }
413
414        Ok(None)
415    }
416
417    #[instrument]
418    pub async fn promote_services(
419        &self,
420        client: &Client,
421        namespace: &str,
422    ) -> Result<(), anyhow::Error> {
423        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
424
425        trace!("applying environmentd public service");
426        apply_resource(&service_api, &*self.public_service).await?;
427
428        Ok(())
429    }
430
431    #[instrument]
432    pub async fn teardown_generation(
433        &self,
434        client: &Client,
435        mz: &Materialize,
436        generation: u64,
437    ) -> Result<(), anyhow::Error> {
438        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
439        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
440        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
441
442        trace!("deleting environmentd statefulset for generation {generation}");
443        delete_resource(
444            &statefulset_api,
445            &mz.environmentd_statefulset_name(generation),
446        )
447        .await?;
448
449        trace!("deleting persist pubsub service for generation {generation}");
450        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
451
452        trace!("deleting environmentd per-generation service for generation {generation}");
453        delete_resource(
454            &service_api,
455            &mz.environmentd_generation_service_name(generation),
456        )
457        .await?;
458
459        trace!("deleting listeners configmap for generation {generation}");
460        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
461
462        Ok(())
463    }
464
465    // ideally we would just be able to hash the objects directly, but the
466    // generated kubernetes objects don't implement the Hash trait
467    pub fn generate_hash(&self) -> String {
468        let mut hasher = Sha256::new();
469        hasher.update(&serde_json::to_string(self).unwrap());
470        format!("{:x}", hasher.finalize())
471    }
472}
473
474fn create_environmentd_network_policies(
475    config: &super::MaterializeControllerArgs,
476    mz: &Materialize,
477    orchestratord_namespace: &str,
478) -> Vec<NetworkPolicy> {
479    let mut network_policies = Vec::new();
480    if config.network_policies.internal_enabled {
481        let environmentd_label_selector = LabelSelector {
482            match_labels: Some(
483                mz.default_labels()
484                    .into_iter()
485                    .chain([(
486                        "materialize.cloud/app".to_owned(),
487                        mz.environmentd_app_name(),
488                    )])
489                    .collect(),
490            ),
491            ..Default::default()
492        };
493        let orchestratord_label_selector = LabelSelector {
494            match_labels: Some(
495                config
496                    .orchestratord_pod_selector_labels
497                    .iter()
498                    .cloned()
499                    .map(|kv| (kv.key, kv.value))
500                    .collect(),
501            ),
502            ..Default::default()
503        };
504        // TODO (Alex) filter to just clusterd and environmentd,
505        // once we get a consistent set of labels for both.
506        let all_pods_label_selector = LabelSelector {
507            match_labels: Some(mz.default_labels()),
508            ..Default::default()
509        };
510        network_policies.extend([
511            // Allow all clusterd/environmentd traffic (between pods in the
512            // same environment)
513            NetworkPolicy {
514                metadata: mz
515                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
516                spec: Some(NetworkPolicySpec {
517                    egress: Some(vec![NetworkPolicyEgressRule {
518                        to: Some(vec![NetworkPolicyPeer {
519                            pod_selector: Some(all_pods_label_selector.clone()),
520                            ..Default::default()
521                        }]),
522                        ..Default::default()
523                    }]),
524                    ingress: Some(vec![NetworkPolicyIngressRule {
525                        from: Some(vec![NetworkPolicyPeer {
526                            pod_selector: Some(all_pods_label_selector.clone()),
527                            ..Default::default()
528                        }]),
529                        ..Default::default()
530                    }]),
531                    pod_selector: Some(all_pods_label_selector.clone()),
532                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
533                    ..Default::default()
534                }),
535            },
536            // Allow traffic from orchestratord to environmentd in order to hit
537            // the promotion endpoints during upgrades
538            NetworkPolicy {
539                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
540                spec: Some(NetworkPolicySpec {
541                    ingress: Some(vec![NetworkPolicyIngressRule {
542                        from: Some(vec![NetworkPolicyPeer {
543                            namespace_selector: Some(LabelSelector {
544                                match_labels: Some(btreemap! {
545                                    "kubernetes.io/metadata.name".into()
546                                        => orchestratord_namespace.into(),
547                                }),
548                                ..Default::default()
549                            }),
550                            pod_selector: Some(orchestratord_label_selector),
551                            ..Default::default()
552                        }]),
553                        ports: Some(vec![
554                            NetworkPolicyPort {
555                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
556                                protocol: Some("TCP".to_string()),
557                                ..Default::default()
558                            },
559                            NetworkPolicyPort {
560                                port: Some(IntOrString::Int(
561                                    config.environmentd_internal_http_port.into(),
562                                )),
563                                protocol: Some("TCP".to_string()),
564                                ..Default::default()
565                            },
566                        ]),
567                        ..Default::default()
568                    }]),
569                    pod_selector: Some(environmentd_label_selector),
570                    policy_types: Some(vec!["Ingress".to_owned()]),
571                    ..Default::default()
572                }),
573            },
574        ]);
575    }
576    if config.network_policies.ingress_enabled {
577        let mut ingress_label_selector = mz.default_labels();
578        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
579        network_policies.extend([NetworkPolicy {
580            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
581            spec: Some(NetworkPolicySpec {
582                ingress: Some(vec![NetworkPolicyIngressRule {
583                    from: Some(
584                        config
585                            .network_policies
586                            .ingress_cidrs
587                            .iter()
588                            .map(|cidr| NetworkPolicyPeer {
589                                ip_block: Some(IPBlock {
590                                    cidr: cidr.to_owned(),
591                                    except: None,
592                                }),
593                                ..Default::default()
594                            })
595                            .collect(),
596                    ),
597                    ports: Some(vec![
598                        NetworkPolicyPort {
599                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
600                            protocol: Some("TCP".to_string()),
601                            ..Default::default()
602                        },
603                        NetworkPolicyPort {
604                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
605                            protocol: Some("TCP".to_string()),
606                            ..Default::default()
607                        },
608                    ]),
609                    ..Default::default()
610                }]),
611                pod_selector: Some(LabelSelector {
612                    match_expressions: None,
613                    match_labels: Some(ingress_label_selector),
614                }),
615                policy_types: Some(vec!["Ingress".to_owned()]),
616                ..Default::default()
617            }),
618        }]);
619    }
620    if config.network_policies.egress_enabled {
621        network_policies.extend([NetworkPolicy {
622            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
623            spec: Some(NetworkPolicySpec {
624                egress: Some(vec![NetworkPolicyEgressRule {
625                    to: Some(
626                        config
627                            .network_policies
628                            .egress_cidrs
629                            .iter()
630                            .map(|cidr| NetworkPolicyPeer {
631                                ip_block: Some(IPBlock {
632                                    cidr: cidr.to_owned(),
633                                    except: None,
634                                }),
635                                ..Default::default()
636                            })
637                            .collect(),
638                    ),
639                    ..Default::default()
640                }]),
641                pod_selector: Some(LabelSelector {
642                    match_expressions: None,
643                    match_labels: Some(mz.default_labels()),
644                }),
645                policy_types: Some(vec!["Egress".to_owned()]),
646                ..Default::default()
647            }),
648        }]);
649    }
650    network_policies
651}
652
653fn create_service_account_object(
654    config: &super::MaterializeControllerArgs,
655    mz: &Materialize,
656) -> Option<ServiceAccount> {
657    if mz.create_service_account() {
658        let mut annotations: BTreeMap<String, String> = mz
659            .spec
660            .service_account_annotations
661            .clone()
662            .unwrap_or_default();
663        if let (CloudProvider::Aws, Some(role_arn)) = (
664            config.cloud_provider,
665            mz.spec
666                .environmentd_iam_role_arn
667                .as_deref()
668                .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
669        ) {
670            warn!(
671                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
672            );
673            annotations.insert(
674                "eks.amazonaws.com/role-arn".to_string(),
675                role_arn.to_string(),
676            );
677        };
678
679        let mut labels = mz.default_labels();
680        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
681
682        Some(ServiceAccount {
683            metadata: ObjectMeta {
684                annotations: Some(annotations),
685                labels: Some(labels),
686                ..mz.managed_resource_meta(mz.service_account_name())
687            },
688            ..Default::default()
689        })
690    } else {
691        None
692    }
693}
694
695fn create_role_object(mz: &Materialize) -> Role {
696    Role {
697        metadata: mz.managed_resource_meta(mz.role_name()),
698        rules: Some(vec![
699            PolicyRule {
700                api_groups: Some(vec!["apps".to_string()]),
701                resources: Some(vec!["statefulsets".to_string()]),
702                verbs: vec![
703                    "get".to_string(),
704                    "list".to_string(),
705                    "watch".to_string(),
706                    "create".to_string(),
707                    "update".to_string(),
708                    "patch".to_string(),
709                    "delete".to_string(),
710                ],
711                ..Default::default()
712            },
713            PolicyRule {
714                api_groups: Some(vec!["".to_string()]),
715                resources: Some(vec![
716                    "persistentvolumeclaims".to_string(),
717                    "pods".to_string(),
718                    "secrets".to_string(),
719                    "services".to_string(),
720                ]),
721                verbs: vec![
722                    "get".to_string(),
723                    "list".to_string(),
724                    "watch".to_string(),
725                    "create".to_string(),
726                    "update".to_string(),
727                    "patch".to_string(),
728                    "delete".to_string(),
729                ],
730                ..Default::default()
731            },
732            PolicyRule {
733                api_groups: Some(vec!["".to_string()]),
734                resources: Some(vec!["configmaps".to_string()]),
735                verbs: vec!["get".to_string()],
736                ..Default::default()
737            },
738            PolicyRule {
739                api_groups: Some(vec!["materialize.cloud".to_string()]),
740                resources: Some(vec!["vpcendpoints".to_string()]),
741                verbs: vec![
742                    "get".to_string(),
743                    "list".to_string(),
744                    "watch".to_string(),
745                    "create".to_string(),
746                    "update".to_string(),
747                    "patch".to_string(),
748                    "delete".to_string(),
749                ],
750                ..Default::default()
751            },
752            PolicyRule {
753                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
754                resources: Some(vec!["pods".to_string()]),
755                verbs: vec!["get".to_string(), "list".to_string()],
756                ..Default::default()
757            },
758            PolicyRule {
759                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
760                resources: Some(vec![
761                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
762                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
763                ]),
764                verbs: vec!["get".to_string()],
765                ..Default::default()
766            },
767        ]),
768    }
769}
770
771fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
772    RoleBinding {
773        metadata: mz.managed_resource_meta(mz.role_binding_name()),
774        role_ref: RoleRef {
775            api_group: "".to_string(),
776            kind: "Role".to_string(),
777            name: mz.role_name(),
778        },
779        subjects: Some(vec![Subject {
780            api_group: Some("".to_string()),
781            kind: "ServiceAccount".to_string(),
782            name: mz.service_account_name(),
783            namespace: Some(mz.namespace()),
784        }]),
785    }
786}
787
788fn create_public_service_object(
789    config: &super::MaterializeControllerArgs,
790    mz: &Materialize,
791    generation: u64,
792) -> Service {
793    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
794}
795
796fn create_generation_service_object(
797    config: &super::MaterializeControllerArgs,
798    mz: &Materialize,
799    generation: u64,
800) -> Service {
801    create_base_service_object(
802        config,
803        mz,
804        generation,
805        &mz.environmentd_generation_service_name(generation),
806    )
807}
808
809fn create_base_service_object(
810    config: &super::MaterializeControllerArgs,
811    mz: &Materialize,
812    generation: u64,
813    service_name: &str,
814) -> Service {
815    let ports = vec![
816        ServicePort {
817            port: config.environmentd_sql_port.into(),
818            protocol: Some("TCP".to_string()),
819            name: Some("sql".to_string()),
820            ..Default::default()
821        },
822        ServicePort {
823            port: config.environmentd_http_port.into(),
824            protocol: Some("TCP".to_string()),
825            name: Some("https".to_string()),
826            ..Default::default()
827        },
828        ServicePort {
829            port: config.environmentd_internal_sql_port.into(),
830            protocol: Some("TCP".to_string()),
831            name: Some("internal-sql".to_string()),
832            ..Default::default()
833        },
834        ServicePort {
835            port: config.environmentd_internal_http_port.into(),
836            protocol: Some("TCP".to_string()),
837            name: Some("internal-http".to_string()),
838            ..Default::default()
839        },
840    ];
841
842    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
843
844    let spec = ServiceSpec {
845        type_: Some("ClusterIP".to_string()),
846        cluster_ip: Some("None".to_string()),
847        selector: Some(selector),
848        ports: Some(ports),
849        ..Default::default()
850    };
851
852    Service {
853        metadata: mz.managed_resource_meta(service_name.to_string()),
854        spec: Some(spec),
855        status: None,
856    }
857}
858
859fn create_persist_pubsub_service(
860    config: &super::MaterializeControllerArgs,
861    mz: &Materialize,
862    generation: u64,
863) -> Service {
864    Service {
865        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
866        spec: Some(ServiceSpec {
867            type_: Some("ClusterIP".to_string()),
868            cluster_ip: Some("None".to_string()),
869            selector: Some(btreemap! {
870                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
871            }),
872            ports: Some(vec![ServicePort {
873                name: Some("grpc".to_string()),
874                protocol: Some("TCP".to_string()),
875                port: config.environmentd_internal_persist_pubsub_port.into(),
876                ..Default::default()
877            }]),
878            ..Default::default()
879        }),
880        status: None,
881    }
882}
883
884fn create_environmentd_certificate(
885    config: &super::MaterializeControllerArgs,
886    mz: &Materialize,
887) -> Option<Certificate> {
888    create_certificate(
889        config.default_certificate_specs.internal.clone(),
890        mz,
891        mz.spec.internal_certificate_spec.clone(),
892        mz.environmentd_certificate_name(),
893        mz.environmentd_certificate_secret_name(),
894        Some(vec![
895            mz.environmentd_service_name(),
896            mz.environmentd_service_internal_fqdn(),
897        ]),
898        CertificatePrivateKeyAlgorithm::Ed25519,
899        None,
900    )
901}
902
903fn create_environmentd_statefulset_object(
904    config: &super::MaterializeControllerArgs,
905    tracing: &TracingCliArgs,
906    mz: &Materialize,
907    generation: u64,
908) -> StatefulSet {
909    // IMPORTANT: Only pass secrets via environment variables. All other
910    // parameters should be passed as command line arguments, possibly gated
911    // with a `meets_minimum_version` call. This ensures typos cause
912    // `environmentd` to fail to start, rather than silently ignoring the
913    // configuration.
914    //
915    // When passing a secret, use a `SecretKeySelector` to forward a secret into
916    // the pod. Do *not* hardcode a secret as the value directly, as doing so
917    // will leak the secret to anyone with permission to describe the pod.
918    let mut env = vec![
919        EnvVar {
920            name: "MZ_METADATA_BACKEND_URL".to_string(),
921            value_from: Some(EnvVarSource {
922                secret_key_ref: Some(SecretKeySelector {
923                    name: mz.backend_secret_name(),
924                    key: "metadata_backend_url".to_string(),
925                    optional: Some(false),
926                }),
927                ..Default::default()
928            }),
929            ..Default::default()
930        },
931        EnvVar {
932            name: "MZ_PERSIST_BLOB_URL".to_string(),
933            value_from: Some(EnvVarSource {
934                secret_key_ref: Some(SecretKeySelector {
935                    name: mz.backend_secret_name(),
936                    key: "persist_backend_url".to_string(),
937                    optional: Some(false),
938                }),
939                ..Default::default()
940            }),
941            ..Default::default()
942        },
943    ];
944
945    env.push(EnvVar {
946        name: "AWS_REGION".to_string(),
947        value: Some(config.region.clone()),
948        ..Default::default()
949    });
950
951    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
952
953    let mut args = vec![];
954
955    if let Some(helm_chart_version) = &config.helm_chart_version {
956        args.push(format!("--helm-chart-version={helm_chart_version}"));
957    }
958
959    // Add environment ID argument.
960    args.push(format!(
961        "--environment-id={}",
962        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
963    ));
964
965    // Add clusterd image argument based on environmentd tag.
966    args.push(format!(
967        "--clusterd-image={}",
968        matching_image_from_environmentd_image_ref(
969            &mz.spec.environmentd_image_ref,
970            "clusterd",
971            None
972        )
973    ));
974
975    // Add cluster and storage host size arguments.
976    args.extend(
977        [
978            config
979                .environmentd_cluster_replica_sizes
980                .as_ref()
981                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
982            config
983                .bootstrap_default_cluster_replica_size
984                .as_ref()
985                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
986            config
987                .bootstrap_builtin_system_cluster_replica_size
988                .as_ref()
989                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
990            config
991                .bootstrap_builtin_probe_cluster_replica_size
992                .as_ref()
993                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
994            config
995                .bootstrap_builtin_support_cluster_replica_size
996                .as_ref()
997                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
998            config
999                .bootstrap_builtin_catalog_server_cluster_replica_size
1000                .as_ref()
1001                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
1002            config
1003                .bootstrap_builtin_analytics_cluster_replica_size
1004                .as_ref()
1005                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1006            config
1007                .bootstrap_builtin_system_cluster_replication_factor
1008                .as_ref()
1009                .map(|replication_factor| {
1010                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1011                }),
1012            config
1013                .bootstrap_builtin_probe_cluster_replication_factor
1014                .as_ref()
1015                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1016            config
1017                .bootstrap_builtin_support_cluster_replication_factor
1018                .as_ref()
1019                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1020            config
1021                .bootstrap_builtin_analytics_cluster_replication_factor
1022                .as_ref()
1023                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1024        ]
1025        .into_iter()
1026        .flatten(),
1027    );
1028
1029    args.extend(
1030        config
1031            .environmentd_allowed_origins
1032            .iter()
1033            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1034    );
1035
1036    args.push(format!(
1037        "--secrets-controller={}",
1038        config.secrets_controller
1039    ));
1040
1041    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1042        if let Ok(cluster_replica_sizes) =
1043            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1044        {
1045            let cluster_replica_sizes: Vec<_> =
1046                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1047            args.push(format!(
1048                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1049                cluster_replica_sizes.join("', '")
1050            ));
1051        }
1052    }
1053    if !config.cloud_provider.is_cloud() {
1054        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1055    }
1056
1057    if config.enable_internal_statement_logging {
1058        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1059    }
1060
1061    if config.disable_statement_logging {
1062        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1063    }
1064
1065    if !mz.spec.enable_rbac {
1066        args.push("--system-parameter-default=enable_rbac_checks=false".into());
1067    }
1068
1069    // Add persist arguments.
1070
1071    // Configure the Persist Isolated Runtime to use one less thread than the total available.
1072    args.push("--persist-isolated-runtime-threads=-1".to_string());
1073
1074    // Add AWS arguments.
1075    if config.cloud_provider == CloudProvider::Aws {
1076        if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1077            for az in azs {
1078                args.push(format!("--availability-zone={az}"));
1079            }
1080        }
1081
1082        if let Some(environmentd_connection_role_arn) = mz
1083            .spec
1084            .environmentd_connection_role_arn
1085            .as_deref()
1086            .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1087        {
1088            args.push(format!(
1089                "--aws-connection-role-arn={}",
1090                environmentd_connection_role_arn
1091            ));
1092        }
1093        if let Some(account_id) = &config.aws_info.aws_account_id {
1094            args.push(format!("--aws-account-id={account_id}"));
1095        }
1096
1097        args.extend([format!(
1098            "--aws-secrets-controller-tags=Environment={}",
1099            mz.name_unchecked()
1100        )]);
1101        args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1102    }
1103
1104    // Add Kubernetes arguments.
1105    args.extend([
1106        "--orchestrator=kubernetes".into(),
1107        format!(
1108            "--orchestrator-kubernetes-service-account={}",
1109            &mz.service_account_name()
1110        ),
1111        format!(
1112            "--orchestrator-kubernetes-image-pull-policy={}",
1113            config.image_pull_policy.as_kebab_case_str(),
1114        ),
1115    ]);
1116    for selector in &config.clusterd_node_selector {
1117        args.push(format!(
1118            "--orchestrator-kubernetes-service-node-selector={}={}",
1119            selector.key, selector.value,
1120        ));
1121    }
1122    if mz.meets_minimum_version(&V144) {
1123        if let Some(affinity) = &config.clusterd_affinity {
1124            let affinity = serde_json::to_string(affinity).unwrap();
1125            args.push(format!(
1126                "--orchestrator-kubernetes-service-affinity={affinity}"
1127            ))
1128        }
1129        if let Some(tolerations) = &config.clusterd_tolerations {
1130            let tolerations = serde_json::to_string(tolerations).unwrap();
1131            args.push(format!(
1132                "--orchestrator-kubernetes-service-tolerations={tolerations}"
1133            ))
1134        }
1135    }
1136    if let Some(scheduler_name) = &config.scheduler_name {
1137        args.push(format!(
1138            "--orchestrator-kubernetes-scheduler-name={}",
1139            scheduler_name
1140        ));
1141    }
1142    if mz.meets_minimum_version(&V154_DEV0) {
1143        args.extend(
1144            mz.spec
1145                .pod_annotations
1146                .as_ref()
1147                .map(|annotations| annotations.iter())
1148                .unwrap_or_default()
1149                .map(|(key, val)| {
1150                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1151                }),
1152        );
1153    }
1154    args.extend(
1155        mz.default_labels()
1156            .iter()
1157            .chain(
1158                mz.spec
1159                    .pod_labels
1160                    .as_ref()
1161                    .map(|labels| labels.iter())
1162                    .unwrap_or_default(),
1163            )
1164            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1165    );
1166    if let Some(status) = &mz.status {
1167        args.push(format!(
1168            "--orchestrator-kubernetes-name-prefix=mz{}-",
1169            status.resource_id
1170        ));
1171    }
1172
1173    // Add logging and tracing arguments.
1174    args.extend(["--log-format=json".into()]);
1175    if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1176        args.push(format!("--opentelemetry-endpoint={}", endpoint));
1177    }
1178    // --opentelemetry-resource also configures sentry tags
1179    args.extend([
1180        format!(
1181            "--opentelemetry-resource=organization_id={}",
1182            mz.spec.environment_id
1183        ),
1184        format!(
1185            "--opentelemetry-resource=environment_name={}",
1186            mz.name_unchecked()
1187        ),
1188    ]);
1189
1190    if let Some(segment_api_key) = &config.segment_api_key {
1191        args.push(format!("--segment-api-key={}", segment_api_key));
1192        if config.segment_client_side {
1193            args.push("--segment-client-side".into());
1194        }
1195    }
1196
1197    let mut volumes = Vec::new();
1198    let mut volume_mounts = Vec::new();
1199    if issuer_ref_defined(
1200        &config.default_certificate_specs.internal,
1201        &mz.spec.internal_certificate_spec,
1202    ) {
1203        volumes.push(Volume {
1204            name: "certificate".to_owned(),
1205            secret: Some(SecretVolumeSource {
1206                default_mode: Some(0o400),
1207                secret_name: Some(mz.environmentd_certificate_secret_name()),
1208                items: None,
1209                optional: Some(false),
1210            }),
1211            ..Default::default()
1212        });
1213        volume_mounts.push(VolumeMount {
1214            name: "certificate".to_owned(),
1215            mount_path: "/etc/materialized".to_owned(),
1216            read_only: Some(true),
1217            ..Default::default()
1218        });
1219        args.extend([
1220            "--tls-mode=require".into(),
1221            "--tls-cert=/etc/materialized/tls.crt".into(),
1222            "--tls-key=/etc/materialized/tls.key".into(),
1223        ]);
1224    } else {
1225        args.push("--tls-mode=disable".to_string());
1226    }
1227    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1228        args.push(format!(
1229            "--orchestrator-kubernetes-ephemeral-volume-class={}",
1230            ephemeral_volume_class
1231        ));
1232    }
1233    // The `materialize` user used by clusterd always has gid 999.
1234    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1235
1236    // Add Sentry arguments.
1237    if let Some(sentry_dsn) = &tracing.sentry_dsn {
1238        args.push(format!("--sentry-dsn={}", sentry_dsn));
1239        if let Some(sentry_environment) = &tracing.sentry_environment {
1240            args.push(format!("--sentry-environment={}", sentry_environment));
1241        }
1242        args.push(format!("--sentry-tag=region={}", config.region));
1243    }
1244
1245    // Add Persist PubSub arguments
1246    args.push(format!(
1247        "--persist-pubsub-url=http://{}:{}",
1248        mz.persist_pubsub_service_name(generation),
1249        config.environmentd_internal_persist_pubsub_port,
1250    ));
1251    args.push(format!(
1252        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1253        config.environmentd_internal_persist_pubsub_port
1254    ));
1255
1256    args.push(format!("--deploy-generation={}", generation));
1257
1258    // Add URL for internal user impersonation endpoint
1259    args.push(format!(
1260        "--internal-console-redirect-url={}",
1261        &config.internal_console_proxy_url,
1262    ));
1263
1264    if !config.collect_pod_metrics {
1265        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1266    }
1267    if config.enable_prometheus_scrape_annotations {
1268        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1269    }
1270
1271    if config.disable_license_key_checks {
1272        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1273            args.push("--disable-license-key-checks".into());
1274        }
1275    } else if mz.meets_minimum_version(&V140_DEV0) {
1276        volume_mounts.push(VolumeMount {
1277            name: "license-key".to_string(),
1278            mount_path: "/license_key".to_string(),
1279            ..Default::default()
1280        });
1281        volumes.push(Volume {
1282            name: "license-key".to_string(),
1283            secret: Some(SecretVolumeSource {
1284                default_mode: Some(256),
1285                optional: Some(false),
1286                secret_name: Some(mz.backend_secret_name()),
1287                items: Some(vec![KeyToPath {
1288                    key: "license_key".to_string(),
1289                    path: "license_key".to_string(),
1290                    ..Default::default()
1291                }]),
1292                ..Default::default()
1293            }),
1294            ..Default::default()
1295        });
1296        env.push(EnvVar {
1297            name: "MZ_LICENSE_KEY".to_string(),
1298            value: Some("/license_key/license_key".to_string()),
1299            ..Default::default()
1300        });
1301    }
1302
1303    // Add user-specified extra arguments.
1304    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1305        args.extend(extra_args.iter().cloned());
1306    }
1307
1308    let probe = Probe {
1309        initial_delay_seconds: Some(1),
1310        failure_threshold: Some(12),
1311        tcp_socket: Some(TCPSocketAction {
1312            host: None,
1313            port: IntOrString::Int(config.environmentd_sql_port.into()),
1314        }),
1315        ..Default::default()
1316    };
1317
1318    let security_context = if config.enable_security_context {
1319        // Since we want to adhere to the most restrictive security context, all
1320        // of these fields have to be set how they are.
1321        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1322        Some(SecurityContext {
1323            run_as_non_root: Some(true),
1324            capabilities: Some(Capabilities {
1325                drop: Some(vec!["ALL".to_string()]),
1326                ..Default::default()
1327            }),
1328            seccomp_profile: Some(SeccompProfile {
1329                type_: "RuntimeDefault".to_string(),
1330                ..Default::default()
1331            }),
1332            allow_privilege_escalation: Some(false),
1333            ..Default::default()
1334        })
1335    } else {
1336        None
1337    };
1338
1339    let ports = vec![
1340        ContainerPort {
1341            container_port: config.environmentd_sql_port.into(),
1342            name: Some("sql".to_owned()),
1343            ..Default::default()
1344        },
1345        ContainerPort {
1346            container_port: config.environmentd_internal_sql_port.into(),
1347            name: Some("internal-sql".to_owned()),
1348            ..Default::default()
1349        },
1350        ContainerPort {
1351            container_port: config.environmentd_http_port.into(),
1352            name: Some("http".to_owned()),
1353            ..Default::default()
1354        },
1355        ContainerPort {
1356            container_port: config.environmentd_internal_http_port.into(),
1357            name: Some("internal-http".to_owned()),
1358            ..Default::default()
1359        },
1360        ContainerPort {
1361            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1362            name: Some("persist-pubsub".to_owned()),
1363            ..Default::default()
1364        },
1365    ];
1366
1367    // Add networking arguments.
1368    if mz.meets_minimum_version(&V147_DEV0) {
1369        volume_mounts.push(VolumeMount {
1370            name: "listeners-configmap".to_string(),
1371            mount_path: "/listeners".to_string(),
1372            ..Default::default()
1373        });
1374        volumes.push(Volume {
1375            name: "listeners-configmap".to_string(),
1376            config_map: Some(ConfigMapVolumeSource {
1377                name: mz.listeners_configmap_name(generation),
1378                default_mode: Some(256),
1379                optional: Some(false),
1380                items: Some(vec![KeyToPath {
1381                    key: "listeners.json".to_string(),
1382                    path: "listeners.json".to_string(),
1383                    ..Default::default()
1384                }]),
1385            }),
1386            ..Default::default()
1387        });
1388        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1389        if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1390            args.push("--system-parameter-default=enable_password_auth=true".into());
1391            env.push(EnvVar {
1392                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1393                value_from: Some(EnvVarSource {
1394                    secret_key_ref: Some(SecretKeySelector {
1395                        name: mz.backend_secret_name(),
1396                        key: "external_login_password_mz_system".to_string(),
1397                        optional: Some(false),
1398                    }),
1399                    ..Default::default()
1400                }),
1401                ..Default::default()
1402            })
1403        }
1404    } else {
1405        args.extend([
1406            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1407            format!(
1408                "--http-listen-addr=0.0.0.0:{}",
1409                config.environmentd_http_port
1410            ),
1411            format!(
1412                "--internal-sql-listen-addr=0.0.0.0:{}",
1413                config.environmentd_internal_sql_port
1414            ),
1415            format!(
1416                "--internal-http-listen-addr=0.0.0.0:{}",
1417                config.environmentd_internal_http_port
1418            ),
1419        ]);
1420    }
1421
1422    let container = Container {
1423        name: "environmentd".to_owned(),
1424        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1425        image_pull_policy: Some(config.image_pull_policy.to_string()),
1426        ports: Some(ports),
1427        args: Some(args),
1428        env: Some(env),
1429        volume_mounts: Some(volume_mounts),
1430        liveness_probe: Some(probe.clone()),
1431        readiness_probe: Some(probe),
1432        resources: mz
1433            .spec
1434            .environmentd_resource_requirements
1435            .clone()
1436            .or_else(|| config.environmentd_default_resources.clone()),
1437        security_context: security_context.clone(),
1438        ..Default::default()
1439    };
1440
1441    let mut pod_template_labels = mz.default_labels();
1442    pod_template_labels.insert(
1443        "materialize.cloud/name".to_owned(),
1444        mz.environmentd_statefulset_name(generation),
1445    );
1446    pod_template_labels.insert(
1447        "materialize.cloud/app".to_owned(),
1448        mz.environmentd_app_name(),
1449    );
1450    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1451    pod_template_labels.extend(
1452        mz.spec
1453            .pod_labels
1454            .as_ref()
1455            .map(|labels| labels.iter())
1456            .unwrap_or_default()
1457            .map(|(key, value)| (key.clone(), value.clone())),
1458    );
1459
1460    let mut pod_template_annotations = btreemap! {
1461        // We can re-enable eviction once we have HA
1462        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1463
1464        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1465        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1466        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1467        "materialize.cloud/generation".to_owned() => generation.to_string(),
1468    };
1469    if config.enable_prometheus_scrape_annotations {
1470        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1471        pod_template_annotations.insert(
1472            "prometheus.io/port".to_owned(),
1473            config.environmentd_internal_http_port.to_string(),
1474        );
1475        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1476        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1477        pod_template_annotations.insert(
1478            "materialize.prometheus.io/mz_usage_path".to_owned(),
1479            "/metrics/mz_usage".to_string(),
1480        );
1481        pod_template_annotations.insert(
1482            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1483            "/metrics/mz_frontier".to_string(),
1484        );
1485        pod_template_annotations.insert(
1486            "materialize.prometheus.io/mz_compute_path".to_owned(),
1487            "/metrics/mz_compute".to_string(),
1488        );
1489        pod_template_annotations.insert(
1490            "materialize.prometheus.io/mz_storage_path".to_owned(),
1491            "/metrics/mz_storage".to_string(),
1492        );
1493    }
1494    pod_template_annotations.extend(
1495        mz.spec
1496            .pod_annotations
1497            .as_ref()
1498            .map(|annotations| annotations.iter())
1499            .unwrap_or_default()
1500            .map(|(key, value)| (key.clone(), value.clone())),
1501    );
1502
1503    let mut tolerations = vec![
1504        // When the node becomes `NotReady` it indicates there is a problem with the node,
1505        // By default kubernetes waits 300s (5 minutes) before doing anything in this case,
1506        // But we want to limit this to 30s for faster recovery
1507        Toleration {
1508            effect: Some("NoExecute".into()),
1509            key: Some("node.kubernetes.io/not-ready".into()),
1510            operator: Some("Exists".into()),
1511            toleration_seconds: Some(30),
1512            value: None,
1513        },
1514        Toleration {
1515            effect: Some("NoExecute".into()),
1516            key: Some("node.kubernetes.io/unreachable".into()),
1517            operator: Some("Exists".into()),
1518            toleration_seconds: Some(30),
1519            value: None,
1520        },
1521    ];
1522    if let Some(user_tolerations) = &config.environmentd_tolerations {
1523        tolerations.extend(user_tolerations.iter().cloned());
1524    }
1525    let tolerations = Some(tolerations);
1526
1527    let pod_template_spec = PodTemplateSpec {
1528        // not using managed_resource_meta because the pod should be owned
1529        // by the statefulset, not the materialize instance
1530        metadata: Some(ObjectMeta {
1531            labels: Some(pod_template_labels),
1532            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1533            ..Default::default()
1534        }),
1535        spec: Some(PodSpec {
1536            containers: vec![container],
1537            node_selector: Some(
1538                config
1539                    .environmentd_node_selector
1540                    .iter()
1541                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1542                    .collect(),
1543            ),
1544            affinity: config.environmentd_affinity.clone(),
1545            scheduler_name: config.scheduler_name.clone(),
1546            service_account_name: Some(mz.service_account_name()),
1547            volumes: Some(volumes),
1548            security_context: Some(PodSecurityContext {
1549                fs_group: Some(999),
1550                run_as_user: Some(999),
1551                run_as_group: Some(999),
1552                ..Default::default()
1553            }),
1554            tolerations,
1555            // This (apparently) has the side effect of automatically starting a new pod
1556            // when the previous pod is currently terminating. This side steps the statefulset fencing
1557            // but allows for quicker recovery from a failed node
1558            // The Kubernetes documentation strongly advises against this
1559            // setting, as StatefulSets attempt to provide "at most once"
1560            // semantics [0]-- that is, the guarantee that for a given pod in a
1561            // StatefulSet there is *at most* one pod with that identity running
1562            // in the cluster
1563            //
1564            // Materialize, however, has been carefully designed to *not* rely
1565            // on this guarantee. (In fact, we do not believe that correct
1566            // distributed systems can meaningfully rely on Kubernetes's
1567            // guarantee--network packets from a pod can be arbitrarily delayed,
1568            // long past that pod's termination.) Running two `environmentd`
1569            // processes is safe: the newer process will safely and correctly
1570            // fence out the older process via CockroachDB. In the future,
1571            // we'll support running multiple `environmentd` processes in
1572            // parallel for high availability.
1573            //
1574            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1575            termination_grace_period_seconds: Some(0),
1576            ..Default::default()
1577        }),
1578    };
1579
1580    let mut match_labels = BTreeMap::new();
1581    match_labels.insert(
1582        "materialize.cloud/name".to_owned(),
1583        mz.environmentd_statefulset_name(generation),
1584    );
1585
1586    let statefulset_spec = StatefulSetSpec {
1587        replicas: Some(1),
1588        template: pod_template_spec,
1589        update_strategy: Some(StatefulSetUpdateStrategy {
1590            rolling_update: None,
1591            type_: Some("OnDelete".to_owned()),
1592        }),
1593        service_name: Some(mz.environmentd_service_name()),
1594        selector: LabelSelector {
1595            match_expressions: None,
1596            match_labels: Some(match_labels),
1597        },
1598        ..Default::default()
1599    };
1600
1601    StatefulSet {
1602        metadata: ObjectMeta {
1603            annotations: Some(btreemap! {
1604                "materialize.cloud/generation".to_owned() => generation.to_string(),
1605                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1606            }),
1607            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1608        },
1609        spec: Some(statefulset_spec),
1610        status: None,
1611    }
1612}
1613
1614fn create_connection_info(
1615    config: &super::MaterializeControllerArgs,
1616    mz: &Materialize,
1617    generation: u64,
1618) -> ConnectionInfo {
1619    let external_enable_tls = issuer_ref_defined(
1620        &config.default_certificate_specs.internal,
1621        &mz.spec.internal_certificate_spec,
1622    );
1623    let authenticator_kind = mz.spec.authenticator_kind;
1624    let mut listeners_config = ListenersConfig {
1625        sql: btreemap! {
1626            "external".to_owned() => SqlListenerConfig{
1627                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1628                authenticator_kind,
1629                allowed_roles: AllowedRoles::Normal,
1630                enable_tls: external_enable_tls,
1631            },
1632            "internal".to_owned() => SqlListenerConfig{
1633                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1634                authenticator_kind: AuthenticatorKind::None,
1635                // Should this just be Internal?
1636                allowed_roles: AllowedRoles::NormalAndInternal,
1637                enable_tls: false,
1638            },
1639        },
1640        http: btreemap! {
1641            "external".to_owned() => HttpListenerConfig{
1642                base: BaseListenerConfig {
1643                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1644                    authenticator_kind,
1645                    allowed_roles: AllowedRoles::Normal,
1646                    enable_tls: external_enable_tls,
1647                },
1648                routes: HttpRoutesEnabled{
1649                    base: true,
1650                    webhook: true,
1651                    internal: false,
1652                    metrics: false,
1653                    profiling: false,
1654                }
1655            },
1656            "internal".to_owned() => HttpListenerConfig{
1657                base: BaseListenerConfig {
1658                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1659                    authenticator_kind: AuthenticatorKind::None,
1660                    // Should this just be Internal?
1661                    allowed_roles: AllowedRoles::NormalAndInternal,
1662                    enable_tls: false,
1663                },
1664                routes: HttpRoutesEnabled{
1665                    base: true,
1666                    webhook: true,
1667                    internal: true,
1668                    metrics: true,
1669                    profiling: true,
1670                }
1671            },
1672        },
1673    };
1674    if authenticator_kind == AuthenticatorKind::Password {
1675        listeners_config.sql.remove("internal");
1676        listeners_config.http.remove("internal");
1677
1678        listeners_config.sql.get_mut("external").map(|listener| {
1679            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1680            listener
1681        });
1682        listeners_config.http.get_mut("external").map(|listener| {
1683            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1684            listener.routes.internal = true;
1685            listener.routes.profiling = true;
1686            listener
1687        });
1688
1689        listeners_config.http.insert(
1690            "metrics".to_owned(),
1691            HttpListenerConfig {
1692                base: BaseListenerConfig {
1693                    addr: SocketAddr::new(
1694                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1695                        config.environmentd_internal_http_port,
1696                    ),
1697                    authenticator_kind: AuthenticatorKind::None,
1698                    allowed_roles: AllowedRoles::NormalAndInternal,
1699                    enable_tls: false,
1700                },
1701                routes: HttpRoutesEnabled {
1702                    base: false,
1703                    webhook: false,
1704                    internal: false,
1705                    metrics: true,
1706                    profiling: false,
1707                },
1708            },
1709        );
1710    }
1711
1712    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1713    let listeners_configmap = ConfigMap {
1714        binary_data: None,
1715        data: Some(btreemap! {
1716            "listeners.json".to_owned() => listeners_json,
1717        }),
1718        immutable: None,
1719        metadata: ObjectMeta {
1720            annotations: Some(btreemap! {
1721                "materialize.cloud/generation".to_owned() => generation.to_string(),
1722            }),
1723            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1724        },
1725    };
1726
1727    let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1728        AuthenticatorKind::Password => {
1729            let scheme = if external_enable_tls { "https" } else { "http" };
1730            (
1731                scheme,
1732                config.environmentd_http_port,
1733                Some(mz.spec.backend_secret_name.clone()),
1734            )
1735        }
1736        _ => ("http", config.environmentd_internal_http_port, None),
1737    };
1738    let environmentd_url = format!(
1739        "{}://{}.{}.svc.cluster.local:{}",
1740        scheme,
1741        mz.environmentd_generation_service_name(generation),
1742        mz.namespace(),
1743        leader_api_port,
1744    );
1745    ConnectionInfo {
1746        environmentd_url,
1747        listeners_configmap,
1748        mz_system_secret_name,
1749    }
1750}
1751
// see materialize/src/environmentd/src/http.rs
/// Deserialized response body that wraps a [`BecomeLeaderResult`] in its
/// `result` field.
///
/// The field name is part of the wire format and must stay in sync with the
/// definition in the file referenced above.
///
/// NOTE(review): presumably this is the body returned by environmentd's
/// leader-promotion HTTP endpoint -- confirm against the referenced file.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    result: BecomeLeaderResult,
}
1757
/// Outcome carried in the `result` field of [`BecomeLeaderResponse`].
///
/// No serde attributes are applied, so serde's default externally tagged
/// representation is used: `"Success"` or `{"Failure": {"message": "..."}}`.
/// The variant and field names are therefore part of the wire format.
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    /// The request succeeded; no further detail is carried.
    Success,
    /// The request failed; `message` holds the reason reported by the server.
    Failure { message: String },
}
1763
/// Deserialized error body consisting of a single human-readable `message`.
///
/// The field name is part of the wire format.
///
/// NOTE(review): presumably returned by environmentd when a skip-catchup
/// request is rejected -- confirm against the environmentd HTTP handlers.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    message: String,
}
1768
1769fn statefulset_pod_name(statefulset: &StatefulSet, idx: u64) -> String {
1770    format!("{}-{}", statefulset.name_unchecked(), idx)
1771}