mz_orchestratord/controller/materialize/
environmentd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use k8s_openapi::{
18    api::{
19        apps::v1::{StatefulSet, StatefulSetSpec},
20        core::v1::{
21            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24            Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
25            VolumeMount,
26        },
27        networking::v1::{
28            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
29            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
30        },
31        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
32    },
33    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
34};
35use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
36use maplit::btreemap;
37use mz_server_core::listeners::{
38    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
39    ListenersConfig, SqlListenerConfig,
40};
41use reqwest::{Client as HttpClient, StatusCode};
42use semver::{BuildMetadata, Prerelease, Version};
43use serde::{Deserialize, Serialize};
44use sha2::{Digest, Sha256};
45use tracing::{error, trace, warn};
46
47use super::Error;
48use super::matching_image_from_environmentd_image_ref;
49use crate::k8s::{apply_resource, delete_resource, get_resource};
50use crate::tls::{create_certificate, issuer_ref_defined};
51use mz_cloud_provider::CloudProvider;
52use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
53use mz_cloud_resources::crd::{
54    ManagedResource,
55    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
56};
57use mz_ore::instrument;
58
59static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60    major: 0,
61    minor: 140,
62    patch: 0,
63    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V143: Version = Version::new(0, 143, 0);
67const V144: Version = Version::new(0, 144, 0);
68static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
69    major: 0,
70    minor: 147,
71    patch: 0,
72    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
73    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
74});
75const V153: Version = Version::new(0, 153, 0);
76static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
77    major: 0,
78    minor: 154,
79    patch: 0,
80    pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
81    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83pub const V161: Version = Version::new(0, 161, 0);
84
85static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
86    major: 26,
87    minor: 1,
88    patch: 0,
89    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
90    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
91});
92
93/// Describes the status of a deployment.
94///
95/// This is a simplified representation of `DeploymentState`, suitable for
96/// announcement to the external orchestrator.
97#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
98pub enum DeploymentStatus {
99    /// This deployment is not the leader. It is initializing and is not yet
100    /// ready to become the leader.
101    Initializing,
102    /// This deployment is not the leader, but it is ready to become the leader.
103    ReadyToPromote,
104    /// This deployment is in the process of becoming the leader.
105    Promoting,
106    /// This deployment is the leader.
107    IsLeader,
108}
109
110#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
111pub struct GetLeaderStatusResponse {
112    status: DeploymentStatus,
113}
114
115#[derive(Deserialize, Serialize)]
116pub struct LoginCredentials {
117    username: String,
118    // TODO Password type?
119    password: String,
120}
121
122#[derive(Debug, Serialize)]
123pub struct ConnectionInfo {
124    pub environmentd_url: String,
125    pub mz_system_secret_name: Option<String>,
126    pub listeners_configmap: ConfigMap,
127}
128
129#[derive(Debug, Serialize)]
130pub struct Resources {
131    pub generation: u64,
132    pub environmentd_network_policies: Vec<NetworkPolicy>,
133    pub service_account: Box<Option<ServiceAccount>>,
134    pub role: Box<Role>,
135    pub role_binding: Box<RoleBinding>,
136    pub public_service: Box<Service>,
137    pub generation_service: Box<Service>,
138    pub persist_pubsub_service: Box<Service>,
139    pub environmentd_certificate: Box<Option<Certificate>>,
140    pub environmentd_statefulset: Box<StatefulSet>,
141    pub connection_info: Box<ConnectionInfo>,
142}
143
144impl Resources {
145    pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
146        let environmentd_network_policies = create_environmentd_network_policies(config, mz);
147
148        let service_account = Box::new(create_service_account_object(config, mz));
149        let role = Box::new(create_role_object(mz));
150        let role_binding = Box::new(create_role_binding_object(mz));
151        let public_service = Box::new(create_public_service_object(config, mz, generation));
152        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
153        let persist_pubsub_service =
154            Box::new(create_persist_pubsub_service(config, mz, generation));
155        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
156        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157            config, mz, generation,
158        ));
159        let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161        Self {
162            generation,
163            environmentd_network_policies,
164            service_account,
165            role,
166            role_binding,
167            public_service,
168            generation_service,
169            persist_pubsub_service,
170            environmentd_certificate,
171            environmentd_statefulset,
172            connection_info,
173        }
174    }
175
176    #[instrument]
177    pub async fn apply(
178        &self,
179        client: &Client,
180        force_promote: bool,
181        namespace: &str,
182    ) -> Result<Option<Action>, Error> {
183        let environmentd_network_policy_api: Api<NetworkPolicy> =
184            Api::namespaced(client.clone(), namespace);
185        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
186        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
187        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
188        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
189        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
190        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
191        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
192
193        for policy in &self.environmentd_network_policies {
194            trace!("applying network policy {}", policy.name_unchecked());
195            apply_resource(&environmentd_network_policy_api, policy).await?;
196        }
197
198        if let Some(service_account) = &*self.service_account {
199            trace!("applying environmentd service account");
200            apply_resource(&service_account_api, service_account).await?;
201        }
202
203        trace!("applying environmentd role");
204        apply_resource(&role_api, &*self.role).await?;
205
206        trace!("applying environmentd role binding");
207        apply_resource(&role_binding_api, &*self.role_binding).await?;
208
209        trace!("applying environmentd per-generation service");
210        apply_resource(&service_api, &*self.generation_service).await?;
211
212        trace!("creating persist pubsub service");
213        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
214
215        if let Some(certificate) = &*self.environmentd_certificate {
216            trace!("creating new environmentd certificate");
217            apply_resource(&certificate_api, certificate).await?;
218        }
219
220        trace!("applying listeners configmap");
221        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
222
223        trace!("creating new environmentd statefulset");
224        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
225
226        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
227
228        let statefulset = get_resource(
229            &statefulset_api,
230            &self.environmentd_statefulset.name_unchecked(),
231        )
232        .await?;
233        if statefulset
234            .and_then(|statefulset| statefulset.status)
235            .and_then(|status| status.ready_replicas)
236            .unwrap_or(0)
237            == 0
238        {
239            trace!("environmentd statefulset is not ready yet...");
240            return Ok(Some(retry_action));
241        }
242
243        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
244            return Ok(Some(retry_action));
245        };
246        let status_url = reqwest::Url::parse(&format!(
247            "{}/api/leader/status",
248            self.connection_info.environmentd_url,
249        ))
250        .unwrap();
251
252        match http_client.get(status_url.clone()).send().await {
253            Ok(response) => {
254                let response: GetLeaderStatusResponse = match response.error_for_status() {
255                    Ok(response) => response.json().await?,
256                    Err(e) => {
257                        trace!("failed to get status of environmentd, retrying... ({e})");
258                        return Ok(Some(retry_action));
259                    }
260                };
261                if force_promote {
262                    trace!("skipping cluster catchup");
263                    let skip_catchup_url = reqwest::Url::parse(&format!(
264                        "{}/api/leader/skip-catchup",
265                        self.connection_info.environmentd_url,
266                    ))
267                    .unwrap();
268                    let response = http_client.post(skip_catchup_url).send().await?;
269                    if response.status() == StatusCode::BAD_REQUEST {
270                        let err: SkipCatchupError = response.json().await?;
271                        return Err(
272                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
273                        );
274                    }
275                } else {
276                    match response.status {
277                        DeploymentStatus::Initializing => {
278                            trace!("environmentd is still initializing, retrying...");
279                            return Ok(Some(retry_action));
280                        }
281                        DeploymentStatus::ReadyToPromote
282                        | DeploymentStatus::Promoting
283                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
284                    }
285                }
286            }
287            Err(e) => {
288                trace!("failed to connect to environmentd, retrying... ({e})");
289                return Ok(Some(retry_action));
290            }
291        }
292
293        Ok(None)
294    }
295
296    #[instrument]
297    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
298        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
299        Some(match &self.connection_info.mz_system_secret_name {
300            Some(mz_system_secret_name) => {
301                let http_client = reqwest::Client::builder()
302                    .timeout(std::time::Duration::from_secs(10))
303                    .cookie_store(true)
304                    // TODO add_root_certificate instead
305                    .danger_accept_invalid_certs(true)
306                    .build()
307                    .unwrap();
308
309                let secret = secret_api
310                    .get(mz_system_secret_name)
311                    .await
312                    .map_err(|e| {
313                        error!("Failed to get backend secret: {:?}", e);
314                        e
315                    })
316                    .ok()?;
317                if let Some(data) = secret.data {
318                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
319                        let password = String::from_utf8_lossy(&password.0).to_string();
320                        let login_url = reqwest::Url::parse(&format!(
321                            "{}/api/login",
322                            self.connection_info.environmentd_url,
323                        ))
324                        .unwrap();
325                        match http_client
326                            .post(login_url)
327                            .body(
328                                serde_json::to_string(&LoginCredentials {
329                                    username: "mz_system".to_owned(),
330                                    password,
331                                })
332                                .expect(
333                                    "Serializing a simple struct with utf8 strings doesn't fail.",
334                                ),
335                            )
336                            .header("Content-Type", "application/json")
337                            .send()
338                            .await
339                        {
340                            Ok(response) => {
341                                if let Err(e) = response.error_for_status() {
342                                    trace!("failed to login to environmentd, retrying... ({e})");
343                                    return None;
344                                }
345                            }
346                            Err(e) => {
347                                trace!("failed to connect to environmentd, retrying... ({e})");
348                                return None;
349                            }
350                        };
351                    }
352                };
353                http_client
354            }
355            None => reqwest::Client::builder()
356                .timeout(std::time::Duration::from_secs(10))
357                .build()
358                .unwrap(),
359        })
360    }
361
362    #[instrument]
363    pub async fn promote_services(
364        &self,
365        client: &Client,
366        namespace: &str,
367    ) -> Result<Option<Action>, Error> {
368        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
369        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
370
371        let promote_url = reqwest::Url::parse(&format!(
372            "{}/api/leader/promote",
373            self.connection_info.environmentd_url,
374        ))
375        .unwrap();
376
377        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
378            return Ok(Some(retry_action));
379        };
380
381        trace!("promoting new environmentd to leader");
382        let response = http_client.post(promote_url).send().await?;
383        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
384        if let BecomeLeaderResult::Failure { message } = response.result {
385            return Err(Error::Anyhow(anyhow::anyhow!(
386                "failed to promote new environmentd: {message}"
387            )));
388        }
389
390        // A successful POST to the promotion endpoint only indicates
391        // that the promotion process was kicked off. It does not
392        // guarantee that the environment will be successfully promoted
393        // (e.g., if the environment crashes immediately after responding
394        // to the request, but before executing the takeover, the
395        // promotion will be lost).
396        //
397        // To guarantee the environment has been promoted successfully,
398        // we must wait to see at least one `IsLeader` status returned
399        // from the environment.
400
401        let status_url = reqwest::Url::parse(&format!(
402            "{}/api/leader/status",
403            self.connection_info.environmentd_url,
404        ))
405        .unwrap();
406        match http_client.get(status_url.clone()).send().await {
407            Ok(response) => {
408                let response: GetLeaderStatusResponse = response.json().await?;
409                if response.status != DeploymentStatus::IsLeader {
410                    trace!(
411                        "environmentd is still promoting (status: {:?}), retrying...",
412                        response.status
413                    );
414                    return Ok(Some(retry_action));
415                } else {
416                    trace!("environmentd is ready");
417                }
418            }
419            Err(e) => {
420                trace!("failed to connect to environmentd, retrying... ({e})");
421                return Ok(Some(retry_action));
422            }
423        }
424
425        trace!("applying environmentd public service");
426        apply_resource(&service_api, &*self.public_service).await?;
427
428        Ok(None)
429    }
430
431    #[instrument]
432    pub async fn teardown_generation(
433        &self,
434        client: &Client,
435        mz: &Materialize,
436        generation: u64,
437    ) -> Result<(), anyhow::Error> {
438        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
439        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
440        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
441
442        trace!("deleting environmentd statefulset for generation {generation}");
443        delete_resource(
444            &statefulset_api,
445            &mz.environmentd_statefulset_name(generation),
446        )
447        .await?;
448
449        trace!("deleting persist pubsub service for generation {generation}");
450        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
451
452        trace!("deleting environmentd per-generation service for generation {generation}");
453        delete_resource(
454            &service_api,
455            &mz.environmentd_generation_service_name(generation),
456        )
457        .await?;
458
459        trace!("deleting listeners configmap for generation {generation}");
460        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
461
462        Ok(())
463    }
464
465    // ideally we would just be able to hash the objects directly, but the
466    // generated kubernetes objects don't implement the Hash trait
467    pub fn generate_hash(&self) -> String {
468        let mut hasher = Sha256::new();
469        hasher.update(&serde_json::to_string(self).unwrap());
470        format!("{:x}", hasher.finalize())
471    }
472}
473
474fn create_environmentd_network_policies(
475    config: &super::Config,
476    mz: &Materialize,
477) -> Vec<NetworkPolicy> {
478    let mut network_policies = Vec::new();
479    if config.network_policies_internal_enabled {
480        let environmentd_label_selector = LabelSelector {
481            match_labels: Some(
482                mz.default_labels()
483                    .into_iter()
484                    .chain([(
485                        "materialize.cloud/app".to_owned(),
486                        mz.environmentd_app_name(),
487                    )])
488                    .collect(),
489            ),
490            ..Default::default()
491        };
492        let orchestratord_label_selector = LabelSelector {
493            match_labels: Some(
494                config
495                    .orchestratord_pod_selector_labels
496                    .iter()
497                    .cloned()
498                    .map(|kv| (kv.key, kv.value))
499                    .collect(),
500            ),
501            ..Default::default()
502        };
503        // TODO (Alex) filter to just clusterd and environmentd,
504        // once we get a consistent set of labels for both.
505        let all_pods_label_selector = LabelSelector {
506            // TODO: can't use default_labels() here because it needs to be
507            // consistent between balancer and materialize resources, and
508            // materialize resources have additional labels - we should
509            // figure out something better here (probably balancers should
510            // install their own network policies)
511            match_labels: Some(
512                [(
513                    "materialize.cloud/mz-resource-id".to_owned(),
514                    mz.resource_id().to_owned(),
515                )]
516                .into(),
517            ),
518            ..Default::default()
519        };
520        network_policies.extend([
521            // Allow all clusterd/environmentd traffic (between pods in the
522            // same environment)
523            NetworkPolicy {
524                metadata: mz
525                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
526                spec: Some(NetworkPolicySpec {
527                    egress: Some(vec![NetworkPolicyEgressRule {
528                        to: Some(vec![NetworkPolicyPeer {
529                            pod_selector: Some(all_pods_label_selector.clone()),
530                            ..Default::default()
531                        }]),
532                        ..Default::default()
533                    }]),
534                    ingress: Some(vec![NetworkPolicyIngressRule {
535                        from: Some(vec![NetworkPolicyPeer {
536                            pod_selector: Some(all_pods_label_selector.clone()),
537                            ..Default::default()
538                        }]),
539                        ..Default::default()
540                    }]),
541                    pod_selector: Some(all_pods_label_selector.clone()),
542                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
543                    ..Default::default()
544                }),
545            },
546            // Allow traffic from orchestratord to environmentd in order to hit
547            // the promotion endpoints during upgrades
548            NetworkPolicy {
549                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
550                spec: Some(NetworkPolicySpec {
551                    ingress: Some(vec![NetworkPolicyIngressRule {
552                        from: Some(vec![NetworkPolicyPeer {
553                            namespace_selector: Some(LabelSelector {
554                                match_labels: Some(btreemap! {
555                                    "kubernetes.io/metadata.name".into()
556                                        => config.orchestratord_namespace.clone(),
557                                }),
558                                ..Default::default()
559                            }),
560                            pod_selector: Some(orchestratord_label_selector),
561                            ..Default::default()
562                        }]),
563                        ports: Some(vec![
564                            NetworkPolicyPort {
565                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
566                                protocol: Some("TCP".to_string()),
567                                ..Default::default()
568                            },
569                            NetworkPolicyPort {
570                                port: Some(IntOrString::Int(
571                                    config.environmentd_internal_http_port.into(),
572                                )),
573                                protocol: Some("TCP".to_string()),
574                                ..Default::default()
575                            },
576                        ]),
577                        ..Default::default()
578                    }]),
579                    pod_selector: Some(environmentd_label_selector),
580                    policy_types: Some(vec!["Ingress".to_owned()]),
581                    ..Default::default()
582                }),
583            },
584        ]);
585    }
586    if config.network_policies_ingress_enabled {
587        let mut ingress_label_selector = mz.default_labels();
588        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
589        network_policies.extend([NetworkPolicy {
590            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
591            spec: Some(NetworkPolicySpec {
592                ingress: Some(vec![NetworkPolicyIngressRule {
593                    from: Some(
594                        config
595                            .network_policies_ingress_cidrs
596                            .iter()
597                            .map(|cidr| NetworkPolicyPeer {
598                                ip_block: Some(IPBlock {
599                                    cidr: cidr.to_owned(),
600                                    except: None,
601                                }),
602                                ..Default::default()
603                            })
604                            .collect(),
605                    ),
606                    ports: Some(vec![
607                        NetworkPolicyPort {
608                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
609                            protocol: Some("TCP".to_string()),
610                            ..Default::default()
611                        },
612                        NetworkPolicyPort {
613                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
614                            protocol: Some("TCP".to_string()),
615                            ..Default::default()
616                        },
617                    ]),
618                    ..Default::default()
619                }]),
620                pod_selector: Some(LabelSelector {
621                    match_expressions: None,
622                    match_labels: Some(ingress_label_selector),
623                }),
624                policy_types: Some(vec!["Ingress".to_owned()]),
625                ..Default::default()
626            }),
627        }]);
628    }
629    if config.network_policies_egress_enabled {
630        network_policies.extend([NetworkPolicy {
631            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
632            spec: Some(NetworkPolicySpec {
633                egress: Some(vec![NetworkPolicyEgressRule {
634                    to: Some(
635                        config
636                            .network_policies_egress_cidrs
637                            .iter()
638                            .map(|cidr| NetworkPolicyPeer {
639                                ip_block: Some(IPBlock {
640                                    cidr: cidr.to_owned(),
641                                    except: None,
642                                }),
643                                ..Default::default()
644                            })
645                            .collect(),
646                    ),
647                    ..Default::default()
648                }]),
649                pod_selector: Some(LabelSelector {
650                    match_expressions: None,
651                    match_labels: Some(mz.default_labels()),
652                }),
653                policy_types: Some(vec!["Egress".to_owned()]),
654                ..Default::default()
655            }),
656        }]);
657    }
658    network_policies
659}
660
661fn create_service_account_object(
662    config: &super::Config,
663    mz: &Materialize,
664) -> Option<ServiceAccount> {
665    if mz.create_service_account() {
666        let mut annotations: BTreeMap<String, String> = mz
667            .spec
668            .service_account_annotations
669            .clone()
670            .unwrap_or_default();
671        if let (CloudProvider::Aws, Some(role_arn)) = (
672            config.cloud_provider,
673            mz.spec
674                .environmentd_iam_role_arn
675                .as_deref()
676                .or(config.environmentd_iam_role_arn.as_deref()),
677        ) {
678            warn!(
679                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
680            );
681            annotations.insert(
682                "eks.amazonaws.com/role-arn".to_string(),
683                role_arn.to_string(),
684            );
685        };
686
687        let mut labels = mz.default_labels();
688        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
689
690        Some(ServiceAccount {
691            metadata: ObjectMeta {
692                annotations: Some(annotations),
693                labels: Some(labels),
694                ..mz.managed_resource_meta(mz.service_account_name())
695            },
696            ..Default::default()
697        })
698    } else {
699        None
700    }
701}
702
703fn create_role_object(mz: &Materialize) -> Role {
704    Role {
705        metadata: mz.managed_resource_meta(mz.role_name()),
706        rules: Some(vec![
707            PolicyRule {
708                api_groups: Some(vec!["apps".to_string()]),
709                resources: Some(vec!["statefulsets".to_string()]),
710                verbs: vec![
711                    "get".to_string(),
712                    "list".to_string(),
713                    "watch".to_string(),
714                    "create".to_string(),
715                    "update".to_string(),
716                    "patch".to_string(),
717                    "delete".to_string(),
718                ],
719                ..Default::default()
720            },
721            PolicyRule {
722                api_groups: Some(vec!["".to_string()]),
723                resources: Some(vec![
724                    "persistentvolumeclaims".to_string(),
725                    "pods".to_string(),
726                    "secrets".to_string(),
727                    "services".to_string(),
728                ]),
729                verbs: vec![
730                    "get".to_string(),
731                    "list".to_string(),
732                    "watch".to_string(),
733                    "create".to_string(),
734                    "update".to_string(),
735                    "patch".to_string(),
736                    "delete".to_string(),
737                ],
738                ..Default::default()
739            },
740            PolicyRule {
741                api_groups: Some(vec!["".to_string()]),
742                resources: Some(vec!["configmaps".to_string()]),
743                verbs: vec!["get".to_string()],
744                ..Default::default()
745            },
746            PolicyRule {
747                api_groups: Some(vec!["materialize.cloud".to_string()]),
748                resources: Some(vec!["vpcendpoints".to_string()]),
749                verbs: vec![
750                    "get".to_string(),
751                    "list".to_string(),
752                    "watch".to_string(),
753                    "create".to_string(),
754                    "update".to_string(),
755                    "patch".to_string(),
756                    "delete".to_string(),
757                ],
758                ..Default::default()
759            },
760            PolicyRule {
761                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
762                resources: Some(vec!["pods".to_string()]),
763                verbs: vec!["get".to_string(), "list".to_string()],
764                ..Default::default()
765            },
766            PolicyRule {
767                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
768                resources: Some(vec![
769                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
770                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
771                ]),
772                verbs: vec!["get".to_string()],
773                ..Default::default()
774            },
775        ]),
776    }
777}
778
779fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
780    RoleBinding {
781        metadata: mz.managed_resource_meta(mz.role_binding_name()),
782        role_ref: RoleRef {
783            api_group: "".to_string(),
784            kind: "Role".to_string(),
785            name: mz.role_name(),
786        },
787        subjects: Some(vec![Subject {
788            api_group: Some("".to_string()),
789            kind: "ServiceAccount".to_string(),
790            name: mz.service_account_name(),
791            namespace: Some(mz.namespace()),
792        }]),
793    }
794}
795
796fn create_public_service_object(
797    config: &super::Config,
798    mz: &Materialize,
799    generation: u64,
800) -> Service {
801    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
802}
803
804fn create_generation_service_object(
805    config: &super::Config,
806    mz: &Materialize,
807    generation: u64,
808) -> Service {
809    create_base_service_object(
810        config,
811        mz,
812        generation,
813        &mz.environmentd_generation_service_name(generation),
814    )
815}
816
817fn create_base_service_object(
818    config: &super::Config,
819    mz: &Materialize,
820    generation: u64,
821    service_name: &str,
822) -> Service {
823    let ports = vec![
824        ServicePort {
825            port: config.environmentd_sql_port.into(),
826            protocol: Some("TCP".to_string()),
827            name: Some("sql".to_string()),
828            ..Default::default()
829        },
830        ServicePort {
831            port: config.environmentd_http_port.into(),
832            protocol: Some("TCP".to_string()),
833            name: Some("https".to_string()),
834            ..Default::default()
835        },
836        ServicePort {
837            port: config.environmentd_internal_sql_port.into(),
838            protocol: Some("TCP".to_string()),
839            name: Some("internal-sql".to_string()),
840            ..Default::default()
841        },
842        ServicePort {
843            port: config.environmentd_internal_http_port.into(),
844            protocol: Some("TCP".to_string()),
845            name: Some("internal-http".to_string()),
846            ..Default::default()
847        },
848    ];
849
850    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
851
852    let spec = ServiceSpec {
853        type_: Some("ClusterIP".to_string()),
854        cluster_ip: Some("None".to_string()),
855        selector: Some(selector),
856        ports: Some(ports),
857        ..Default::default()
858    };
859
860    Service {
861        metadata: mz.managed_resource_meta(service_name.to_string()),
862        spec: Some(spec),
863        status: None,
864    }
865}
866
867fn create_persist_pubsub_service(
868    config: &super::Config,
869    mz: &Materialize,
870    generation: u64,
871) -> Service {
872    Service {
873        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
874        spec: Some(ServiceSpec {
875            type_: Some("ClusterIP".to_string()),
876            cluster_ip: Some("None".to_string()),
877            selector: Some(btreemap! {
878                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
879            }),
880            ports: Some(vec![ServicePort {
881                name: Some("grpc".to_string()),
882                protocol: Some("TCP".to_string()),
883                port: config.environmentd_internal_persist_pubsub_port.into(),
884                ..Default::default()
885            }]),
886            ..Default::default()
887        }),
888        status: None,
889    }
890}
891
892fn create_environmentd_certificate(
893    config: &super::Config,
894    mz: &Materialize,
895) -> Option<Certificate> {
896    create_certificate(
897        config.default_certificate_specs.internal.clone(),
898        mz,
899        mz.spec.internal_certificate_spec.clone(),
900        mz.environmentd_certificate_name(),
901        mz.environmentd_certificate_secret_name(),
902        Some(vec![
903            mz.environmentd_service_name(),
904            mz.environmentd_service_internal_fqdn(),
905        ]),
906        CertificatePrivateKeyAlgorithm::Ed25519,
907        None,
908    )
909}
910
911fn create_environmentd_statefulset_object(
912    config: &super::Config,
913    mz: &Materialize,
914    generation: u64,
915) -> StatefulSet {
916    // IMPORTANT: Only pass secrets via environment variables. All other
917    // parameters should be passed as command line arguments, possibly gated
918    // with a `meets_minimum_version` call. This ensures typos cause
919    // `environmentd` to fail to start, rather than silently ignoring the
920    // configuration.
921    //
922    // When passing a secret, use a `SecretKeySelector` to forward a secret into
923    // the pod. Do *not* hardcode a secret as the value directly, as doing so
924    // will leak the secret to anyone with permission to describe the pod.
925    let mut env = vec![
926        EnvVar {
927            name: "MZ_METADATA_BACKEND_URL".to_string(),
928            value_from: Some(EnvVarSource {
929                secret_key_ref: Some(SecretKeySelector {
930                    name: mz.backend_secret_name(),
931                    key: "metadata_backend_url".to_string(),
932                    optional: Some(false),
933                }),
934                ..Default::default()
935            }),
936            ..Default::default()
937        },
938        EnvVar {
939            name: "MZ_PERSIST_BLOB_URL".to_string(),
940            value_from: Some(EnvVarSource {
941                secret_key_ref: Some(SecretKeySelector {
942                    name: mz.backend_secret_name(),
943                    key: "persist_backend_url".to_string(),
944                    optional: Some(false),
945                }),
946                ..Default::default()
947            }),
948            ..Default::default()
949        },
950    ];
951
952    env.push(EnvVar {
953        name: "AWS_REGION".to_string(),
954        value: Some(config.region.clone()),
955        ..Default::default()
956    });
957
958    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
959
960    let mut args = vec![];
961
962    if let Some(helm_chart_version) = &config.helm_chart_version {
963        args.push(format!("--helm-chart-version={helm_chart_version}"));
964    }
965
966    // Add environment ID argument.
967    args.push(format!(
968        "--environment-id={}",
969        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
970    ));
971
972    // Add clusterd image argument based on environmentd tag.
973    args.push(format!(
974        "--clusterd-image={}",
975        matching_image_from_environmentd_image_ref(
976            &mz.spec.environmentd_image_ref,
977            "clusterd",
978            None
979        )
980    ));
981
982    // Add cluster and storage host size arguments.
983    args.extend(
984        [
985            config
986                .environmentd_cluster_replica_sizes
987                .as_ref()
988                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
989            config
990                .bootstrap_default_cluster_replica_size
991                .as_ref()
992                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
993            config
994                .bootstrap_builtin_system_cluster_replica_size
995                .as_ref()
996                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
997            config
998                .bootstrap_builtin_probe_cluster_replica_size
999                .as_ref()
1000                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
1001            config
1002                .bootstrap_builtin_support_cluster_replica_size
1003                .as_ref()
1004                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
1005            config
1006                .bootstrap_builtin_catalog_server_cluster_replica_size
1007                .as_ref()
1008                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
1009            config
1010                .bootstrap_builtin_analytics_cluster_replica_size
1011                .as_ref()
1012                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
1013            config
1014                .bootstrap_builtin_system_cluster_replication_factor
1015                .as_ref()
1016                .map(|replication_factor| {
1017                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
1018                }),
1019            config
1020                .bootstrap_builtin_probe_cluster_replication_factor
1021                .as_ref()
1022                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1023            config
1024                .bootstrap_builtin_support_cluster_replication_factor
1025                .as_ref()
1026                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1027            config
1028                .bootstrap_builtin_analytics_cluster_replication_factor
1029                .as_ref()
1030                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1031        ]
1032        .into_iter()
1033        .flatten(),
1034    );
1035
1036    args.extend(
1037        config
1038            .environmentd_allowed_origins
1039            .iter()
1040            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1041    );
1042
1043    args.push(format!(
1044        "--secrets-controller={}",
1045        config.secrets_controller
1046    ));
1047
1048    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1049        if let Ok(cluster_replica_sizes) =
1050            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1051        {
1052            let cluster_replica_sizes: Vec<_> =
1053                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1054            args.push(format!(
1055                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1056                cluster_replica_sizes.join("', '")
1057            ));
1058        }
1059    }
1060    if !config.cloud_provider.is_cloud() {
1061        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1062    }
1063
1064    if config.enable_internal_statement_logging {
1065        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1066    }
1067
1068    if config.disable_statement_logging {
1069        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1070    }
1071
1072    if !mz.spec.enable_rbac {
1073        args.push("--system-parameter-default=enable_rbac_checks=false".into());
1074    }
1075
1076    // Add persist arguments.
1077
1078    // Configure the Persist Isolated Runtime to use one less thread than the total available.
1079    args.push("--persist-isolated-runtime-threads=-1".to_string());
1080
1081    // Add AWS arguments.
1082    if config.cloud_provider == CloudProvider::Aws {
1083        if let Some(azs) = config.environmentd_availability_zones.as_ref() {
1084            for az in azs {
1085                args.push(format!("--availability-zone={az}"));
1086            }
1087        }
1088
1089        if let Some(environmentd_connection_role_arn) = mz
1090            .spec
1091            .environmentd_connection_role_arn
1092            .as_deref()
1093            .or(config.environmentd_connection_role_arn.as_deref())
1094        {
1095            args.push(format!(
1096                "--aws-connection-role-arn={}",
1097                environmentd_connection_role_arn
1098            ));
1099        }
1100        if let Some(account_id) = &config.aws_account_id {
1101            args.push(format!("--aws-account-id={account_id}"));
1102        }
1103
1104        args.extend([format!(
1105            "--aws-secrets-controller-tags=Environment={}",
1106            mz.name_unchecked()
1107        )]);
1108        args.extend_from_slice(&config.aws_secrets_controller_tags);
1109    }
1110
1111    // Add Kubernetes arguments.
1112    args.extend([
1113        "--orchestrator=kubernetes".into(),
1114        format!(
1115            "--orchestrator-kubernetes-service-account={}",
1116            &mz.service_account_name()
1117        ),
1118        format!(
1119            "--orchestrator-kubernetes-image-pull-policy={}",
1120            config.image_pull_policy.as_kebab_case_str(),
1121        ),
1122    ]);
1123    for selector in &config.clusterd_node_selector {
1124        args.push(format!(
1125            "--orchestrator-kubernetes-service-node-selector={}={}",
1126            selector.key, selector.value,
1127        ));
1128    }
1129    if mz.meets_minimum_version(&V144) {
1130        if let Some(affinity) = &config.clusterd_affinity {
1131            let affinity = serde_json::to_string(affinity).unwrap();
1132            args.push(format!(
1133                "--orchestrator-kubernetes-service-affinity={affinity}"
1134            ))
1135        }
1136        if let Some(tolerations) = &config.clusterd_tolerations {
1137            let tolerations = serde_json::to_string(tolerations).unwrap();
1138            args.push(format!(
1139                "--orchestrator-kubernetes-service-tolerations={tolerations}"
1140            ))
1141        }
1142    }
1143    if let Some(scheduler_name) = &config.scheduler_name {
1144        args.push(format!(
1145            "--orchestrator-kubernetes-scheduler-name={}",
1146            scheduler_name
1147        ));
1148    }
1149    if mz.meets_minimum_version(&V154_DEV0) {
1150        args.extend(
1151            mz.spec
1152                .pod_annotations
1153                .as_ref()
1154                .map(|annotations| annotations.iter())
1155                .unwrap_or_default()
1156                .map(|(key, val)| {
1157                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1158                }),
1159        );
1160    }
1161    args.extend(
1162        mz.default_labels()
1163            .iter()
1164            .chain(
1165                mz.spec
1166                    .pod_labels
1167                    .as_ref()
1168                    .map(|labels| labels.iter())
1169                    .unwrap_or_default(),
1170            )
1171            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1172    );
1173    if let Some(status) = &mz.status {
1174        args.push(format!(
1175            "--orchestrator-kubernetes-name-prefix=mz{}-",
1176            status.resource_id
1177        ));
1178    }
1179
1180    // Add logging and tracing arguments.
1181    args.extend(["--log-format=json".into()]);
1182    if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
1183        args.push(format!("--opentelemetry-endpoint={}", endpoint));
1184    }
1185    // --opentelemetry-resource also configures sentry tags
1186    args.extend([
1187        format!(
1188            "--opentelemetry-resource=organization_id={}",
1189            mz.spec.environment_id
1190        ),
1191        format!(
1192            "--opentelemetry-resource=environment_name={}",
1193            mz.name_unchecked()
1194        ),
1195    ]);
1196
1197    if let Some(segment_api_key) = &config.segment_api_key {
1198        args.push(format!("--segment-api-key={}", segment_api_key));
1199        if config.segment_client_side {
1200            args.push("--segment-client-side".into());
1201        }
1202    }
1203
1204    let mut volumes = Vec::new();
1205    let mut volume_mounts = Vec::new();
1206    if issuer_ref_defined(
1207        &config.default_certificate_specs.internal,
1208        &mz.spec.internal_certificate_spec,
1209    ) {
1210        volumes.push(Volume {
1211            name: "certificate".to_owned(),
1212            secret: Some(SecretVolumeSource {
1213                default_mode: Some(0o400),
1214                secret_name: Some(mz.environmentd_certificate_secret_name()),
1215                items: None,
1216                optional: Some(false),
1217            }),
1218            ..Default::default()
1219        });
1220        volume_mounts.push(VolumeMount {
1221            name: "certificate".to_owned(),
1222            mount_path: "/etc/materialized".to_owned(),
1223            read_only: Some(true),
1224            ..Default::default()
1225        });
1226        args.extend([
1227            "--tls-mode=require".into(),
1228            "--tls-cert=/etc/materialized/tls.crt".into(),
1229            "--tls-key=/etc/materialized/tls.key".into(),
1230        ]);
1231    } else {
1232        args.push("--tls-mode=disable".to_string());
1233    }
1234    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1235        args.push(format!(
1236            "--orchestrator-kubernetes-ephemeral-volume-class={}",
1237            ephemeral_volume_class
1238        ));
1239    }
1240    // The `materialize` user used by clusterd always has gid 999.
1241    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1242
1243    // Add system_param configmap
1244    // This feature was enabled in 0.163 but did not have testing until after 0.164.
1245    // 0.165 should work with anything greater than 0.164 including v26 and v25.
1246    if mz.meets_minimum_version(&V26_1_0) {
1247        if let Some(ref name) = mz.spec.system_parameter_configmap_name {
1248            volumes.push(Volume {
1249                name: "system-params".to_string(),
1250                config_map: Some(ConfigMapVolumeSource {
1251                    default_mode: Some(0o400),
1252                    name: name.to_owned(),
1253                    items: None,
1254                    optional: Some(true),
1255                }),
1256                ..Default::default()
1257            });
1258            volume_mounts.push(VolumeMount {
1259                name: "system-params".to_string(),
1260                // The user must write to the `system-params.json` entry in the config map
1261                mount_path: "/system-params".to_owned(),
1262                read_only: Some(true),
1263                ..Default::default()
1264            });
1265            args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
1266            args.push("--config-sync-loop-interval=1s".to_string());
1267        }
1268    }
1269
1270    // Add Sentry arguments.
1271    if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
1272        args.push(format!("--sentry-dsn={}", sentry_dsn));
1273        if let Some(sentry_environment) = &config.tracing.sentry_environment {
1274            args.push(format!("--sentry-environment={}", sentry_environment));
1275        }
1276        args.push(format!("--sentry-tag=region={}", config.region));
1277    }
1278
1279    // Add Persist PubSub arguments
1280    args.push(format!(
1281        "--persist-pubsub-url=http://{}:{}",
1282        mz.persist_pubsub_service_name(generation),
1283        config.environmentd_internal_persist_pubsub_port,
1284    ));
1285    args.push(format!(
1286        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1287        config.environmentd_internal_persist_pubsub_port
1288    ));
1289
1290    args.push(format!("--deploy-generation={}", generation));
1291
1292    // Add URL for internal user impersonation endpoint
1293    args.push(format!(
1294        "--internal-console-redirect-url={}",
1295        &config.internal_console_proxy_url,
1296    ));
1297
1298    if !config.collect_pod_metrics {
1299        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1300    }
1301    if config.enable_prometheus_scrape_annotations {
1302        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1303    }
1304
1305    // the --disable-license-key-checks environmentd flag only existed
1306    // between these versions
1307    if config.disable_license_key_checks {
1308        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1309            args.push("--disable-license-key-checks".into());
1310        }
1311    }
1312
1313    // as of version 0.153, the ability to disable license key checks was
1314    // removed, so we should always set up license keys in that case
1315    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1316        || mz.meets_minimum_version(&V153)
1317    {
1318        volume_mounts.push(VolumeMount {
1319            name: "license-key".to_string(),
1320            mount_path: "/license_key".to_string(),
1321            ..Default::default()
1322        });
1323        volumes.push(Volume {
1324            name: "license-key".to_string(),
1325            secret: Some(SecretVolumeSource {
1326                default_mode: Some(256),
1327                optional: Some(false),
1328                secret_name: Some(mz.backend_secret_name()),
1329                items: Some(vec![KeyToPath {
1330                    key: "license_key".to_string(),
1331                    path: "license_key".to_string(),
1332                    ..Default::default()
1333                }]),
1334                ..Default::default()
1335            }),
1336            ..Default::default()
1337        });
1338        env.push(EnvVar {
1339            name: "MZ_LICENSE_KEY".to_string(),
1340            value: Some("/license_key/license_key".to_string()),
1341            ..Default::default()
1342        });
1343    }
1344
1345    // Add user-specified extra arguments.
1346    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1347        args.extend(extra_args.iter().cloned());
1348    }
1349
1350    let probe = Probe {
1351        initial_delay_seconds: Some(1),
1352        failure_threshold: Some(12),
1353        tcp_socket: Some(TCPSocketAction {
1354            host: None,
1355            port: IntOrString::Int(config.environmentd_sql_port.into()),
1356        }),
1357        ..Default::default()
1358    };
1359
1360    let security_context = if config.enable_security_context {
1361        // Since we want to adhere to the most restrictive security context, all
1362        // of these fields must be set exactly as they are.
1363        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1364        Some(SecurityContext {
1365            run_as_non_root: Some(true),
1366            capabilities: Some(Capabilities {
1367                drop: Some(vec!["ALL".to_string()]),
1368                ..Default::default()
1369            }),
1370            seccomp_profile: Some(SeccompProfile {
1371                type_: "RuntimeDefault".to_string(),
1372                ..Default::default()
1373            }),
1374            allow_privilege_escalation: Some(false),
1375            ..Default::default()
1376        })
1377    } else {
1378        None
1379    };
1380
1381    let ports = vec![
1382        ContainerPort {
1383            container_port: config.environmentd_sql_port.into(),
1384            name: Some("sql".to_owned()),
1385            ..Default::default()
1386        },
1387        ContainerPort {
1388            container_port: config.environmentd_internal_sql_port.into(),
1389            name: Some("internal-sql".to_owned()),
1390            ..Default::default()
1391        },
1392        ContainerPort {
1393            container_port: config.environmentd_http_port.into(),
1394            name: Some("http".to_owned()),
1395            ..Default::default()
1396        },
1397        ContainerPort {
1398            container_port: config.environmentd_internal_http_port.into(),
1399            name: Some("internal-http".to_owned()),
1400            ..Default::default()
1401        },
1402        ContainerPort {
1403            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1404            name: Some("persist-pubsub".to_owned()),
1405            ..Default::default()
1406        },
1407    ];
1408
1409    // Add networking arguments.
1410    if mz.meets_minimum_version(&V147_DEV0) {
1411        volume_mounts.push(VolumeMount {
1412            name: "listeners-configmap".to_string(),
1413            mount_path: "/listeners".to_string(),
1414            ..Default::default()
1415        });
1416        volumes.push(Volume {
1417            name: "listeners-configmap".to_string(),
1418            config_map: Some(ConfigMapVolumeSource {
1419                name: mz.listeners_configmap_name(generation),
1420                default_mode: Some(256),
1421                optional: Some(false),
1422                items: Some(vec![KeyToPath {
1423                    key: "listeners.json".to_string(),
1424                    path: "listeners.json".to_string(),
1425                    ..Default::default()
1426                }]),
1427            }),
1428            ..Default::default()
1429        });
1430        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
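        // The listeners.json payload itself is generated by
        // create_connection_info below and delivered via the configmap
        // mounted above.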
1431        if matches!(
1432            mz.spec.authenticator_kind,
1433            AuthenticatorKind::Password | AuthenticatorKind::Sasl
1434        ) {
1435            args.push("--system-parameter-default=enable_password_auth=true".into());
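            // Presumed to supply the mz_system login password from the
            // backend secret so internal tooling can authenticate when
            // password auth is enabled.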
1436            env.push(EnvVar {
1437                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1438                value_from: Some(EnvVarSource {
1439                    secret_key_ref: Some(SecretKeySelector {
1440                        name: mz.backend_secret_name(),
1441                        key: "external_login_password_mz_system".to_string(),
1442                        optional: Some(false),
1443                    }),
1444                    ..Default::default()
1445                }),
1446                ..Default::default()
1447            })
1448        }
1449    } else {
1450        args.extend([
1451            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1452            format!(
1453                "--http-listen-addr=0.0.0.0:{}",
1454                config.environmentd_http_port
1455            ),
1456            format!(
1457                "--internal-sql-listen-addr=0.0.0.0:{}",
1458                config.environmentd_internal_sql_port
1459            ),
1460            format!(
1461                "--internal-http-listen-addr=0.0.0.0:{}",
1462                config.environmentd_internal_http_port
1463            ),
1464        ]);
1465    }
1466
1467    let container = Container {
1468        name: "environmentd".to_owned(),
1469        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1470        image_pull_policy: Some(config.image_pull_policy.to_string()),
1471        ports: Some(ports),
1472        args: Some(args),
1473        env: Some(env),
1474        volume_mounts: Some(volume_mounts),
1475        liveness_probe: Some(probe.clone()),
1476        readiness_probe: Some(probe),
1477        resources: mz
1478            .spec
1479            .environmentd_resource_requirements
1480            .clone()
1481            .or_else(|| config.environmentd_default_resources.clone()),
1482        security_context: security_context.clone(),
1483        ..Default::default()
1484    };
1485
1486    let mut pod_template_labels = mz.default_labels();
1487    pod_template_labels.insert(
1488        "materialize.cloud/name".to_owned(),
1489        mz.environmentd_statefulset_name(generation),
1490    );
1491    pod_template_labels.insert(
1492        "materialize.cloud/app".to_owned(),
1493        mz.environmentd_app_name(),
1494    );
1495    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1496    pod_template_labels.extend(
1497        mz.spec
1498            .pod_labels
1499            .as_ref()
1500            .map(|labels| labels.iter())
1501            .unwrap_or_default()
1502            .map(|(key, value)| (key.clone(), value.clone())),
1503    );
1504
1505    let mut pod_template_annotations = btreemap! {
1506        // We can re-enable eviction once we have HA
1507        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1508
1509        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1510        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1511        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1512        "materialize.cloud/generation".to_owned() => generation.to_string(),
1513    };
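    // Standard prometheus.io scrape annotations, plus Materialize-specific
    // annotations pointing at the per-subsystem metrics paths (usage,
    // frontier, compute, storage).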
1514    if config.enable_prometheus_scrape_annotations {
1515        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1516        pod_template_annotations.insert(
1517            "prometheus.io/port".to_owned(),
1518            config.environmentd_internal_http_port.to_string(),
1519        );
1520        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1521        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1522        pod_template_annotations.insert(
1523            "materialize.prometheus.io/mz_usage_path".to_owned(),
1524            "/metrics/mz_usage".to_string(),
1525        );
1526        pod_template_annotations.insert(
1527            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1528            "/metrics/mz_frontier".to_string(),
1529        );
1530        pod_template_annotations.insert(
1531            "materialize.prometheus.io/mz_compute_path".to_owned(),
1532            "/metrics/mz_compute".to_string(),
1533        );
1534        pod_template_annotations.insert(
1535            "materialize.prometheus.io/mz_storage_path".to_owned(),
1536            "/metrics/mz_storage".to_string(),
1537        );
1538    }
1539    pod_template_annotations.extend(
1540        mz.spec
1541            .pod_annotations
1542            .as_ref()
1543            .map(|annotations| annotations.iter())
1544            .unwrap_or_default()
1545            .map(|(key, value)| (key.clone(), value.clone())),
1546    );
1547
1548    let mut tolerations = vec![
1549        // When a node becomes `NotReady`, it indicates there is a problem with the node.
1550        // By default, Kubernetes waits 300s (5 minutes) before doing anything in this case,
1551        // but we want to limit this to 30s for faster recovery.
1552        Toleration {
1553            effect: Some("NoExecute".into()),
1554            key: Some("node.kubernetes.io/not-ready".into()),
1555            operator: Some("Exists".into()),
1556            toleration_seconds: Some(30),
1557            value: None,
1558        },
1559        Toleration {
1560            effect: Some("NoExecute".into()),
1561            key: Some("node.kubernetes.io/unreachable".into()),
1562            operator: Some("Exists".into()),
1563            toleration_seconds: Some(30),
1564            value: None,
1565        },
1566    ];
1567    if let Some(user_tolerations) = &config.environmentd_tolerations {
1568        tolerations.extend(user_tolerations.iter().cloned());
1569    }
1570    let tolerations = Some(tolerations);
1571
1572    let pod_template_spec = PodTemplateSpec {
1573        // not using managed_resource_meta because the pod should be owned
1574        // by the statefulset, not the materialize instance
1575        metadata: Some(ObjectMeta {
1576            labels: Some(pod_template_labels),
1577            annotations: Some(pod_template_annotations), // More annotations are inserted into this map later; do not delete.
1578            ..Default::default()
1579        }),
1580        spec: Some(PodSpec {
1581            containers: vec![container],
1582            node_selector: Some(
1583                config
1584                    .environmentd_node_selector
1585                    .iter()
1586                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1587                    .collect(),
1588            ),
1589            affinity: config.environmentd_affinity.clone(),
1590            scheduler_name: config.scheduler_name.clone(),
1591            service_account_name: Some(mz.service_account_name()),
1592            volumes: Some(volumes),
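            // Run as a fixed non-root uid/gid; 999 is assumed to match the
            // user baked into the environmentd image.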
1593            security_context: Some(PodSecurityContext {
1594                fs_group: Some(999),
1595                run_as_user: Some(999),
1596                run_as_group: Some(999),
1597                ..Default::default()
1598            }),
1599            tolerations,
1600            // This (apparently) has the side effect of automatically starting a new pod
1601            // while the previous pod is still terminating. This sidesteps the StatefulSet
1602            // fencing, but allows for quicker recovery from a failed node.
1603            // The Kubernetes documentation strongly advises against this
1604            // setting, as StatefulSets attempt to provide "at most once"
1605            // semantics [0] -- that is, the guarantee that for a given pod in a
1606            // StatefulSet there is *at most* one pod with that identity running
1607            // in the cluster.
1608            //
1609            // Materialize, however, has been carefully designed to *not* rely
1610            // on this guarantee. (In fact, we do not believe that correct
1611            // distributed systems can meaningfully rely on Kubernetes's
1612            // guarantee--network packets from a pod can be arbitrarily delayed,
1613            // long past that pod's termination.) Running two `environmentd`
1614            // processes is safe: the newer process will safely and correctly
1615            // fence out the older process via CockroachDB. In the future,
1616            // we'll support running multiple `environmentd` processes in
1617            // parallel for high availability.
1618            //
1619            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1620            termination_grace_period_seconds: Some(0),
1621            ..Default::default()
1622        }),
1623    };
1624
1625    let mut match_labels = BTreeMap::new();
1626    match_labels.insert(
1627        "materialize.cloud/name".to_owned(),
1628        mz.environmentd_statefulset_name(generation),
1629    );
1630
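    // A single replica: running multiple environmentd processes in parallel
    // for high availability is not supported yet (see the pod spec comment
    // above).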
1631    let statefulset_spec = StatefulSetSpec {
1632        replicas: Some(1),
1633        template: pod_template_spec,
1634        service_name: Some(mz.environmentd_service_name()),
1635        selector: LabelSelector {
1636            match_expressions: None,
1637            match_labels: Some(match_labels),
1638        },
1639        ..Default::default()
1640    };
1641
1642    StatefulSet {
1643        metadata: ObjectMeta {
1644            annotations: Some(btreemap! {
1645                "materialize.cloud/generation".to_owned() => generation.to_string(),
1646                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1647            }),
1648            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1649        },
1650        spec: Some(statefulset_spec),
1651        status: None,
1652    }
1653}
1654
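/// Builds the listener configuration, the configmap that carries it, and the
/// in-cluster URL used to reach this environmentd generation.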
1655fn create_connection_info(
1656    config: &super::Config,
1657    mz: &Materialize,
1658    generation: u64,
1659) -> ConnectionInfo {
1660    let external_enable_tls = issuer_ref_defined(
1661        &config.default_certificate_specs.internal,
1662        &mz.spec.internal_certificate_spec,
1663    );
1664    let authenticator_kind = mz.spec.authenticator_kind;
1665
1666    let mut listeners_config = ListenersConfig {
1667        sql: btreemap! {
1668            "external".to_owned() => SqlListenerConfig{
1669                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1670                authenticator_kind,
1671                allowed_roles: AllowedRoles::Normal,
1672                enable_tls: external_enable_tls,
1673            },
1674            "internal".to_owned() => SqlListenerConfig{
1675                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1676                authenticator_kind: AuthenticatorKind::None,
1677                // Should this just be Internal?
1678                allowed_roles: AllowedRoles::NormalAndInternal,
1679                enable_tls: false,
1680            },
1681        },
1682        http: btreemap! {
1683            "external".to_owned() => HttpListenerConfig{
1684                base: BaseListenerConfig {
1685                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1686                    // SASL authentication is only supported for SQL (PostgreSQL wire protocol).
1687                    // HTTP listeners must use Password authentication when SASL is enabled.
1688                    // This is validated at environmentd startup via ListenerConfig::validate().
1689                    authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1690                        AuthenticatorKind::Password
1691                    } else {
1692                        authenticator_kind
1693                    },
1694                    allowed_roles: AllowedRoles::Normal,
1695                    enable_tls: external_enable_tls,
1696                },
1697                routes: HttpRoutesEnabled{
1698                    base: true,
1699                    webhook: true,
1700                    internal: false,
1701                    metrics: false,
1702                    profiling: false,
1703                }
1704            },
1705            "internal".to_owned() => HttpListenerConfig{
1706                base: BaseListenerConfig {
1707                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1708                    authenticator_kind: AuthenticatorKind::None,
1709                    // Should this just be Internal?
1710                    allowed_roles: AllowedRoles::NormalAndInternal,
1711                    enable_tls: false,
1712                },
1713                routes: HttpRoutesEnabled{
1714                    base: true,
1715                    webhook: true,
1716                    internal: true,
1717                    metrics: true,
1718                    profiling: true,
1719                }
1720            },
1721        },
1722    };
1723
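    // With password or SASL authentication, drop the unauthenticated internal
    // listeners: internal roles connect through the authenticated external
    // listeners instead, and a metrics-only HTTP listener remains on the
    // internal HTTP port so metrics can still be scraped without credentials.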
1724    if matches!(
1725        authenticator_kind,
1726        AuthenticatorKind::Password | AuthenticatorKind::Sasl
1727    ) {
1728        listeners_config.sql.remove("internal");
1729        listeners_config.http.remove("internal");
1730
1731        if let Some(listener) = listeners_config.sql.get_mut("external") {
1732            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1733        }
1734        if let Some(listener) = listeners_config.http.get_mut("external") {
1735            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1736            listener.routes.internal = true;
1737            listener.routes.profiling = true;
1738        }
1741
1742        listeners_config.http.insert(
1743            "metrics".to_owned(),
1744            HttpListenerConfig {
1745                base: BaseListenerConfig {
1746                    addr: SocketAddr::new(
1747                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1748                        config.environmentd_internal_http_port,
1749                    ),
1750                    authenticator_kind: AuthenticatorKind::None,
1751                    allowed_roles: AllowedRoles::NormalAndInternal,
1752                    enable_tls: false,
1753                },
1754                routes: HttpRoutesEnabled {
1755                    base: false,
1756                    webhook: false,
1757                    internal: false,
1758                    metrics: true,
1759                    profiling: false,
1760                },
1761            },
1762        );
1763    }
1764
1765    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1766    let listeners_configmap = ConfigMap {
1767        binary_data: None,
1768        data: Some(btreemap! {
1769            "listeners.json".to_owned() => listeners_json,
1770        }),
1771        immutable: None,
1772        metadata: ObjectMeta {
1773            annotations: Some(btreemap! {
1774                "materialize.cloud/generation".to_owned() => generation.to_string(),
1775            }),
1776            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1777        },
1778    };
1779
1780    let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1781        AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1782            let scheme = if external_enable_tls { "https" } else { "http" };
1783            (
1784                scheme,
1785                config.environmentd_http_port,
1786                Some(mz.spec.backend_secret_name.clone()),
1787            )
1788        }
1789        _ => ("http", config.environmentd_internal_http_port, None),
1790    };
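    // Resolves to the generation-specific in-cluster service, e.g.
    // "http://<service>.<namespace>.svc.cluster.local:<port>".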
1791    let environmentd_url = format!(
1792        "{}://{}.{}.svc.cluster.local:{}",
1793        scheme,
1794        mz.environmentd_generation_service_name(generation),
1795        mz.namespace(),
1796        leader_api_port,
1797    );
1798    ConnectionInfo {
1799        environmentd_url,
1800        listeners_configmap,
1801        mz_system_secret_name,
1802    }
1803}
1804
1805// see materialize/src/environmentd/src/http.rs
1806#[derive(Debug, Deserialize, PartialEq, Eq)]
1807struct BecomeLeaderResponse {
1808    result: BecomeLeaderResult,
1809}
1810
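// With serde's default externally tagged representation for this enum,
// `BecomeLeaderResponse` matches payloads like {"result": "Success"} or
// {"result": {"Failure": {"message": "..."}}}.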
1811#[derive(Debug, Deserialize, PartialEq, Eq)]
1812enum BecomeLeaderResult {
1813    Success,
1814    Failure { message: String },
1815}
1816
1817#[derive(Debug, Deserialize, PartialEq, Eq)]
1818struct SkipCatchupError {
1819    message: String,
1820}