mz_orchestratord/controller/materialize/environmentd.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{
    collections::BTreeMap,
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::LazyLock,
    time::Duration,
};

use k8s_openapi::{
    api::{
        apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
        core::v1::{
            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
            EnvVarSource, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec,
            Probe, SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
            Service, ServiceAccount, ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
        },
        networking::v1::{
            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
        },
        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
    },
    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
};
use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
use maplit::btreemap;
use mz_server_core::listeners::{
    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
    ListenersConfig, SqlListenerConfig,
};
use rand::{Rng, thread_rng};
use reqwest::{Client as HttpClient, StatusCode};
use semver::{BuildMetadata, Prerelease, Version};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tracing::{error, trace, warn};

use super::Error;
use super::matching_image_from_environmentd_image_ref;
use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
use crate::k8s::{apply_resource, delete_resource, get_resource};
use mz_cloud_provider::CloudProvider;
use mz_cloud_resources::crd::generated::cert_manager::certificates::{
    Certificate, CertificatePrivateKeyAlgorithm,
};
use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
use mz_orchestrator_tracing::TracingCliArgs;
use mz_ore::instrument;

static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 140,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
const V143: Version = Version::new(0, 143, 0);
const V144: Version = Version::new(0, 144, 0);
static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 147,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
const V153: Version = Version::new(0, 153, 0);
static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
    major: 0,
    minor: 154,
    patch: 0,
    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
});
pub const V161: Version = Version::new(0, 161, 0);

/// Describes the status of a deployment.
///
/// This is a simplified representation of `DeploymentState`, suitable for
/// announcement to the external orchestrator.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// This deployment is not the leader. It is initializing and is not yet
    /// ready to become the leader.
    Initializing,
    /// This deployment is not the leader, but it is ready to become the leader.
    ReadyToPromote,
    /// This deployment is in the process of becoming the leader.
    Promoting,
    /// This deployment is the leader.
    IsLeader,
}

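/// Response payload returned by environmentd's `/api/leader/status` endpoint.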
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GetLeaderStatusResponse {
    status: DeploymentStatus,
}

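/// Credentials posted to environmentd's `/api/login` endpoint when password
/// authentication is enabled.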
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    username: String,
    // TODO Password type?
    password: String,
}

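/// Connection details the controller uses to reach an environmentd
/// deployment, along with the listeners configmap that configures it.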
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    pub environmentd_url: String,
    pub mz_system_secret_name: Option<String>,
    pub listeners_configmap: ConfigMap,
}

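/// The set of Kubernetes resources managed for an environmentd deployment at
/// a particular deploy generation.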
#[derive(Debug, Serialize)]
pub struct Resources {
    pub generation: u64,
    pub environmentd_network_policies: Vec<NetworkPolicy>,
    pub service_account: Box<Option<ServiceAccount>>,
    pub role: Box<Role>,
    pub role_binding: Box<RoleBinding>,
    pub public_service: Box<Service>,
    pub generation_service: Box<Service>,
    pub persist_pubsub_service: Box<Service>,
    pub environmentd_certificate: Box<Option<Certificate>>,
    pub environmentd_statefulset: Box<StatefulSet>,
    pub connection_info: Box<ConnectionInfo>,
}

impl Resources {
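    /// Builds the desired resources for the given `Materialize` instance and
    /// deploy generation.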
    pub fn new(
        config: &super::MaterializeControllerArgs,
        tracing: &TracingCliArgs,
        orchestratord_namespace: &str,
        mz: &Materialize,
        generation: u64,
    ) -> Self {
        let environmentd_network_policies =
            create_environmentd_network_policies(config, mz, orchestratord_namespace);

        let service_account = Box::new(create_service_account_object(config, mz));
        let role = Box::new(create_role_object(mz));
        let role_binding = Box::new(create_role_binding_object(mz));
        let public_service = Box::new(create_public_service_object(config, mz, generation));
        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
        let persist_pubsub_service =
            Box::new(create_persist_pubsub_service(config, mz, generation));
        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
            config, tracing, mz, generation,
        ));
        let connection_info = Box::new(create_connection_info(config, mz, generation));

        Self {
            generation,
            environmentd_network_policies,
            service_account,
            role,
            role_binding,
            public_service,
            generation_service,
            persist_pubsub_service,
            environmentd_certificate,
            environmentd_statefulset,
            connection_info,
        }
    }

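    /// Applies all per-generation resources (everything except the public
    /// service) and waits for the new environmentd to report a usable leader
    /// status. Returns a requeue `Action` while the deployment is not yet
    /// ready.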
    #[instrument]
    pub async fn apply(
        &self,
        client: &Client,
        force_promote: bool,
        namespace: &str,
    ) -> Result<Option<Action>, Error> {
        let environmentd_network_policy_api: Api<NetworkPolicy> =
            Api::namespaced(client.clone(), namespace);
        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);

        for policy in &self.environmentd_network_policies {
            trace!("applying network policy {}", policy.name_unchecked());
            apply_resource(&environmentd_network_policy_api, policy).await?;
        }

        if let Some(service_account) = &*self.service_account {
            trace!("applying environmentd service account");
            apply_resource(&service_account_api, service_account).await?;
        }

        trace!("applying environmentd role");
        apply_resource(&role_api, &*self.role).await?;

        trace!("applying environmentd role binding");
        apply_resource(&role_binding_api, &*self.role_binding).await?;

        trace!("applying environmentd per-generation service");
        apply_resource(&service_api, &*self.generation_service).await?;

        trace!("creating persist pubsub service");
        apply_resource(&service_api, &*self.persist_pubsub_service).await?;

        if let Some(certificate) = &*self.environmentd_certificate {
            trace!("creating new environmentd certificate");
            apply_resource(&certificate_api, certificate).await?;
        }

        trace!("applying listeners configmap");
        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;

        trace!("creating new environmentd statefulset");
        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;

        let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));

        let statefulset = get_resource(
            &statefulset_api,
            &self.environmentd_statefulset.name_unchecked(),
        )
        .await?;
        if statefulset
            .and_then(|statefulset| statefulset.status)
            .and_then(|status| status.ready_replicas)
            .unwrap_or(0)
            == 0
        {
            trace!("environmentd statefulset is not ready yet...");
            return Ok(Some(retry_action));
        }

        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
            return Ok(Some(retry_action));
        };
        let status_url = reqwest::Url::parse(&format!(
            "{}/api/leader/status",
            self.connection_info.environmentd_url,
        ))
        .unwrap();

        match http_client.get(status_url.clone()).send().await {
            Ok(response) => {
                let response: GetLeaderStatusResponse = match response.error_for_status() {
                    Ok(response) => response.json().await?,
                    Err(e) => {
                        trace!("failed to get status of environmentd, retrying... ({e})");
                        return Ok(Some(retry_action));
                    }
                };
                if force_promote {
                    trace!("skipping cluster catchup");
                    let skip_catchup_url = reqwest::Url::parse(&format!(
                        "{}/api/leader/skip-catchup",
                        self.connection_info.environmentd_url,
                    ))
                    .unwrap();
                    let response = http_client.post(skip_catchup_url).send().await?;
                    if response.status() == StatusCode::BAD_REQUEST {
                        let err: SkipCatchupError = response.json().await?;
                        return Err(
                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
                        );
                    }
                } else {
                    match response.status {
                        DeploymentStatus::Initializing => {
                            trace!("environmentd is still initializing, retrying...");
                            return Ok(Some(retry_action));
                        }
                        DeploymentStatus::ReadyToPromote
                        | DeploymentStatus::Promoting
                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
                    }
                }
            }
            Err(e) => {
                trace!("failed to connect to environmentd, retrying... ({e})");
                return Ok(Some(retry_action));
            }
        }

        Ok(None)
    }

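    /// Builds an HTTP client for talking to environmentd. When an mz_system
    /// secret is configured, logs in as `mz_system` first so the client's
    /// cookie store carries the session; returns `None` if the secret cannot
    /// be fetched or the login fails, signalling the caller to retry.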
    #[instrument]
    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
        Some(match &self.connection_info.mz_system_secret_name {
            Some(mz_system_secret_name) => {
                let http_client = reqwest::Client::builder()
                    .timeout(std::time::Duration::from_secs(10))
                    .cookie_store(true)
                    // TODO add_root_certificate instead
                    .danger_accept_invalid_certs(true)
                    .build()
                    .unwrap();

                let secret = secret_api
                    .get(mz_system_secret_name)
                    .await
                    .map_err(|e| {
                        error!("Failed to get backend secret: {:?}", e);
                        e
                    })
                    .ok()?;
                if let Some(data) = secret.data {
                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
                        let password = String::from_utf8_lossy(&password.0).to_string();
                        let login_url = reqwest::Url::parse(&format!(
                            "{}/api/login",
                            self.connection_info.environmentd_url,
                        ))
                        .unwrap();
                        match http_client
                            .post(login_url)
                            .body(
                                serde_json::to_string(&LoginCredentials {
                                    username: "mz_system".to_owned(),
                                    password,
                                })
                                .expect(
                                    "Serializing a simple struct with utf8 strings doesn't fail.",
                                ),
                            )
                            .header("Content-Type", "application/json")
                            .send()
                            .await
                        {
                            Ok(response) => {
                                if let Err(e) = response.error_for_status() {
                                    trace!("failed to login to environmentd, retrying... ({e})");
                                    return None;
                                }
                            }
                            Err(e) => {
                                trace!("failed to connect to environmentd, retrying... ({e})");
                                return None;
                            }
                        };
                    }
                };
                http_client
            }
            None => reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(10))
                .build()
                .unwrap(),
        })
    }

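    /// Promotes the new environmentd generation to leader via
    /// `/api/leader/promote`, returning a requeue `Action` until the
    /// environment reports `IsLeader`, and then points the public service at
    /// the new generation.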
    #[instrument]
    pub async fn promote_services(
        &self,
        client: &Client,
        namespace: &str,
    ) -> Result<Option<Action>, Error> {
        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
        let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));

        let promote_url = reqwest::Url::parse(&format!(
            "{}/api/leader/promote",
            self.connection_info.environmentd_url,
        ))
        .unwrap();

        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
            return Ok(Some(retry_action));
        };

        trace!("promoting new environmentd to leader");
        let response = http_client.post(promote_url).send().await?;
        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
        if let BecomeLeaderResult::Failure { message } = response.result {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to promote new environmentd: {message}"
            )));
        }

        // A successful POST to the promotion endpoint only indicates
        // that the promotion process was kicked off. It does not
        // guarantee that the environment will be successfully promoted
        // (e.g., if the environment crashes immediately after responding
        // to the request, but before executing the takeover, the
        // promotion will be lost).
        //
        // To guarantee the environment has been promoted successfully,
        // we must wait to see at least one `IsLeader` status returned
        // from the environment.

        let status_url = reqwest::Url::parse(&format!(
            "{}/api/leader/status",
            self.connection_info.environmentd_url,
        ))
        .unwrap();
        match http_client.get(status_url.clone()).send().await {
            Ok(response) => {
                let response: GetLeaderStatusResponse = response.json().await?;
                if response.status != DeploymentStatus::IsLeader {
                    trace!(
                        "environmentd is still promoting (status: {:?}), retrying...",
                        response.status
                    );
                    return Ok(Some(retry_action));
                } else {
                    trace!("environmentd is ready");
                }
            }
            Err(e) => {
                trace!("failed to connect to environmentd, retrying... ({e})");
                return Ok(Some(retry_action));
            }
        }

        trace!("applying environmentd public service");
        apply_resource(&service_api, &*self.public_service).await?;

        Ok(None)
    }

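    /// Deletes the per-generation resources (statefulset, services, and
    /// listeners configmap) for the given generation.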
    #[instrument]
    pub async fn teardown_generation(
        &self,
        client: &Client,
        mz: &Materialize,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());

        trace!("deleting environmentd statefulset for generation {generation}");
        delete_resource(
            &statefulset_api,
            &mz.environmentd_statefulset_name(generation),
        )
        .await?;

        trace!("deleting persist pubsub service for generation {generation}");
        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;

        trace!("deleting environmentd per-generation service for generation {generation}");
        delete_resource(
            &service_api,
            &mz.environmentd_generation_service_name(generation),
        )
        .await?;

        trace!("deleting listeners configmap for generation {generation}");
        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;

        Ok(())
    }

    // ideally we would just be able to hash the objects directly, but the
    // generated kubernetes objects don't implement the Hash trait
    pub fn generate_hash(&self) -> String {
        let mut hasher = Sha256::new();
        hasher.update(&serde_json::to_string(self).unwrap());
        format!("{:x}", hasher.finalize())
    }
}

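/// Builds the network policies for the environment: intra-environment
/// traffic, orchestratord-to-environmentd access for the promotion endpoints,
/// and (when enabled) SQL/HTTP ingress and source/sink egress CIDR rules.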
fn create_environmentd_network_policies(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
    orchestratord_namespace: &str,
) -> Vec<NetworkPolicy> {
    let mut network_policies = Vec::new();
    if config.network_policies.internal_enabled {
        let environmentd_label_selector = LabelSelector {
            match_labels: Some(
                mz.default_labels()
                    .into_iter()
                    .chain([(
                        "materialize.cloud/app".to_owned(),
                        mz.environmentd_app_name(),
                    )])
                    .collect(),
            ),
            ..Default::default()
        };
        let orchestratord_label_selector = LabelSelector {
            match_labels: Some(
                config
                    .orchestratord_pod_selector_labels
                    .iter()
                    .cloned()
                    .map(|kv| (kv.key, kv.value))
                    .collect(),
            ),
            ..Default::default()
        };
        // TODO (Alex) filter to just clusterd and environmentd,
        // once we get a consistent set of labels for both.
        let all_pods_label_selector = LabelSelector {
            match_labels: Some(mz.default_labels()),
            ..Default::default()
        };
        network_policies.extend([
            // Allow all clusterd/environmentd traffic (between pods in the
            // same environment)
            NetworkPolicy {
                metadata: mz
                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
                spec: Some(NetworkPolicySpec {
                    egress: Some(vec![NetworkPolicyEgressRule {
                        to: Some(vec![NetworkPolicyPeer {
                            pod_selector: Some(all_pods_label_selector.clone()),
                            ..Default::default()
                        }]),
                        ..Default::default()
                    }]),
                    ingress: Some(vec![NetworkPolicyIngressRule {
                        from: Some(vec![NetworkPolicyPeer {
                            pod_selector: Some(all_pods_label_selector.clone()),
                            ..Default::default()
                        }]),
                        ..Default::default()
                    }]),
                    pod_selector: Some(all_pods_label_selector.clone()),
                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
                    ..Default::default()
                }),
            },
            // Allow traffic from orchestratord to environmentd in order to hit
            // the promotion endpoints during upgrades
            NetworkPolicy {
                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
                spec: Some(NetworkPolicySpec {
                    ingress: Some(vec![NetworkPolicyIngressRule {
                        from: Some(vec![NetworkPolicyPeer {
                            namespace_selector: Some(LabelSelector {
                                match_labels: Some(btreemap! {
                                    "kubernetes.io/metadata.name".into()
                                        => orchestratord_namespace.into(),
                                }),
                                ..Default::default()
                            }),
                            pod_selector: Some(orchestratord_label_selector),
                            ..Default::default()
                        }]),
                        ports: Some(vec![
                            NetworkPolicyPort {
                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
                                protocol: Some("TCP".to_string()),
                                ..Default::default()
                            },
                            NetworkPolicyPort {
                                port: Some(IntOrString::Int(
                                    config.environmentd_internal_http_port.into(),
                                )),
                                protocol: Some("TCP".to_string()),
                                ..Default::default()
                            },
                        ]),
                        ..Default::default()
                    }]),
                    pod_selector: Some(environmentd_label_selector),
                    policy_types: Some(vec!["Ingress".to_owned()]),
                    ..Default::default()
                }),
            },
        ]);
    }
    if config.network_policies.ingress_enabled {
        let mut ingress_label_selector = mz.default_labels();
        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
        network_policies.extend([NetworkPolicy {
            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
            spec: Some(NetworkPolicySpec {
                ingress: Some(vec![NetworkPolicyIngressRule {
                    from: Some(
                        config
                            .network_policies
                            .ingress_cidrs
                            .iter()
                            .map(|cidr| NetworkPolicyPeer {
                                ip_block: Some(IPBlock {
                                    cidr: cidr.to_owned(),
                                    except: None,
                                }),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    ports: Some(vec![
                        NetworkPolicyPort {
                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
                            protocol: Some("TCP".to_string()),
                            ..Default::default()
                        },
                        NetworkPolicyPort {
                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
                            protocol: Some("TCP".to_string()),
                            ..Default::default()
                        },
                    ]),
                    ..Default::default()
                }]),
                pod_selector: Some(LabelSelector {
                    match_expressions: None,
                    match_labels: Some(ingress_label_selector),
                }),
                policy_types: Some(vec!["Ingress".to_owned()]),
                ..Default::default()
            }),
        }]);
    }
    if config.network_policies.egress_enabled {
        network_policies.extend([NetworkPolicy {
            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
            spec: Some(NetworkPolicySpec {
                egress: Some(vec![NetworkPolicyEgressRule {
                    to: Some(
                        config
                            .network_policies
                            .egress_cidrs
                            .iter()
                            .map(|cidr| NetworkPolicyPeer {
                                ip_block: Some(IPBlock {
                                    cidr: cidr.to_owned(),
                                    except: None,
                                }),
                                ..Default::default()
                            })
                            .collect(),
                    ),
                    ..Default::default()
                }]),
                pod_selector: Some(LabelSelector {
                    match_expressions: None,
                    match_labels: Some(mz.default_labels()),
                }),
                policy_types: Some(vec!["Egress".to_owned()]),
                ..Default::default()
            }),
        }]);
    }
    network_policies
}

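/// Creates the environmentd service account, if enabled, merging
/// user-provided annotations and labels from the Materialize spec.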
fn create_service_account_object(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
) -> Option<ServiceAccount> {
    if mz.create_service_account() {
        let mut annotations: BTreeMap<String, String> = mz
            .spec
            .service_account_annotations
            .clone()
            .unwrap_or_default();
        if let (CloudProvider::Aws, Some(role_arn)) = (
            config.cloud_provider,
            mz.spec
                .environmentd_iam_role_arn
                .as_deref()
                .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
        ) {
            warn!(
                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
            );
            annotations.insert(
                "eks.amazonaws.com/role-arn".to_string(),
                role_arn.to_string(),
            );
        };

        let mut labels = mz.default_labels();
        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());

        Some(ServiceAccount {
            metadata: ObjectMeta {
                annotations: Some(annotations),
                labels: Some(labels),
                ..mz.managed_resource_meta(mz.service_account_name())
            },
            ..Default::default()
        })
    } else {
        None
    }
}

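/// Defines the RBAC role granting environmentd access to the resources it
/// needs to orchestrate clusters: statefulsets, pods, secrets, services,
/// configmaps, VPC endpoints, and the metrics APIs.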
fn create_role_object(mz: &Materialize) -> Role {
    Role {
        metadata: mz.managed_resource_meta(mz.role_name()),
        rules: Some(vec![
            PolicyRule {
                api_groups: Some(vec!["apps".to_string()]),
                resources: Some(vec!["statefulsets".to_string()]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["".to_string()]),
                resources: Some(vec![
                    "persistentvolumeclaims".to_string(),
                    "pods".to_string(),
                    "secrets".to_string(),
                    "services".to_string(),
                ]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["".to_string()]),
                resources: Some(vec!["configmaps".to_string()]),
                verbs: vec!["get".to_string()],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["materialize.cloud".to_string()]),
                resources: Some(vec!["vpcendpoints".to_string()]),
                verbs: vec![
                    "get".to_string(),
                    "list".to_string(),
                    "watch".to_string(),
                    "create".to_string(),
                    "update".to_string(),
                    "patch".to_string(),
                    "delete".to_string(),
                ],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
                resources: Some(vec!["pods".to_string()]),
                verbs: vec!["get".to_string(), "list".to_string()],
                ..Default::default()
            },
            PolicyRule {
                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
                resources: Some(vec![
                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
                ]),
                verbs: vec!["get".to_string()],
                ..Default::default()
            },
        ]),
    }
}

fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
    RoleBinding {
        metadata: mz.managed_resource_meta(mz.role_binding_name()),
        role_ref: RoleRef {
            api_group: "".to_string(),
            kind: "Role".to_string(),
            name: mz.role_name(),
        },
        subjects: Some(vec![Subject {
            api_group: Some("".to_string()),
            kind: "ServiceAccount".to_string(),
            name: mz.service_account_name(),
            namespace: Some(mz.namespace()),
        }]),
    }
}

fn create_public_service_object(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
    generation: u64,
) -> Service {
    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
}

fn create_generation_service_object(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
    generation: u64,
) -> Service {
    create_base_service_object(
        config,
        mz,
        generation,
        &mz.environmentd_generation_service_name(generation),
    )
}

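/// Builds a headless ClusterIP service exposing environmentd's SQL and HTTP
/// ports, selecting the pods of the given generation's statefulset.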
fn create_base_service_object(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
    generation: u64,
    service_name: &str,
) -> Service {
    let ports = vec![
        ServicePort {
            port: config.environmentd_sql_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("sql".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_http_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("https".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_internal_sql_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("internal-sql".to_string()),
            ..Default::default()
        },
        ServicePort {
            port: config.environmentd_internal_http_port.into(),
            protocol: Some("TCP".to_string()),
            name: Some("internal-http".to_string()),
            ..Default::default()
        },
    ];

    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};

    let spec = ServiceSpec {
        type_: Some("ClusterIP".to_string()),
        cluster_ip: Some("None".to_string()),
        selector: Some(selector),
        ports: Some(ports),
        ..Default::default()
    };

    Service {
        metadata: mz.managed_resource_meta(service_name.to_string()),
        spec: Some(spec),
        status: None,
    }
}

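/// Builds the headless service used for Persist PubSub gRPC traffic within a
/// generation.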
fn create_persist_pubsub_service(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
    generation: u64,
) -> Service {
    Service {
        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
        spec: Some(ServiceSpec {
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            selector: Some(btreemap! {
                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
            }),
            ports: Some(vec![ServicePort {
                name: Some("grpc".to_string()),
                protocol: Some("TCP".to_string()),
                port: config.environmentd_internal_persist_pubsub_port.into(),
                ..Default::default()
            }]),
            ..Default::default()
        }),
        status: None,
    }
}

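/// Creates the cert-manager `Certificate` for environmentd's TLS endpoints,
/// covering the public service name and internal FQDN; returns `None` when no
/// certificate is configured.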
fn create_environmentd_certificate(
    config: &super::MaterializeControllerArgs,
    mz: &Materialize,
) -> Option<Certificate> {
    create_certificate(
        config.default_certificate_specs.internal.clone(),
        mz,
        mz.spec.internal_certificate_spec.clone(),
        mz.environmentd_certificate_name(),
        mz.environmentd_certificate_secret_name(),
        Some(vec![
            mz.environmentd_service_name(),
            mz.environmentd_service_internal_fqdn(),
        ]),
        CertificatePrivateKeyAlgorithm::Ed25519,
        None,
    )
}

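/// Builds the environmentd statefulset for the given generation, assembling
/// the container's environment variables, command line arguments, volumes,
/// probes, and security context from the controller configuration and the
/// Materialize spec.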
fn create_environmentd_statefulset_object(
    config: &super::MaterializeControllerArgs,
    tracing: &TracingCliArgs,
    mz: &Materialize,
    generation: u64,
) -> StatefulSet {
    // IMPORTANT: Only pass secrets via environment variables. All other
    // parameters should be passed as command line arguments, possibly gated
    // with a `meets_minimum_version` call. This ensures typos cause
    // `environmentd` to fail to start, rather than silently ignoring the
    // configuration.
    //
    // When passing a secret, use a `SecretKeySelector` to forward a secret into
    // the pod. Do *not* hardcode a secret as the value directly, as doing so
    // will leak the secret to anyone with permission to describe the pod.
    let mut env = vec![
        EnvVar {
            name: "MZ_METADATA_BACKEND_URL".to_string(),
            value_from: Some(EnvVarSource {
                secret_key_ref: Some(SecretKeySelector {
                    name: mz.backend_secret_name(),
                    key: "metadata_backend_url".to_string(),
                    optional: Some(false),
                }),
                ..Default::default()
            }),
            ..Default::default()
        },
        EnvVar {
            name: "MZ_PERSIST_BLOB_URL".to_string(),
            value_from: Some(EnvVarSource {
                secret_key_ref: Some(SecretKeySelector {
                    name: mz.backend_secret_name(),
                    key: "persist_backend_url".to_string(),
                    optional: Some(false),
                }),
                ..Default::default()
            }),
            ..Default::default()
        },
    ];

    env.push(EnvVar {
        name: "AWS_REGION".to_string(),
        value: Some(config.region.clone()),
        ..Default::default()
    });

    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());

    let mut args = vec![];

    if let Some(helm_chart_version) = &config.helm_chart_version {
        args.push(format!("--helm-chart-version={helm_chart_version}"));
    }

    // Add environment ID argument.
    args.push(format!(
        "--environment-id={}",
        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
    ));

    // Add clusterd image argument based on environmentd tag.
    args.push(format!(
        "--clusterd-image={}",
        matching_image_from_environmentd_image_ref(
            &mz.spec.environmentd_image_ref,
            "clusterd",
            None
        )
    ));

    // Add cluster and storage host size arguments.
    args.extend(
        [
            config
                .environmentd_cluster_replica_sizes
                .as_ref()
                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
            config
                .bootstrap_default_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_system_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_probe_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_support_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_catalog_server_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_analytics_cluster_replica_size
                .as_ref()
                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
            config
                .bootstrap_builtin_system_cluster_replication_factor
                .as_ref()
                .map(|replication_factor| {
                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
                }),
            config
                .bootstrap_builtin_probe_cluster_replication_factor
                .as_ref()
                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
            config
                .bootstrap_builtin_support_cluster_replication_factor
                .as_ref()
                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
            config
                .bootstrap_builtin_analytics_cluster_replication_factor
                .as_ref()
                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
        ]
        .into_iter()
        .flatten(),
    );

    args.extend(
        config
            .environmentd_allowed_origins
            .iter()
            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
    );

    args.push(format!(
        "--secrets-controller={}",
        config.secrets_controller
    ));

    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
        if let Ok(cluster_replica_sizes) =
            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
        {
            let cluster_replica_sizes: Vec<_> =
                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
            args.push(format!(
                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
                cluster_replica_sizes.join("', '")
            ));
        }
    }
    if !config.cloud_provider.is_cloud() {
        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
    }

    if config.enable_internal_statement_logging {
        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
    }

    if config.disable_statement_logging {
        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
    }

    if !mz.spec.enable_rbac {
        args.push("--system-parameter-default=enable_rbac_checks=false".into());
    }

    // Add persist arguments.

    // Configure the Persist Isolated Runtime to use one less thread than the total available.
    args.push("--persist-isolated-runtime-threads=-1".to_string());

    // Add AWS arguments.
    if config.cloud_provider == CloudProvider::Aws {
        if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
            for az in azs {
                args.push(format!("--availability-zone={az}"));
            }
        }

        if let Some(environmentd_connection_role_arn) = mz
            .spec
            .environmentd_connection_role_arn
            .as_deref()
            .or(config.aws_info.environmentd_connection_role_arn.as_deref())
        {
            args.push(format!(
                "--aws-connection-role-arn={}",
                environmentd_connection_role_arn
            ));
        }
        if let Some(account_id) = &config.aws_info.aws_account_id {
            args.push(format!("--aws-account-id={account_id}"));
        }

        args.extend([format!(
            "--aws-secrets-controller-tags=Environment={}",
            mz.name_unchecked()
        )]);
        args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
    }

    // Add Kubernetes arguments.
    args.extend([
        "--orchestrator=kubernetes".into(),
        format!(
            "--orchestrator-kubernetes-service-account={}",
            &mz.service_account_name()
        ),
        format!(
            "--orchestrator-kubernetes-image-pull-policy={}",
            config.image_pull_policy.as_kebab_case_str(),
        ),
    ]);
    for selector in &config.clusterd_node_selector {
        args.push(format!(
            "--orchestrator-kubernetes-service-node-selector={}={}",
            selector.key, selector.value,
        ));
    }
    if mz.meets_minimum_version(&V144) {
        if let Some(affinity) = &config.clusterd_affinity {
            let affinity = serde_json::to_string(affinity).unwrap();
            args.push(format!(
                "--orchestrator-kubernetes-service-affinity={affinity}"
            ))
        }
        if let Some(tolerations) = &config.clusterd_tolerations {
            let tolerations = serde_json::to_string(tolerations).unwrap();
            args.push(format!(
                "--orchestrator-kubernetes-service-tolerations={tolerations}"
            ))
        }
    }
    if let Some(scheduler_name) = &config.scheduler_name {
        args.push(format!(
            "--orchestrator-kubernetes-scheduler-name={}",
            scheduler_name
        ));
    }
    if mz.meets_minimum_version(&V154_DEV0) {
        args.extend(
            mz.spec
                .pod_annotations
                .as_ref()
                .map(|annotations| annotations.iter())
                .unwrap_or_default()
                .map(|(key, val)| {
                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
                }),
        );
    }
    args.extend(
        mz.default_labels()
            .iter()
            .chain(
                mz.spec
                    .pod_labels
                    .as_ref()
                    .map(|labels| labels.iter())
                    .unwrap_or_default(),
            )
            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
    );
    if let Some(status) = &mz.status {
        args.push(format!(
            "--orchestrator-kubernetes-name-prefix=mz{}-",
            status.resource_id
        ));
    }

    // Add logging and tracing arguments.
    args.extend(["--log-format=json".into()]);
    if let Some(endpoint) = &tracing.opentelemetry_endpoint {
        args.push(format!("--opentelemetry-endpoint={}", endpoint));
    }
    // --opentelemetry-resource also configures sentry tags
    args.extend([
        format!(
            "--opentelemetry-resource=organization_id={}",
            mz.spec.environment_id
        ),
        format!(
            "--opentelemetry-resource=environment_name={}",
            mz.name_unchecked()
        ),
    ]);

    if let Some(segment_api_key) = &config.segment_api_key {
        args.push(format!("--segment-api-key={}", segment_api_key));
        if config.segment_client_side {
            args.push("--segment-client-side".into());
        }
    }

    let mut volumes = Vec::new();
    let mut volume_mounts = Vec::new();
    if issuer_ref_defined(
        &config.default_certificate_specs.internal,
        &mz.spec.internal_certificate_spec,
    ) {
        volumes.push(Volume {
            name: "certificate".to_owned(),
            secret: Some(SecretVolumeSource {
                default_mode: Some(0o400),
                secret_name: Some(mz.environmentd_certificate_secret_name()),
                items: None,
                optional: Some(false),
            }),
            ..Default::default()
        });
        volume_mounts.push(VolumeMount {
            name: "certificate".to_owned(),
            mount_path: "/etc/materialized".to_owned(),
            read_only: Some(true),
            ..Default::default()
        });
        args.extend([
            "--tls-mode=require".into(),
            "--tls-cert=/etc/materialized/tls.crt".into(),
            "--tls-key=/etc/materialized/tls.key".into(),
        ]);
    } else {
        args.push("--tls-mode=disable".to_string());
    }
    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
        args.push(format!(
            "--orchestrator-kubernetes-ephemeral-volume-class={}",
            ephemeral_volume_class
        ));
    }
    // The `materialize` user used by clusterd always has gid 999.
    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());

    // Add Sentry arguments.
    if let Some(sentry_dsn) = &tracing.sentry_dsn {
        args.push(format!("--sentry-dsn={}", sentry_dsn));
        if let Some(sentry_environment) = &tracing.sentry_environment {
            args.push(format!("--sentry-environment={}", sentry_environment));
        }
        args.push(format!("--sentry-tag=region={}", config.region));
    }

    // Add Persist PubSub arguments
    args.push(format!(
        "--persist-pubsub-url=http://{}:{}",
        mz.persist_pubsub_service_name(generation),
        config.environmentd_internal_persist_pubsub_port,
    ));
    args.push(format!(
        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
        config.environmentd_internal_persist_pubsub_port
    ));

    args.push(format!("--deploy-generation={}", generation));

    // Add URL for internal user impersonation endpoint
    args.push(format!(
        "--internal-console-redirect-url={}",
        &config.internal_console_proxy_url,
    ));

    if !config.collect_pod_metrics {
        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
    }
    if config.enable_prometheus_scrape_annotations {
        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
    }

    // the --disable-license-key-checks environmentd flag only existed
    // between these versions
    if config.disable_license_key_checks {
        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
            args.push("--disable-license-key-checks".into());
        }
    }

    // as of version 0.153, the ability to disable license key checks was
    // removed, so we should always set up license keys in that case
    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
        || mz.meets_minimum_version(&V153)
    {
        volume_mounts.push(VolumeMount {
            name: "license-key".to_string(),
            mount_path: "/license_key".to_string(),
            ..Default::default()
        });
        volumes.push(Volume {
            name: "license-key".to_string(),
            secret: Some(SecretVolumeSource {
                default_mode: Some(256),
                optional: Some(false),
                secret_name: Some(mz.backend_secret_name()),
                items: Some(vec![KeyToPath {
                    key: "license_key".to_string(),
                    path: "license_key".to_string(),
                    ..Default::default()
                }]),
                ..Default::default()
            }),
            ..Default::default()
        });
        env.push(EnvVar {
            name: "MZ_LICENSE_KEY".to_string(),
            value: Some("/license_key/license_key".to_string()),
            ..Default::default()
        });
    }

1310    // Add user-specified extra arguments.
1311    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1312        args.extend(extra_args.iter().cloned());
1313    }
1314
1315    let probe = Probe {
1316        initial_delay_seconds: Some(1),
1317        failure_threshold: Some(12),
1318        http_get: Some(HTTPGetAction {
1319            path: Some("/api/readyz".to_string()),
1320            port: IntOrString::Int(config.environmentd_internal_http_port.into()),
1321            host: None,
1322            scheme: None,
1323            http_headers: None,
1324        }),
1325        ..Default::default()
1326    };
1327
1328    let security_context = if config.enable_security_context {
1329        // Since we want to adhere to the most restrictive security context, all
1330        // of these fields have to be set how they are.
1331        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1332        Some(SecurityContext {
1333            run_as_non_root: Some(true),
1334            capabilities: Some(Capabilities {
1335                drop: Some(vec!["ALL".to_string()]),
1336                ..Default::default()
1337            }),
1338            seccomp_profile: Some(SeccompProfile {
1339                type_: "RuntimeDefault".to_string(),
1340                ..Default::default()
1341            }),
1342            allow_privilege_escalation: Some(false),
1343            ..Default::default()
1344        })
1345    } else {
1346        None
1347    };
1348
1349    let ports = vec![
1350        ContainerPort {
1351            container_port: config.environmentd_sql_port.into(),
1352            name: Some("sql".to_owned()),
1353            ..Default::default()
1354        },
1355        ContainerPort {
1356            container_port: config.environmentd_internal_sql_port.into(),
1357            name: Some("internal-sql".to_owned()),
1358            ..Default::default()
1359        },
1360        ContainerPort {
1361            container_port: config.environmentd_http_port.into(),
1362            name: Some("http".to_owned()),
1363            ..Default::default()
1364        },
1365        ContainerPort {
1366            container_port: config.environmentd_internal_http_port.into(),
1367            name: Some("internal-http".to_owned()),
1368            ..Default::default()
1369        },
1370        ContainerPort {
1371            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1372            name: Some("persist-pubsub".to_owned()),
1373            ..Default::default()
1374        },
1375    ];
1376
1377    // Add networking arguments.
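    // From 0.147 onwards, the listener configuration is provided as a JSON file
    // mounted from a per-generation ConfigMap (see create_connection_info below)
    // rather than as individual --*-listen-addr flags.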
1378    if mz.meets_minimum_version(&V147_DEV0) {
1379        volume_mounts.push(VolumeMount {
1380            name: "listeners-configmap".to_string(),
1381            mount_path: "/listeners".to_string(),
1382            ..Default::default()
1383        });
1384        volumes.push(Volume {
1385            name: "listeners-configmap".to_string(),
1386            config_map: Some(ConfigMapVolumeSource {
1387                name: mz.listeners_configmap_name(generation),
1388                default_mode: Some(256),
1389                optional: Some(false),
1390                items: Some(vec![KeyToPath {
1391                    key: "listeners.json".to_string(),
1392                    path: "listeners.json".to_string(),
1393                    ..Default::default()
1394                }]),
1395            }),
1396            ..Default::default()
1397        });
1398        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1399        if mz.spec.authenticator_kind == AuthenticatorKind::Password {
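            // Password authentication requires the enable_password_auth system
            // parameter and the mz_system login password, which is read from the
            // backend secret.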
1400            args.push("--system-parameter-default=enable_password_auth=true".into());
1401            env.push(EnvVar {
1402                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1403                value_from: Some(EnvVarSource {
1404                    secret_key_ref: Some(SecretKeySelector {
1405                        name: mz.backend_secret_name(),
1406                        key: "external_login_password_mz_system".to_string(),
1407                        optional: Some(false),
1408                    }),
1409                    ..Default::default()
1410                }),
1411                ..Default::default()
1412            })
1413        }
1414    } else {
1415        args.extend([
1416            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1417            format!(
1418                "--http-listen-addr=0.0.0.0:{}",
1419                config.environmentd_http_port
1420            ),
1421            format!(
1422                "--internal-sql-listen-addr=0.0.0.0:{}",
1423                config.environmentd_internal_sql_port
1424            ),
1425            format!(
1426                "--internal-http-listen-addr=0.0.0.0:{}",
1427                config.environmentd_internal_http_port
1428            ),
1429        ]);
1430    }
1431
1432    let container = Container {
1433        name: "environmentd".to_owned(),
1434        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1435        image_pull_policy: Some(config.image_pull_policy.to_string()),
1436        ports: Some(ports),
1437        args: Some(args),
1438        env: Some(env),
1439        volume_mounts: Some(volume_mounts),
1440        liveness_probe: Some(probe.clone()),
1441        readiness_probe: Some(probe),
1442        resources: mz
1443            .spec
1444            .environmentd_resource_requirements
1445            .clone()
1446            .or_else(|| config.environmentd_default_resources.clone()),
1447        security_context: security_context.clone(),
1448        ..Default::default()
1449    };
1450
1451    let mut pod_template_labels = mz.default_labels();
1452    pod_template_labels.insert(
1453        "materialize.cloud/name".to_owned(),
1454        mz.environmentd_statefulset_name(generation),
1455    );
1456    pod_template_labels.insert(
1457        "materialize.cloud/app".to_owned(),
1458        mz.environmentd_app_name(),
1459    );
1460    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1461    pod_template_labels.extend(
1462        mz.spec
1463            .pod_labels
1464            .as_ref()
1465            .map(|labels| labels.iter())
1466            .unwrap_or_default()
1467            .map(|(key, value)| (key.clone(), value.clone())),
1468    );
1469
1470    let mut pod_template_annotations = btreemap! {
1471        // We can re-enable eviction once we have HA
1472        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1473
1474        // Prevents old (< 0.30) and new versions of Karpenter from evicting database pods.
1475        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1476        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1477        "materialize.cloud/generation".to_owned() => generation.to_string(),
1478    };
1479    if config.enable_prometheus_scrape_annotations {
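        // Standard prometheus.io scrape annotations targeting the internal HTTP
        // port, plus Materialize-specific annotations pointing at the usage,
        // frontier, compute, and storage metrics paths.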
1480        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1481        pod_template_annotations.insert(
1482            "prometheus.io/port".to_owned(),
1483            config.environmentd_internal_http_port.to_string(),
1484        );
1485        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1486        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1487        pod_template_annotations.insert(
1488            "materialize.prometheus.io/mz_usage_path".to_owned(),
1489            "/metrics/mz_usage".to_string(),
1490        );
1491        pod_template_annotations.insert(
1492            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1493            "/metrics/mz_frontier".to_string(),
1494        );
1495        pod_template_annotations.insert(
1496            "materialize.prometheus.io/mz_compute_path".to_owned(),
1497            "/metrics/mz_compute".to_string(),
1498        );
1499        pod_template_annotations.insert(
1500            "materialize.prometheus.io/mz_storage_path".to_owned(),
1501            "/metrics/mz_storage".to_string(),
1502        );
1503    }
1504    pod_template_annotations.extend(
1505        mz.spec
1506            .pod_annotations
1507            .as_ref()
1508            .map(|annotations| annotations.iter())
1509            .unwrap_or_default()
1510            .map(|(key, value)| (key.clone(), value.clone())),
1511    );
1512
1513    let mut tolerations = vec![
1514        // When a node becomes `NotReady`, it indicates there is a problem with the node.
1515        // By default, Kubernetes waits 300s (5 minutes) before acting in this case,
1516        // but we want to limit this to 30s for faster recovery.
1517        Toleration {
1518            effect: Some("NoExecute".into()),
1519            key: Some("node.kubernetes.io/not-ready".into()),
1520            operator: Some("Exists".into()),
1521            toleration_seconds: Some(30),
1522            value: None,
1523        },
1524        Toleration {
1525            effect: Some("NoExecute".into()),
1526            key: Some("node.kubernetes.io/unreachable".into()),
1527            operator: Some("Exists".into()),
1528            toleration_seconds: Some(30),
1529            value: None,
1530        },
1531    ];
1532    if let Some(user_tolerations) = &config.environmentd_tolerations {
1533        tolerations.extend(user_tolerations.iter().cloned());
1534    }
1535    let tolerations = Some(tolerations);
1536
1537    let pod_template_spec = PodTemplateSpec {
1538        // Not using managed_resource_meta because the pod should be owned
1539        // by the StatefulSet, not the Materialize instance.
1540        metadata: Some(ObjectMeta {
1541            labels: Some(pod_template_labels),
1542            annotations: Some(pod_template_annotations), // This map is inserted into later; do not delete.
1543            ..Default::default()
1544        }),
1545        spec: Some(PodSpec {
1546            containers: vec![container],
1547            node_selector: Some(
1548                config
1549                    .environmentd_node_selector
1550                    .iter()
1551                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1552                    .collect(),
1553            ),
1554            affinity: config.environmentd_affinity.clone(),
1555            scheduler_name: config.scheduler_name.clone(),
1556            service_account_name: Some(mz.service_account_name()),
1557            volumes: Some(volumes),
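            // Run with an unprivileged UID/GID; 999 is assumed to match the
            // non-root user baked into the environmentd image.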
1558            security_context: Some(PodSecurityContext {
1559                fs_group: Some(999),
1560                run_as_user: Some(999),
1561                run_as_group: Some(999),
1562                ..Default::default()
1563            }),
1564            tolerations,
1565            // This (apparently) has the side effect of automatically starting a new pod
1566            // when the previous pod is still terminating. This sidesteps the StatefulSet
1567            // fencing, but allows for quicker recovery from a failed node.
1568            //
1569            // The Kubernetes documentation strongly advises against this setting,
1570            // as StatefulSets attempt to provide "at most once" semantics [0] --
1571            // that is, the guarantee that for a given pod in a StatefulSet there
1572            // is *at most* one pod with that identity running in the cluster.
1573            //
1574            // Materialize, however, has been carefully designed to *not* rely
1575            // on this guarantee. (In fact, we do not believe that correct
1576            // distributed systems can meaningfully rely on Kubernetes's
1577            // guarantee -- network packets from a pod can be arbitrarily delayed,
1578            // long past that pod's termination.) Running two `environmentd`
1579            // processes is safe: the newer process will safely and correctly
1580            // fence out the older process via CockroachDB. In the future,
1581            // we'll support running multiple `environmentd` processes in
1582            // parallel for high availability.
1583            //
1584            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1585            termination_grace_period_seconds: Some(0),
1586            ..Default::default()
1587        }),
1588    };
1589
1590    let mut match_labels = BTreeMap::new();
1591    match_labels.insert(
1592        "materialize.cloud/name".to_owned(),
1593        mz.environmentd_statefulset_name(generation),
1594    );
1595
1596    let statefulset_spec = StatefulSetSpec {
1597        replicas: Some(1),
1598        template: pod_template_spec,
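        // OnDelete keeps the StatefulSet controller from rolling pods on spec
        // changes; pod replacement is instead expected to be driven by this
        // operator's rollout handling.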
1599        update_strategy: Some(StatefulSetUpdateStrategy {
1600            rolling_update: None,
1601            type_: Some("OnDelete".to_owned()),
1602        }),
1603        service_name: Some(mz.environmentd_service_name()),
1604        selector: LabelSelector {
1605            match_expressions: None,
1606            match_labels: Some(match_labels),
1607        },
1608        ..Default::default()
1609    };
1610
1611    StatefulSet {
1612        metadata: ObjectMeta {
1613            annotations: Some(btreemap! {
1614                "materialize.cloud/generation".to_owned() => generation.to_string(),
1615                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1616            }),
1617            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1618        },
1619        spec: Some(statefulset_spec),
1620        status: None,
1621    }
1622}
1623
1624fn create_connection_info(
1625    config: &super::MaterializeControllerArgs,
1626    mz: &Materialize,
1627    generation: u64,
1628) -> ConnectionInfo {
1629    let external_enable_tls = issuer_ref_defined(
1630        &config.default_certificate_specs.internal,
1631        &mz.spec.internal_certificate_spec,
1632    );
1633    let authenticator_kind = mz.spec.authenticator_kind;
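    // Default listener layout: the "external" SQL/HTTP listeners use the
    // configured authenticator (and TLS when an issuer ref is defined), while
    // the "internal" listeners are unauthenticated and plaintext, with the
    // internal HTTP listener additionally serving the internal, metrics, and
    // profiling routes.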
1634    let mut listeners_config = ListenersConfig {
1635        sql: btreemap! {
1636            "external".to_owned() => SqlListenerConfig{
1637                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1638                authenticator_kind,
1639                allowed_roles: AllowedRoles::Normal,
1640                enable_tls: external_enable_tls,
1641            },
1642            "internal".to_owned() => SqlListenerConfig{
1643                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1644                authenticator_kind: AuthenticatorKind::None,
1645                // Should this just be Internal?
1646                allowed_roles: AllowedRoles::NormalAndInternal,
1647                enable_tls: false,
1648            },
1649        },
1650        http: btreemap! {
1651            "external".to_owned() => HttpListenerConfig{
1652                base: BaseListenerConfig {
1653                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1654                    authenticator_kind,
1655                    allowed_roles: AllowedRoles::Normal,
1656                    enable_tls: external_enable_tls,
1657                },
1658                routes: HttpRoutesEnabled{
1659                    base: true,
1660                    webhook: true,
1661                    internal: false,
1662                    metrics: false,
1663                    profiling: false,
1664                }
1665            },
1666            "internal".to_owned() => HttpListenerConfig{
1667                base: BaseListenerConfig {
1668                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1669                    authenticator_kind: AuthenticatorKind::None,
1670                    // Should this just be Internal?
1671                    allowed_roles: AllowedRoles::NormalAndInternal,
1672                    enable_tls: false,
1673                },
1674                routes: HttpRoutesEnabled{
1675                    base: true,
1676                    webhook: true,
1677                    internal: true,
1678                    metrics: true,
1679                    profiling: true,
1680                }
1681            },
1682        },
1683    };
1684    if authenticator_kind == AuthenticatorKind::Password {
1685        listeners_config.sql.remove("internal");
1686        listeners_config.http.remove("internal");
1687
1688        // With password authentication enabled, the remaining external listeners
1689        // also serve internal roles and expose the internal and profiling HTTP routes.
1690        if let Some(listener) = listeners_config.sql.get_mut("external") {
1691            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1692        }
1693        if let Some(listener) = listeners_config.http.get_mut("external") {
1694            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1695            listener.routes.internal = true;
1696            listener.routes.profiling = true;
1697        }
1698
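        // Retain a dedicated unauthenticated listener on the internal HTTP port
        // that serves only the metrics routes, so that plain-HTTP Prometheus
        // scraping of that port keeps working without credentials.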
1699        listeners_config.http.insert(
1700            "metrics".to_owned(),
1701            HttpListenerConfig {
1702                base: BaseListenerConfig {
1703                    addr: SocketAddr::new(
1704                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1705                        config.environmentd_internal_http_port,
1706                    ),
1707                    authenticator_kind: AuthenticatorKind::None,
1708                    allowed_roles: AllowedRoles::NormalAndInternal,
1709                    enable_tls: false,
1710                },
1711                routes: HttpRoutesEnabled {
1712                    base: false,
1713                    webhook: false,
1714                    internal: false,
1715                    metrics: true,
1716                    profiling: false,
1717                },
1718            },
1719        );
1720    }
1721
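    // Serialize the listener configuration into the per-generation ConfigMap
    // that the environmentd StatefulSet mounts at /listeners/listeners.json.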
1722    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1723    let listeners_configmap = ConfigMap {
1724        binary_data: None,
1725        data: Some(btreemap! {
1726            "listeners.json".to_owned() => listeners_json,
1727        }),
1728        immutable: None,
1729        metadata: ObjectMeta {
1730            annotations: Some(btreemap! {
1731                "materialize.cloud/generation".to_owned() => generation.to_string(),
1732            }),
1733            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1734        },
1735    };
1736
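    // With password authentication, the leader API is reached through the
    // external HTTP listener (HTTPS when TLS is enabled), and the backend
    // secret name (which holds the mz_system password) is returned alongside
    // it; otherwise the internal HTTP port is used over plain HTTP with no
    // secret.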
1737    let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1738        AuthenticatorKind::Password => {
1739            let scheme = if external_enable_tls { "https" } else { "http" };
1740            (
1741                scheme,
1742                config.environmentd_http_port,
1743                Some(mz.spec.backend_secret_name.clone()),
1744            )
1745        }
1746        _ => ("http", config.environmentd_internal_http_port, None),
1747    };
1748    let environmentd_url = format!(
1749        "{}://{}.{}.svc.cluster.local:{}",
1750        scheme,
1751        mz.environmentd_generation_service_name(generation),
1752        mz.namespace(),
1753        leader_api_port,
1754    );
1755    ConnectionInfo {
1756        environmentd_url,
1757        listeners_configmap,
1758        mz_system_secret_name,
1759    }
1760}
1761
1762// See materialize/src/environmentd/src/http.rs for the server-side definitions of these types.
1763#[derive(Debug, Deserialize, PartialEq, Eq)]
1764struct BecomeLeaderResponse {
1765    result: BecomeLeaderResult,
1766}
1767
1768#[derive(Debug, Deserialize, PartialEq, Eq)]
1769enum BecomeLeaderResult {
1770    Success,
1771    Failure { message: String },
1772}
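
// With serde's default externally tagged enum representation (assumed here),
// a successful promotion response would deserialize from `{"result": "Success"}`
// and a failure from `{"result": {"Failure": {"message": "..."}}}`; the
// authoritative shape is defined in environmentd's http.rs.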
1773
1774#[derive(Debug, Deserialize, PartialEq, Eq)]
1775struct SkipCatchupError {
1776    message: String,
1777}