mz_orchestratord/controller/materialize/environmentd.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{
    collections::BTreeMap,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::LazyLock,
    time::Duration,
};

use k8s_openapi::{
    api::{
        apps::v1::{StatefulSet, StatefulSetSpec},
        core::v1::{
            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
            Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
            VolumeMount,
        },
        networking::v1::{
            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
        },
        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
    },
    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
};
use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
use maplit::btreemap;
use mz_server_core::listeners::{
    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
    ListenersConfig, SqlListenerConfig,
};
use reqwest::{Client as HttpClient, StatusCode};
use semver::{BuildMetadata, Prerelease, Version};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tracing::{error, trace, warn};

use super::Error;
use super::matching_image_from_environmentd_image_ref;
use crate::k8s::{apply_resource, delete_resource, get_resource};
use crate::tls::{create_certificate, issuer_ref_defined};
use mz_cloud_provider::CloudProvider;
use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
use mz_cloud_resources::crd::{
    ManagedResource,
    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
};
use mz_ore::instrument;

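// Version cutoffs used below to gate environmentd flags and resources that
// only apply to certain releases.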
static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 140,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
const V143: Version = Version::new(0, 143, 0);
const V144: Version = Version::new(0, 144, 0);
static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 147,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
const V153: Version = Version::new(0, 153, 0);
static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 154,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
pub const V161: Version = Version::new(0, 161, 0);

/// Describes the status of a deployment.
///
/// This is a simplified representation of `DeploymentState`, suitable for
/// announcement to the external orchestrator.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// This deployment is not the leader. It is initializing and is not yet
    /// ready to become the leader.
    Initializing,
    /// This deployment is not the leader, but it is ready to become the leader.
    ReadyToPromote,
    /// This deployment is in the process of becoming the leader.
    Promoting,
    /// This deployment is the leader.
    IsLeader,
}

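/// Response body returned by environmentd's `/api/leader/status` endpoint.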
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GetLeaderStatusResponse {
    status: DeploymentStatus,
}

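/// Request body sent to environmentd's `/api/login` endpoint when logging in
/// as `mz_system` over password authentication.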
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    username: String,
    // TODO Password type?
    password: String,
}

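/// Connection details the orchestrator uses to reach environmentd over HTTP.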
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    pub environmentd_url: String,
    pub mz_system_secret_name: Option<String>,
    pub listeners_configmap: ConfigMap,
}

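/// The full set of Kubernetes resources managed for a single environmentd
/// generation.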
#[derive(Debug, Serialize)]
pub struct Resources {
    pub generation: u64,
    pub environmentd_network_policies: Vec<NetworkPolicy>,
    pub service_account: Box<Option<ServiceAccount>>,
    pub role: Box<Role>,
    pub role_binding: Box<RoleBinding>,
    pub public_service: Box<Service>,
    pub generation_service: Box<Service>,
    pub persist_pubsub_service: Box<Service>,
    pub environmentd_certificate: Box<Option<Certificate>>,
    pub environmentd_statefulset: Box<StatefulSet>,
    pub connection_info: Box<ConnectionInfo>,
}

impl Resources {
    pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
        let environmentd_network_policies = create_environmentd_network_policies(config, mz);

        let service_account = Box::new(create_service_account_object(config, mz));
        let role = Box::new(create_role_object(mz));
        let role_binding = Box::new(create_role_binding_object(mz));
        let public_service = Box::new(create_public_service_object(config, mz, generation));
        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
        let persist_pubsub_service =
            Box::new(create_persist_pubsub_service(config, mz, generation));
        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
            config, mz, generation,
        ));
        let connection_info = Box::new(create_connection_info(config, mz, generation));

        Self {
            generation,
            environmentd_network_policies,
            service_account,
            role,
            role_binding,
            public_service,
            generation_service,
            persist_pubsub_service,
            environmentd_certificate,
            environmentd_statefulset,
            connection_info,
        }
    }

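    /// Applies all per-generation resources to the cluster, then polls the
    /// environmentd leader status endpoint. Returns a requeue `Action` while
    /// the statefulset is not yet ready or environmentd is still
    /// initializing; returns `Ok(None)` once environmentd is ready (or catchup
    /// has been skipped via `force_promote`).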
    #[instrument]
    pub async fn apply(
        &self,
        client: &Client,
        force_promote: bool,
        namespace: &str,
    ) -> Result<Option<Action>, Error> {
        let environmentd_network_policy_api: Api<NetworkPolicy> =
            Api::namespaced(client.clone(), namespace);
        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);

        for policy in &self.environmentd_network_policies {
            trace!("applying network policy {}", policy.name_unchecked());
            apply_resource(&environmentd_network_policy_api, policy).await?;
        }

        if let Some(service_account) = &*self.service_account {
            trace!("applying environmentd service account");
            apply_resource(&service_account_api, service_account).await?;
        }

        trace!("applying environmentd role");
        apply_resource(&role_api, &*self.role).await?;

        trace!("applying environmentd role binding");
        apply_resource(&role_binding_api, &*self.role_binding).await?;

        trace!("applying environmentd per-generation service");
        apply_resource(&service_api, &*self.generation_service).await?;

        trace!("creating persist pubsub service");
        apply_resource(&service_api, &*self.persist_pubsub_service).await?;

        if let Some(certificate) = &*self.environmentd_certificate {
            trace!("creating new environmentd certificate");
            apply_resource(&certificate_api, certificate).await?;
        }

        trace!("applying listeners configmap");
        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;

        trace!("creating new environmentd statefulset");
        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;

        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));

        let statefulset = get_resource(
            &statefulset_api,
            &self.environmentd_statefulset.name_unchecked(),
        )
        .await?;
        if statefulset
            .and_then(|statefulset| statefulset.status)
            .and_then(|status| status.ready_replicas)
            .unwrap_or(0)
            == 0
        {
            trace!("environmentd statefulset is not ready yet...");
            return Ok(Some(retry_action));
        }

        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
            return Ok(Some(retry_action));
        };
        let status_url = reqwest::Url::parse(&format!(
            "{}/api/leader/status",
            self.connection_info.environmentd_url,
        ))
        .unwrap();

        match http_client.get(status_url.clone()).send().await {
            Ok(response) => {
                let response: GetLeaderStatusResponse = match response.error_for_status() {
                    Ok(response) => response.json().await?,
                    Err(e) => {
                        trace!("failed to get status of environmentd, retrying... ({e})");
                        return Ok(Some(retry_action));
                    }
                };
                if force_promote {
                    trace!("skipping cluster catchup");
                    let skip_catchup_url = reqwest::Url::parse(&format!(
                        "{}/api/leader/skip-catchup",
                        self.connection_info.environmentd_url,
                    ))
                    .unwrap();
                    let response = http_client.post(skip_catchup_url).send().await?;
                    if response.status() == StatusCode::BAD_REQUEST {
                        let err: SkipCatchupError = response.json().await?;
                        return Err(
                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
                        );
                    }
                } else {
                    match response.status {
                        DeploymentStatus::Initializing => {
                            trace!("environmentd is still initializing, retrying...");
                            return Ok(Some(retry_action));
                        }
                        DeploymentStatus::ReadyToPromote
                        | DeploymentStatus::Promoting
                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
                    }
                }
            }
            Err(e) => {
                trace!("failed to connect to environmentd, retrying... ({e})");
                return Ok(Some(retry_action));
            }
        }

        Ok(None)
    }

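    /// Builds an HTTP client for talking to environmentd. When an `mz_system`
    /// login password secret is configured, the client first logs in via
    /// `/api/login` and keeps the session cookie. Returns `None` if the secret
    /// cannot be read or the login fails, so the caller can retry later.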
    #[instrument]
    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
        Some(match &self.connection_info.mz_system_secret_name {
            Some(mz_system_secret_name) => {
                let http_client = reqwest::Client::builder()
                    .timeout(std::time::Duration::from_secs(10))
                    .cookie_store(true)
                    // TODO add_root_certificate instead
                    .danger_accept_invalid_certs(true)
                    .build()
                    .unwrap();

                let secret = secret_api
                    .get(mz_system_secret_name)
                    .await
                    .map_err(|e| {
                        error!("Failed to get backend secret: {:?}", e);
                        e
                    })
                    .ok()?;
                if let Some(data) = secret.data {
                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
                        let password = String::from_utf8_lossy(&password.0).to_string();
                        let login_url = reqwest::Url::parse(&format!(
                            "{}/api/login",
                            self.connection_info.environmentd_url,
                        ))
                        .unwrap();
                        match http_client
                            .post(login_url)
                            .body(
                                serde_json::to_string(&LoginCredentials {
                                    username: "mz_system".to_owned(),
                                    password,
                                })
                                .expect(
                                    "Serializing a simple struct with utf8 strings doesn't fail.",
                                ),
                            )
                            .header("Content-Type", "application/json")
                            .send()
                            .await
                        {
                            Ok(response) => {
                                if let Err(e) = response.error_for_status() {
                                    trace!("failed to login to environmentd, retrying... ({e})");
                                    return None;
                                }
                            }
                            Err(e) => {
                                trace!("failed to connect to environmentd, retrying... ({e})");
                                return None;
                            }
                        };
                    }
                };
                http_client
            }
            None => reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(10))
                .build()
                .unwrap(),
        })
    }

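    /// Requests promotion of this generation to leader via
    /// `/api/leader/promote` and waits until environmentd reports `IsLeader`
    /// before repointing the public service at the new generation.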
    #[instrument]
    pub async fn promote_services(
        &self,
        client: &Client,
        namespace: &str,
    ) -> Result<Option<Action>, Error> {
        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));

        let promote_url = reqwest::Url::parse(&format!(
            "{}/api/leader/promote",
            self.connection_info.environmentd_url,
        ))
        .unwrap();

        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
            return Ok(Some(retry_action));
        };

        trace!("promoting new environmentd to leader");
        let response = http_client.post(promote_url).send().await?;
        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
        if let BecomeLeaderResult::Failure { message } = response.result {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to promote new environmentd: {message}"
            )));
        }

        // A successful POST to the promotion endpoint only indicates
        // that the promotion process was kicked off. It does not
        // guarantee that the environment will be successfully promoted
        // (e.g., if the environment crashes immediately after responding
        // to the request, but before executing the takeover, the
        // promotion will be lost).
        //
        // To guarantee the environment has been promoted successfully,
        // we must wait to see at least one `IsLeader` status returned
        // from the environment.

        let status_url = reqwest::Url::parse(&format!(
            "{}/api/leader/status",
            self.connection_info.environmentd_url,
        ))
        .unwrap();
        match http_client.get(status_url.clone()).send().await {
            Ok(response) => {
                let response: GetLeaderStatusResponse = response.json().await?;
                if response.status != DeploymentStatus::IsLeader {
                    trace!(
                        "environmentd is still promoting (status: {:?}), retrying...",
                        response.status
                    );
                    return Ok(Some(retry_action));
                } else {
                    trace!("environmentd is ready");
                }
            }
            Err(e) => {
                trace!("failed to connect to environmentd, retrying... ({e})");
                return Ok(Some(retry_action));
            }
        }

        trace!("applying environmentd public service");
        apply_resource(&service_api, &*self.public_service).await?;

        Ok(None)
    }

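    /// Deletes the per-generation resources (statefulset, persist pubsub
    /// service, per-generation service, and listeners configmap) for the
    /// given generation.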
    #[instrument]
    pub async fn teardown_generation(
        &self,
        client: &Client,
        mz: &Materialize,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());

        trace!("deleting environmentd statefulset for generation {generation}");
        delete_resource(
            &statefulset_api,
            &mz.environmentd_statefulset_name(generation),
        )
        .await?;

        trace!("deleting persist pubsub service for generation {generation}");
        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;

        trace!("deleting environmentd per-generation service for generation {generation}");
        delete_resource(
            &service_api,
            &mz.environmentd_generation_service_name(generation),
        )
        .await?;

        trace!("deleting listeners configmap for generation {generation}");
        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;

        Ok(())
    }

    // ideally we would just be able to hash the objects directly, but the
    // generated kubernetes objects don't implement the Hash trait
    pub fn generate_hash(&self) -> String {
        let mut hasher = Sha256::new();
        hasher.update(&serde_json::to_string(self).unwrap());
        format!("{:x}", hasher.finalize())
    }
}

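/// Builds the network policies for intra-environment traffic, SQL/HTTP
/// ingress, and source/sink egress, depending on which policy classes are
/// enabled in the config.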
fn create_environmentd_network_policies(
    config: &super::Config,
    mz: &Materialize,
) -> Vec<NetworkPolicy> {
    let mut network_policies = Vec::new();
    if config.network_policies_internal_enabled {
        let environmentd_label_selector = LabelSelector {
            match_labels: Some(
                mz.default_labels()
                    .into_iter()
                    .chain([(
                        "materialize.cloud/app".to_owned(),
                        mz.environmentd_app_name(),
                    )])
                    .collect(),
            ),
            ..Default::default()
        };
        let orchestratord_label_selector = LabelSelector {
            match_labels: Some(
                config
                    .orchestratord_pod_selector_labels
                    .iter()
                    .cloned()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
            ),
            ..Default::default()
        };
        // TODO (Alex) filter to just clusterd and environmentd,
        // once we get a consistent set of labels for both.
        let all_pods_label_selector = LabelSelector {
            match_labels: Some(mz.default_labels()),
            ..Default::default()
        };
        network_policies.extend([
            // Allow all clusterd/environmentd traffic (between pods in the
            // same environment)
            NetworkPolicy {
                metadata: mz
                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
                spec: Some(NetworkPolicySpec {
                    egress: Some(vec![NetworkPolicyEgressRule {
                        to: Some(vec![NetworkPolicyPeer {
                            pod_selector: Some(all_pods_label_selector.clone()),
                            ..Default::default()
                        }]),
                        ..Default::default()
                    }]),
                    ingress: Some(vec![NetworkPolicyIngressRule {
                        from: Some(vec![NetworkPolicyPeer {
                            pod_selector: Some(all_pods_label_selector.clone()),
                            ..Default::default()
                        }]),
                        ..Default::default()
                    }]),
                    pod_selector: Some(all_pods_label_selector.clone()),
                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
                    ..Default::default()
                }),
            },
            // Allow traffic from orchestratord to environmentd in order to hit
            // the promotion endpoints during upgrades
            NetworkPolicy {
                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
                spec: Some(NetworkPolicySpec {
                    ingress: Some(vec![NetworkPolicyIngressRule {
                        from: Some(vec![NetworkPolicyPeer {
                            namespace_selector: Some(LabelSelector {
                                match_labels: Some(btreemap! {
                                    "kubernetes.io/metadata.name".into()
                                        => config.orchestratord_namespace.clone(),
                                }),
                                ..Default::default()
                            }),
                            pod_selector: Some(orchestratord_label_selector),
                            ..Default::default()
                        }]),
                        ports: Some(vec![
                            NetworkPolicyPort {
                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
                                protocol: Some("TCP".to_string()),
                                ..Default::default()
                            },
                            NetworkPolicyPort {
                                port: Some(IntOrString::Int(
                                    config.environmentd_internal_http_port.into(),
                                )),
                                protocol: Some("TCP".to_string()),
                                ..Default::default()
                            },
                        ]),
                        ..Default::default()
                    }]),
                    pod_selector: Some(environmentd_label_selector),
                    policy_types: Some(vec!["Ingress".to_owned()]),
                    ..Default::default()
                }),
            },
        ]);
    }
    if config.network_policies_ingress_enabled {
        let mut ingress_label_selector = mz.default_labels();
        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
        network_policies.extend([NetworkPolicy {
            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
            spec: Some(NetworkPolicySpec {
                ingress: Some(vec![NetworkPolicyIngressRule {
                    from: Some(
                        config
                            .network_policies_ingress_cidrs
                            .iter()
                            .map(|cidr| NetworkPolicyPeer {
                                ip_block: Some(IPBlock {
                                    cidr: cidr.to_owned(),
                                    except: None,
                                }),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    ports: Some(vec![
                        NetworkPolicyPort {
                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
                            protocol: Some("TCP".to_string()),
                            ..Default::default()
                        },
                        NetworkPolicyPort {
                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
                            protocol: Some("TCP".to_string()),
                            ..Default::default()
                        },
                    ]),
                    ..Default::default()
                }]),
                pod_selector: Some(LabelSelector {
                    match_expressions: None,
                    match_labels: Some(ingress_label_selector),
                }),
                policy_types: Some(vec!["Ingress".to_owned()]),
                ..Default::default()
            }),
        }]);
    }
    if config.network_policies_egress_enabled {
        network_policies.extend([NetworkPolicy {
            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
            spec: Some(NetworkPolicySpec {
                egress: Some(vec![NetworkPolicyEgressRule {
                    to: Some(
                        config
                            .network_policies_egress_cidrs
                            .iter()
                            .map(|cidr| NetworkPolicyPeer {
                                ip_block: Some(IPBlock {
                                    cidr: cidr.to_owned(),
                                    except: None,
                                }),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    ..Default::default()
                }]),
                pod_selector: Some(LabelSelector {
                    match_expressions: None,
                    match_labels: Some(mz.default_labels()),
                }),
                policy_types: Some(vec!["Egress".to_owned()]),
                ..Default::default()
            }),
        }]);
    }
    network_policies
}

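/// Creates the environmentd service account, if enabled, carrying over any
/// user-specified annotations and labels (including the deprecated AWS IAM
/// role ARN annotation).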
fn create_service_account_object(
    config: &super::Config,
    mz: &Materialize,
) -> Option<ServiceAccount> {
    if mz.create_service_account() {
        let mut annotations: BTreeMap<String, String> = mz
            .spec
            .service_account_annotations
            .clone()
            .unwrap_or_default();
        if let (CloudProvider::Aws, Some(role_arn)) = (
            config.cloud_provider,
            mz.spec
                .environmentd_iam_role_arn
                .as_deref()
                .or(config.environmentd_iam_role_arn.as_deref()),
        ) {
            warn!(
                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
            );
            annotations.insert(
                "eks.amazonaws.com/role-arn".to_string(),
                role_arn.to_string(),
            );
        };

        let mut labels = mz.default_labels();
        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());

        Some(ServiceAccount {
            metadata: ObjectMeta {
                annotations: Some(annotations),
                labels: Some(labels),
                ..mz.managed_resource_meta(mz.service_account_name())
            },
            ..Default::default()
        })
    } else {
        None
    }
}

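/// Creates the Role granting environmentd access to the Kubernetes resources
/// it orchestrates (statefulsets, pods, secrets, services, configmaps, VPC
/// endpoints, and pod/volume metrics).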
fn create_role_object(mz: &Materialize) -> Role {
    Role {
        metadata: mz.managed_resource_meta(mz.role_name()),
        rules: Some(vec![
            PolicyRule {
                api_groups: Some(vec!["apps".to_string()]),
                resources: Some(vec!["statefulsets".to_string()]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["".to_string()]),
                resources: Some(vec![
                    "persistentvolumeclaims".to_string(),
                    "pods".to_string(),
                    "secrets".to_string(),
                    "services".to_string(),
                ]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["".to_string()]),
                resources: Some(vec!["configmaps".to_string()]),
                verbs: vec!["get".to_string()],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["materialize.cloud".to_string()]),
                resources: Some(vec!["vpcendpoints".to_string()]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
                resources: Some(vec!["pods".to_string()]),
                verbs: vec!["get".to_string(), "list".to_string()],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
                resources: Some(vec![
                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
                ]),
                verbs: vec!["get".to_string()],
                ..Default::default()
            },
        ]),
    }
}

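/// Binds the environmentd Role to its service account.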
fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
    RoleBinding {
        metadata: mz.managed_resource_meta(mz.role_binding_name()),
        role_ref: RoleRef {
            api_group: "".to_string(),
            kind: "Role".to_string(),
            name: mz.role_name(),
        },
        subjects: Some(vec![Subject {
            api_group: Some("".to_string()),
            kind: "ServiceAccount".to_string(),
            name: mz.service_account_name(),
            namespace: Some(mz.namespace()),
        }]),
    }
}

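/// Creates the stable, generation-independent environmentd service, which is
/// only repointed at a new generation after promotion succeeds.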
fn create_public_service_object(
    config: &super::Config,
    mz: &Materialize,
    generation: u64,
) -> Service {
    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
}

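/// Creates the service scoped to a specific environmentd generation, applied
/// before that generation is promoted to leader.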
fn create_generation_service_object(
    config: &super::Config,
    mz: &Materialize,
    generation: u64,
) -> Service {
    create_base_service_object(
        config,
        mz,
        generation,
        &mz.environmentd_generation_service_name(generation),
    )
}

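/// Creates a headless ClusterIP service exposing the external and internal
/// SQL and HTTP ports of the environmentd pods for the given generation.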
fn create_base_service_object(
    config: &super::Config,
    mz: &Materialize,
    generation: u64,
    service_name: &str,
) -> Service {
    let ports = vec![
        ServicePort {
            port: config.environmentd_sql_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("sql".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_http_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("https".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_internal_sql_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("internal-sql".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_internal_http_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("internal-http".to_string()),
            ..Default::default()
        },
    ];

    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};

    let spec = ServiceSpec {
        type_: Some("ClusterIP".to_string()),
        cluster_ip: Some("None".to_string()),
        selector: Some(selector),
        ports: Some(ports),
        ..Default::default()
    };

    Service {
        metadata: mz.managed_resource_meta(service_name.to_string()),
        spec: Some(spec),
        status: None,
    }
}

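/// Creates the headless service exposing environmentd's Persist PubSub gRPC
/// port for the given generation.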
fn create_persist_pubsub_service(
    config: &super::Config,
    mz: &Materialize,
    generation: u64,
) -> Service {
    Service {
        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
        spec: Some(ServiceSpec {
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            selector: Some(btreemap! {
                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
            }),
            ports: Some(vec![ServicePort {
                name: Some("grpc".to_string()),
                protocol: Some("TCP".to_string()),
                port: config.environmentd_internal_persist_pubsub_port.into(),
                ..Default::default()
            }]),
            ..Default::default()
        }),
        status: None,
    }
}

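/// Creates the cert-manager Certificate for environmentd's TLS listeners, if
/// one should be issued.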
fn create_environmentd_certificate(
    config: &super::Config,
    mz: &Materialize,
) -> Option<Certificate> {
    create_certificate(
        config.default_certificate_specs.internal.clone(),
        mz,
        mz.spec.internal_certificate_spec.clone(),
        mz.environmentd_certificate_name(),
        mz.environmentd_certificate_secret_name(),
        Some(vec![
            mz.environmentd_service_name(),
            mz.environmentd_service_internal_fqdn(),
        ]),
        CertificatePrivateKeyAlgorithm::Ed25519,
        None,
    )
}

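/// Builds the environmentd StatefulSet for the given generation, assembling
/// the command-line arguments, environment variables, volumes, ports, and
/// probes, with several flags gated on the environmentd version.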
892fn create_environmentd_statefulset_object(
893    config: &super::Config,
894    mz: &Materialize,
895    generation: u64,
896) -> StatefulSet {
897    // IMPORTANT: Only pass secrets via environment variables. All other
898    // parameters should be passed as command line arguments, possibly gated
899    // with a `meets_minimum_version` call. This ensures typos cause
900    // `environmentd` to fail to start, rather than silently ignoring the
901    // configuration.
902    //
903    // When passing a secret, use a `SecretKeySelector` to forward a secret into
904    // the pod. Do *not* hardcode a secret as the value directly, as doing so
905    // will leak the secret to anyone with permission to describe the pod.
906    let mut env = vec![
907        EnvVar {
908            name: "MZ_METADATA_BACKEND_URL".to_string(),
909            value_from: Some(EnvVarSource {
910                secret_key_ref: Some(SecretKeySelector {
911                    name: mz.backend_secret_name(),
912                    key: "metadata_backend_url".to_string(),
913                    optional: Some(false),
914                }),
915                ..Default::default()
916            }),
917            ..Default::default()
918        },
919        EnvVar {
920            name: "MZ_PERSIST_BLOB_URL".to_string(),
921            value_from: Some(EnvVarSource {
922                secret_key_ref: Some(SecretKeySelector {
923                    name: mz.backend_secret_name(),
924                    key: "persist_backend_url".to_string(),
925                    optional: Some(false),
926                }),
927                ..Default::default()
928            }),
929            ..Default::default()
930        },
931    ];
932
933    env.push(EnvVar {
934        name: "AWS_REGION".to_string(),
935        value: Some(config.region.clone()),
936        ..Default::default()
937    });
938
939    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
940
941    let mut args = vec![];
942
943    if let Some(helm_chart_version) = &config.helm_chart_version {
944        args.push(format!("--helm-chart-version={helm_chart_version}"));
945    }
946
947    // Add environment ID argument.
948    args.push(format!(
949        "--environment-id={}",
950        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
951    ));
952
953    // Add clusterd image argument based on environmentd tag.
954    args.push(format!(
955        "--clusterd-image={}",
956        matching_image_from_environmentd_image_ref(
957            &mz.spec.environmentd_image_ref,
958            "clusterd",
959            None
960        )
961    ));
962
963    // Add cluster and storage host size arguments.
964    args.extend(
965        [
966            config
967                .environmentd_cluster_replica_sizes
968                .as_ref()
969                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
970            config
971                .bootstrap_default_cluster_replica_size
972                .as_ref()
973                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
974            config
975                .bootstrap_builtin_system_cluster_replica_size
976                .as_ref()
977                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
978            config
979                .bootstrap_builtin_probe_cluster_replica_size
980                .as_ref()
981                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
982            config
983                .bootstrap_builtin_support_cluster_replica_size
984                .as_ref()
985                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
986            config
987                .bootstrap_builtin_catalog_server_cluster_replica_size
988                .as_ref()
989                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
990            config
991                .bootstrap_builtin_analytics_cluster_replica_size
992                .as_ref()
993                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
994            config
995                .bootstrap_builtin_system_cluster_replication_factor
996                .as_ref()
997                .map(|replication_factor| {
998                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
999                }),
1000            config
1001                .bootstrap_builtin_probe_cluster_replication_factor
1002                .as_ref()
1003                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
1004            config
1005                .bootstrap_builtin_support_cluster_replication_factor
1006                .as_ref()
1007                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
1008            config
1009                .bootstrap_builtin_analytics_cluster_replication_factor
1010                .as_ref()
1011                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
1012        ]
1013        .into_iter()
1014        .flatten(),
1015    );
1016
1017    args.extend(
1018        config
1019            .environmentd_allowed_origins
1020            .iter()
1021            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1022    );
1023
1024    args.push(format!(
1025        "--secrets-controller={}",
1026        config.secrets_controller
1027    ));
1028
1029    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1030        if let Ok(cluster_replica_sizes) =
1031            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1032        {
1033            let cluster_replica_sizes: Vec<_> =
1034                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1035            args.push(format!(
1036                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1037                cluster_replica_sizes.join("', '")
1038            ));
1039        }
1040    }
1041    if !config.cloud_provider.is_cloud() {
1042        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1043    }
1044
1045    if config.enable_internal_statement_logging {
1046        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1047    }
1048
1049    if config.disable_statement_logging {
1050        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1051    }
1052
1053    if !mz.spec.enable_rbac {
1054        args.push("--system-parameter-default=enable_rbac_checks=false".into());
1055    }
1056
1057    // Add persist arguments.
1058
1059    // Configure the Persist Isolated Runtime to use one less thread than the total available.
1060    args.push("--persist-isolated-runtime-threads=-1".to_string());
1061
1062    // Add AWS arguments.
1063    if config.cloud_provider == CloudProvider::Aws {
1064        if let Some(azs) = config.environmentd_availability_zones.as_ref() {
1065            for az in azs {
1066                args.push(format!("--availability-zone={az}"));
1067            }
1068        }
1069
1070        if let Some(environmentd_connection_role_arn) = mz
1071            .spec
1072            .environmentd_connection_role_arn
1073            .as_deref()
1074            .or(config.environmentd_connection_role_arn.as_deref())
1075        {
1076            args.push(format!(
1077                "--aws-connection-role-arn={}",
1078                environmentd_connection_role_arn
1079            ));
1080        }
1081        if let Some(account_id) = &config.aws_account_id {
1082            args.push(format!("--aws-account-id={account_id}"));
1083        }
1084
1085        args.extend([format!(
1086            "--aws-secrets-controller-tags=Environment={}",
1087            mz.name_unchecked()
1088        )]);
1089        args.extend_from_slice(&config.aws_secrets_controller_tags);
1090    }
1091
1092    // Add Kubernetes arguments.
1093    args.extend([
1094        "--orchestrator=kubernetes".into(),
1095        format!(
1096            "--orchestrator-kubernetes-service-account={}",
1097            &mz.service_account_name()
1098        ),
1099        format!(
1100            "--orchestrator-kubernetes-image-pull-policy={}",
1101            config.image_pull_policy.as_kebab_case_str(),
1102        ),
1103    ]);
1104    for selector in &config.clusterd_node_selector {
1105        args.push(format!(
1106            "--orchestrator-kubernetes-service-node-selector={}={}",
1107            selector.key, selector.value,
1108        ));
1109    }
1110    if mz.meets_minimum_version(&V144) {
1111        if let Some(affinity) = &config.clusterd_affinity {
1112            let affinity = serde_json::to_string(affinity).unwrap();
1113            args.push(format!(
1114                "--orchestrator-kubernetes-service-affinity={affinity}"
1115            ))
1116        }
1117        if let Some(tolerations) = &config.clusterd_tolerations {
1118            let tolerations = serde_json::to_string(tolerations).unwrap();
1119            args.push(format!(
1120                "--orchestrator-kubernetes-service-tolerations={tolerations}"
1121            ))
1122        }
1123    }
1124    if let Some(scheduler_name) = &config.scheduler_name {
1125        args.push(format!(
1126            "--orchestrator-kubernetes-scheduler-name={}",
1127            scheduler_name
1128        ));
1129    }
1130    if mz.meets_minimum_version(&V154_DEV0) {
1131        args.extend(
1132            mz.spec
1133                .pod_annotations
1134                .as_ref()
1135                .map(|annotations| annotations.iter())
1136                .unwrap_or_default()
1137                .map(|(key, val)| {
1138                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1139                }),
1140        );
1141    }
1142    args.extend(
1143        mz.default_labels()
1144            .iter()
1145            .chain(
1146                mz.spec
1147                    .pod_labels
1148                    .as_ref()
1149                    .map(|labels| labels.iter())
1150                    .unwrap_or_default(),
1151            )
1152            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1153    );
1154    if let Some(status) = &mz.status {
1155        args.push(format!(
1156            "--orchestrator-kubernetes-name-prefix=mz{}-",
1157            status.resource_id
1158        ));
1159    }
1160
1161    // Add logging and tracing arguments.
1162    args.extend(["--log-format=json".into()]);
1163    if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
1164        args.push(format!("--opentelemetry-endpoint={}", endpoint));
1165    }
1166    // --opentelemetry-resource also configures sentry tags
1167    args.extend([
1168        format!(
1169            "--opentelemetry-resource=organization_id={}",
1170            mz.spec.environment_id
1171        ),
1172        format!(
1173            "--opentelemetry-resource=environment_name={}",
1174            mz.name_unchecked()
1175        ),
1176    ]);
1177
1178    if let Some(segment_api_key) = &config.segment_api_key {
1179        args.push(format!("--segment-api-key={}", segment_api_key));
1180        if config.segment_client_side {
1181            args.push("--segment-client-side".into());
1182        }
1183    }
1184
1185    let mut volumes = Vec::new();
1186    let mut volume_mounts = Vec::new();
1187    if issuer_ref_defined(
1188        &config.default_certificate_specs.internal,
1189        &mz.spec.internal_certificate_spec,
1190    ) {
1191        volumes.push(Volume {
1192            name: "certificate".to_owned(),
1193            secret: Some(SecretVolumeSource {
1194                default_mode: Some(0o400),
1195                secret_name: Some(mz.environmentd_certificate_secret_name()),
1196                items: None,
1197                optional: Some(false),
1198            }),
1199            ..Default::default()
1200        });
1201        volume_mounts.push(VolumeMount {
1202            name: "certificate".to_owned(),
1203            mount_path: "/etc/materialized".to_owned(),
1204            read_only: Some(true),
1205            ..Default::default()
1206        });
1207        args.extend([
1208            "--tls-mode=require".into(),
1209            "--tls-cert=/etc/materialized/tls.crt".into(),
1210            "--tls-key=/etc/materialized/tls.key".into(),
1211        ]);
1212    } else {
1213        args.push("--tls-mode=disable".to_string());
1214    }
1215    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1216        args.push(format!(
1217            "--orchestrator-kubernetes-ephemeral-volume-class={}",
1218            ephemeral_volume_class
1219        ));
1220    }
1221    // The `materialize` user used by clusterd always has gid 999.
1222    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1223
1224    // Add Sentry arguments.
1225    if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
1226        args.push(format!("--sentry-dsn={}", sentry_dsn));
1227        if let Some(sentry_environment) = &config.tracing.sentry_environment {
1228            args.push(format!("--sentry-environment={}", sentry_environment));
1229        }
1230        args.push(format!("--sentry-tag=region={}", config.region));
1231    }
1232
1233    // Add Persist PubSub arguments
1234    args.push(format!(
1235        "--persist-pubsub-url=http://{}:{}",
1236        mz.persist_pubsub_service_name(generation),
1237        config.environmentd_internal_persist_pubsub_port,
1238    ));
1239    args.push(format!(
1240        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1241        config.environmentd_internal_persist_pubsub_port
1242    ));
1243
1244    args.push(format!("--deploy-generation={}", generation));
1245
1246    // Add URL for internal user impersonation endpoint
1247    args.push(format!(
1248        "--internal-console-redirect-url={}",
1249        &config.internal_console_proxy_url,
1250    ));
1251
1252    if !config.collect_pod_metrics {
1253        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1254    }
1255    if config.enable_prometheus_scrape_annotations {
1256        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1257    }
1258
1259    // the --disable-license-key-checks environmentd flag only existed
1260    // between these versions
1261    if config.disable_license_key_checks {
1262        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1263            args.push("--disable-license-key-checks".into());
1264        }
1265    }
1266
1267    // as of version 0.153, the ability to disable license key checks was
1268    // removed, so we should always set up license keys in that case
1269    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1270        || mz.meets_minimum_version(&V153)
1271    {
1272        volume_mounts.push(VolumeMount {
1273            name: "license-key".to_string(),
1274            mount_path: "/license_key".to_string(),
1275            ..Default::default()
1276        });
1277        volumes.push(Volume {
1278            name: "license-key".to_string(),
1279            secret: Some(SecretVolumeSource {
1280                default_mode: Some(256),
1281                optional: Some(false),
1282                secret_name: Some(mz.backend_secret_name()),
1283                items: Some(vec![KeyToPath {
1284                    key: "license_key".to_string(),
1285                    path: "license_key".to_string(),
1286                    ..Default::default()
1287                }]),
1288                ..Default::default()
1289            }),
1290            ..Default::default()
1291        });
1292        env.push(EnvVar {
1293            name: "MZ_LICENSE_KEY".to_string(),
1294            value: Some("/license_key/license_key".to_string()),
1295            ..Default::default()
1296        });
1297    }
1298
1299    // Add user-specified extra arguments.
1300    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1301        args.extend(extra_args.iter().cloned());
1302    }
1303
1304    let probe = Probe {
1305        initial_delay_seconds: Some(1),
1306        failure_threshold: Some(12),
1307        tcp_socket: Some(TCPSocketAction {
1308            host: None,
1309            port: IntOrString::Int(config.environmentd_sql_port.into()),
1310        }),
1311        ..Default::default()
1312    };
1313
1314    let security_context = if config.enable_security_context {
1315        // Since we want to adhere to the most restrictive security context, all
1316        // of these fields have to be set how they are.
1317        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1318        Some(SecurityContext {
1319            run_as_non_root: Some(true),
1320            capabilities: Some(Capabilities {
1321                drop: Some(vec!["ALL".to_string()]),
1322                ..Default::default()
1323            }),
1324            seccomp_profile: Some(SeccompProfile {
1325                type_: "RuntimeDefault".to_string(),
1326                ..Default::default()
1327            }),
1328            allow_privilege_escalation: Some(false),
1329            ..Default::default()
1330        })
1331    } else {
1332        None
1333    };
1334
1335    let ports = vec![
1336        ContainerPort {
1337            container_port: config.environmentd_sql_port.into(),
1338            name: Some("sql".to_owned()),
1339            ..Default::default()
1340        },
1341        ContainerPort {
1342            container_port: config.environmentd_internal_sql_port.into(),
1343            name: Some("internal-sql".to_owned()),
1344            ..Default::default()
1345        },
1346        ContainerPort {
1347            container_port: config.environmentd_http_port.into(),
1348            name: Some("http".to_owned()),
1349            ..Default::default()
1350        },
1351        ContainerPort {
1352            container_port: config.environmentd_internal_http_port.into(),
1353            name: Some("internal-http".to_owned()),
1354            ..Default::default()
1355        },
1356        ContainerPort {
1357            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1358            name: Some("persist-pubsub".to_owned()),
1359            ..Default::default()
1360        },
1361    ];
1362
1363    // Add networking arguments.
1364    if mz.meets_minimum_version(&V147_DEV0) {
1365        volume_mounts.push(VolumeMount {
1366            name: "listeners-configmap".to_string(),
1367            mount_path: "/listeners".to_string(),
1368            ..Default::default()
1369        });
1370        volumes.push(Volume {
1371            name: "listeners-configmap".to_string(),
1372            config_map: Some(ConfigMapVolumeSource {
1373                name: mz.listeners_configmap_name(generation),
1374                default_mode: Some(256),
1375                optional: Some(false),
1376                items: Some(vec![KeyToPath {
1377                    key: "listeners.json".to_string(),
1378                    path: "listeners.json".to_string(),
1379                    ..Default::default()
1380                }]),
1381            }),
1382            ..Default::default()
1383        });
1384        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1385        if matches!(
1386            mz.spec.authenticator_kind,
1387            AuthenticatorKind::Password | AuthenticatorKind::Sasl
1388        ) {
1389            args.push("--system-parameter-default=enable_password_auth=true".into());
1390            env.push(EnvVar {
1391                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1392                value_from: Some(EnvVarSource {
1393                    secret_key_ref: Some(SecretKeySelector {
1394                        name: mz.backend_secret_name(),
1395                        key: "external_login_password_mz_system".to_string(),
1396                        optional: Some(false),
1397                    }),
1398                    ..Default::default()
1399                }),
1400                ..Default::default()
1401            })
1402        }
1403    } else {
1404        args.extend([
1405            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1406            format!(
1407                "--http-listen-addr=0.0.0.0:{}",
1408                config.environmentd_http_port
1409            ),
1410            format!(
1411                "--internal-sql-listen-addr=0.0.0.0:{}",
1412                config.environmentd_internal_sql_port
1413            ),
1414            format!(
1415                "--internal-http-listen-addr=0.0.0.0:{}",
1416                config.environmentd_internal_http_port
1417            ),
1418        ]);
1419    }
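    // Illustrative resulting flags (ports here are hypothetical examples, not
    // necessarily the configured defaults): 0.147.0-dev.0 and later receive only
    // `--listeners-config-path=/listeners/listeners.json` and read the listener layout
    // from the mounted ConfigMap, while older versions receive explicit flags such as
    // `--sql-listen-addr=0.0.0.0:6875` and `--http-listen-addr=0.0.0.0:6876`.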
1420
1421    let container = Container {
1422        name: "environmentd".to_owned(),
1423        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1424        image_pull_policy: Some(config.image_pull_policy.to_string()),
1425        ports: Some(ports),
1426        args: Some(args),
1427        env: Some(env),
1428        volume_mounts: Some(volume_mounts),
1429        liveness_probe: Some(probe.clone()),
1430        readiness_probe: Some(probe),
1431        resources: mz
1432            .spec
1433            .environmentd_resource_requirements
1434            .clone()
1435            .or_else(|| config.environmentd_default_resources.clone()),
1436        security_context: security_context.clone(),
1437        ..Default::default()
1438    };
1439
1440    let mut pod_template_labels = mz.default_labels();
1441    pod_template_labels.insert(
1442        "materialize.cloud/name".to_owned(),
1443        mz.environmentd_statefulset_name(generation),
1444    );
1445    pod_template_labels.insert(
1446        "materialize.cloud/app".to_owned(),
1447        mz.environmentd_app_name(),
1448    );
1449    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1450    pod_template_labels.extend(
1451        mz.spec
1452            .pod_labels
1453            .as_ref()
1454            .map(|labels| labels.iter())
1455            .unwrap_or_default()
1456            .map(|(key, value)| (key.clone(), value.clone())),
1457    );
1458
1459    let mut pod_template_annotations = btreemap! {
1460        // We can re-enable eviction once we have HA
1461        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1462
1463        // Prevents both old (< 0.30) and newer versions of Karpenter from evicting database pods.
1464        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1465        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1466        "materialize.cloud/generation".to_owned() => generation.to_string(),
1467    };
1468    if config.enable_prometheus_scrape_annotations {
1469        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1470        pod_template_annotations.insert(
1471            "prometheus.io/port".to_owned(),
1472            config.environmentd_internal_http_port.to_string(),
1473        );
1474        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1475        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1476        pod_template_annotations.insert(
1477            "materialize.prometheus.io/mz_usage_path".to_owned(),
1478            "/metrics/mz_usage".to_string(),
1479        );
1480        pod_template_annotations.insert(
1481            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1482            "/metrics/mz_frontier".to_string(),
1483        );
1484        pod_template_annotations.insert(
1485            "materialize.prometheus.io/mz_compute_path".to_owned(),
1486            "/metrics/mz_compute".to_string(),
1487        );
1488        pod_template_annotations.insert(
1489            "materialize.prometheus.io/mz_storage_path".to_owned(),
1490            "/metrics/mz_storage".to_string(),
1491        );
1492    }
1493    pod_template_annotations.extend(
1494        mz.spec
1495            .pod_annotations
1496            .as_ref()
1497            .map(|annotations| annotations.iter())
1498            .unwrap_or_default()
1499            .map(|(key, value)| (key.clone(), value.clone())),
1500    );
1501
1502    let mut tolerations = vec![
1503        // When a node becomes `NotReady`, something is wrong with that node. By
1504        // default, Kubernetes waits 300s (5 minutes) before evicting its pods in this
1505        // case; we limit that to 30s for faster recovery.
1506        Toleration {
1507            effect: Some("NoExecute".into()),
1508            key: Some("node.kubernetes.io/not-ready".into()),
1509            operator: Some("Exists".into()),
1510            toleration_seconds: Some(30),
1511            value: None,
1512        },
1513        Toleration {
1514            effect: Some("NoExecute".into()),
1515            key: Some("node.kubernetes.io/unreachable".into()),
1516            operator: Some("Exists".into()),
1517            toleration_seconds: Some(30),
1518            value: None,
1519        },
1520    ];
1521    if let Some(user_tolerations) = &config.environmentd_tolerations {
1522        tolerations.extend(user_tolerations.iter().cloned());
1523    }
1524    let tolerations = Some(tolerations);
1525
1526    let pod_template_spec = PodTemplateSpec {
1527        // Not using managed_resource_meta because the pod should be owned
1528        // by the StatefulSet, not by the Materialize instance.
1529        metadata: Some(ObjectMeta {
1530            labels: Some(pod_template_labels),
1531            annotations: Some(pod_template_annotations), // More annotations are inserted into this later; do not delete it.
1532            ..Default::default()
1533        }),
1534        spec: Some(PodSpec {
1535            containers: vec![container],
1536            node_selector: Some(
1537                config
1538                    .environmentd_node_selector
1539                    .iter()
1540                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1541                    .collect(),
1542            ),
1543            affinity: config.environmentd_affinity.clone(),
1544            scheduler_name: config.scheduler_name.clone(),
1545            service_account_name: Some(mz.service_account_name()),
1546            volumes: Some(volumes),
1547            security_context: Some(PodSecurityContext {
1548                fs_group: Some(999),
1549                run_as_user: Some(999),
1550                run_as_group: Some(999),
1551                ..Default::default()
1552            }),
1553            tolerations,
1554            // This (apparently) has the side effect of automatically starting a new pod
1555            // while the previous pod is still terminating. This sidesteps the StatefulSet
1556            // fencing, but allows for quicker recovery from a failed node.
1557            // The Kubernetes documentation strongly advises against this
1558            // setting, as StatefulSets attempt to provide "at most once"
1559            // semantics [0]-- that is, the guarantee that for a given pod in a
1560            // StatefulSet there is *at most* one pod with that identity running
1561            // in the cluster.
1562            //
1563            // Materialize, however, has been carefully designed to *not* rely
1564            // on this guarantee. (In fact, we do not believe that correct
1565            // distributed systems can meaningfully rely on Kubernetes's
1566            // guarantee--network packets from a pod can be arbitrarily delayed,
1567            // long past that pod's termination.) Running two `environmentd`
1568            // processes is safe: the newer process will safely and correctly
1569            // fence out the older process via CockroachDB. In the future,
1570            // we'll support running multiple `environmentd` processes in
1571            // parallel for high availability.
1572            //
1573            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1574            termination_grace_period_seconds: Some(0),
1575            ..Default::default()
1576        }),
1577    };
1578
1579    let mut match_labels = BTreeMap::new();
1580    match_labels.insert(
1581        "materialize.cloud/name".to_owned(),
1582        mz.environmentd_statefulset_name(generation),
1583    );
1584
1585    let statefulset_spec = StatefulSetSpec {
1586        replicas: Some(1),
1587        template: pod_template_spec,
1588        service_name: Some(mz.environmentd_service_name()),
1589        selector: LabelSelector {
1590            match_expressions: None,
1591            match_labels: Some(match_labels),
1592        },
1593        ..Default::default()
1594    };
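    // `replicas` is pinned to 1 because environmentd does not yet run highly available
    // (see the `termination_grace_period_seconds` comment above); the selector only
    // needs `materialize.cloud/name`, which is also present in the pod template labels.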
1595
1596    StatefulSet {
1597        metadata: ObjectMeta {
1598            annotations: Some(btreemap! {
1599                "materialize.cloud/generation".to_owned() => generation.to_string(),
1600                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1601            }),
1602            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1603        },
1604        spec: Some(statefulset_spec),
1605        status: None,
1606    }
1607}
1608
1609fn create_connection_info(
1610    config: &super::Config,
1611    mz: &Materialize,
1612    generation: u64,
1613) -> ConnectionInfo {
1614    let external_enable_tls = issuer_ref_defined(
1615        &config.default_certificate_specs.internal,
1616        &mz.spec.internal_certificate_spec,
1617    );
1618    let authenticator_kind = mz.spec.authenticator_kind;
1619
1620    let mut listeners_config = ListenersConfig {
1621        sql: btreemap! {
1622            "external".to_owned() => SqlListenerConfig{
1623                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1624                authenticator_kind,
1625                allowed_roles: AllowedRoles::Normal,
1626                enable_tls: external_enable_tls,
1627            },
1628            "internal".to_owned() => SqlListenerConfig{
1629                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1630                authenticator_kind: AuthenticatorKind::None,
1631                // Should this just be Internal?
1632                allowed_roles: AllowedRoles::NormalAndInternal,
1633                enable_tls: false,
1634            },
1635        },
1636        http: btreemap! {
1637            "external".to_owned() => HttpListenerConfig{
1638                base: BaseListenerConfig {
1639                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1640                    // SASL authentication is only supported for SQL (PostgreSQL wire protocol).
1641                    // HTTP listeners must use Password authentication when SASL is enabled.
1642                    // This is validated at environmentd startup via ListenerConfig::validate().
1643                    authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1644                        AuthenticatorKind::Password
1645                    } else {
1646                        authenticator_kind
1647                    },
1648                    allowed_roles: AllowedRoles::Normal,
1649                    enable_tls: external_enable_tls,
1650                },
1651                routes: HttpRoutesEnabled{
1652                    base: true,
1653                    webhook: true,
1654                    internal: false,
1655                    metrics: false,
1656                    profiling: false,
1657                }
1658            },
1659            "internal".to_owned() => HttpListenerConfig{
1660                base: BaseListenerConfig {
1661                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1662                    authenticator_kind: AuthenticatorKind::None,
1663                    // Should this just be Internal?
1664                    allowed_roles: AllowedRoles::NormalAndInternal,
1665                    enable_tls: false,
1666                },
1667                routes: HttpRoutesEnabled{
1668                    base: true,
1669                    webhook: true,
1670                    internal: true,
1671                    metrics: true,
1672                    profiling: true,
1673                }
1674            },
1675        },
1676    };
1677
1678    if matches!(
1679        authenticator_kind,
1680        AuthenticatorKind::Password | AuthenticatorKind::Sasl
1681    ) {
1682        listeners_config.sql.remove("internal");
1683        listeners_config.http.remove("internal");
1684
1685        if let Some(listener) = listeners_config.sql.get_mut("external") {
1686            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1687        }
1688        if let Some(listener) = listeners_config.http.get_mut("external") {
1689            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1690            listener.routes.internal = true;
1691            listener.routes.profiling = true;
1692        }
1695
1696        listeners_config.http.insert(
1697            "metrics".to_owned(),
1698            HttpListenerConfig {
1699                base: BaseListenerConfig {
1700                    addr: SocketAddr::new(
1701                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1702                        config.environmentd_internal_http_port,
1703                    ),
1704                    authenticator_kind: AuthenticatorKind::None,
1705                    allowed_roles: AllowedRoles::NormalAndInternal,
1706                    enable_tls: false,
1707                },
1708                routes: HttpRoutesEnabled {
1709                    base: false,
1710                    webhook: false,
1711                    internal: false,
1712                    metrics: true,
1713                    profiling: false,
1714                },
1715            },
1716        );
1717    }
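    // Net effect for Password/SASL deployments: the dedicated "internal" SQL and HTTP
    // listeners are dropped, the "external" listeners additionally allow internal roles
    // and expose the internal and profiling HTTP routes, and a metrics-only HTTP
    // listener is kept on the internal HTTP port so unauthenticated metrics scraping
    // still works.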
1718
1719    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1720    let listeners_configmap = ConfigMap {
1721        binary_data: None,
1722        data: Some(btreemap! {
1723            "listeners.json".to_owned() => listeners_json,
1724        }),
1725        immutable: None,
1726        metadata: ObjectMeta {
1727            annotations: Some(btreemap! {
1728                "materialize.cloud/generation".to_owned() => generation.to_string(),
1729            }),
1730            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1731        },
1732    };
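    // The ConfigMap's `listeners.json` entry is the serde serialization of the
    // `ListenersConfig` built above. Illustrative shape only -- the exact field names
    // and enum casing are whatever the derives in `mz_server_core::listeners` produce:
    //
    //     {
    //       "sql": { "external": { "addr": "0.0.0.0:6875", ... }, ... },
    //       "http": { "external": { ... }, "metrics": { ... } }
    //     }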
1733
1734    let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1735        AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1736            let scheme = if external_enable_tls { "https" } else { "http" };
1737            (
1738                scheme,
1739                config.environmentd_http_port,
1740                Some(mz.spec.backend_secret_name.clone()),
1741            )
1742        }
1743        _ => ("http", config.environmentd_internal_http_port, None),
1744    };
1745    let environmentd_url = format!(
1746        "{}://{}.{}.svc.cluster.local:{}",
1747        scheme,
1748        mz.environmentd_generation_service_name(generation),
1749        mz.namespace(),
1750        leader_api_port,
1751    );
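    // e.g. "https://mz12345678-environmentd-2.materialize-environment.svc.cluster.local:6876"
    // (hypothetical service name, namespace, and port).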
1752    ConnectionInfo {
1753        environmentd_url,
1754        listeners_configmap,
1755        mz_system_secret_name,
1756    }
1757}
1758
1759// see materialize/src/environmentd/src/http.rs
1760#[derive(Debug, Deserialize, PartialEq, Eq)]
1761struct BecomeLeaderResponse {
1762    result: BecomeLeaderResult,
1763}
1764
1765#[derive(Debug, Deserialize, PartialEq, Eq)]
1766enum BecomeLeaderResult {
1767    Success,
1768    Failure { message: String },
1769}
1770
1771#[derive(Debug, Deserialize, PartialEq, Eq)]
1772struct SkipCatchupError {
1773    message: String,
1774}
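
// A minimal sketch of how the response types above deserialize, assuming serde's
// default externally tagged enum representation; the authoritative wire format is
// defined in materialize/src/environmentd/src/http.rs.
#[cfg(test)]
mod become_leader_response_tests {
    use super::*;

    #[test]
    fn deserializes_success_and_failure() {
        // Unit variants deserialize from a bare string under the default representation.
        let success: BecomeLeaderResponse =
            serde_json::from_str(r#"{"result":"Success"}"#).unwrap();
        assert_eq!(success.result, BecomeLeaderResult::Success);

        // Struct variants deserialize from an externally tagged object.
        let failure: BecomeLeaderResponse =
            serde_json::from_str(r#"{"result":{"Failure":{"message":"not caught up"}}}"#)
                .unwrap();
        assert_eq!(
            failure.result,
            BecomeLeaderResult::Failure {
                message: "not caught up".into()
            }
        );
    }
}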