mz_orchestratord/controller/materialize/
environmentd.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use anyhow::bail;
18use k8s_openapi::{
19    api::{
20        apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetUpdateStrategy},
21        core::v1::{
22            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
23            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
24            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
25            Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume,
26            VolumeMount,
27        },
28        networking::v1::{
29            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
30            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
31        },
32        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
33    },
34    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
35};
36use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
37use maplit::btreemap;
38use mz_server_core::listeners::{
39    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
40    ListenersConfig, SqlListenerConfig,
41};
42use rand::{Rng, thread_rng};
43use reqwest::StatusCode;
44use semver::{BuildMetadata, Prerelease, Version};
45use serde::{Deserialize, Serialize};
46use sha2::{Digest, Sha256};
47use tracing::{trace, warn};
48
49use super::matching_image_from_environmentd_image_ref;
50use crate::controller::materialize::tls::{create_certificate, issuer_ref_defined};
51use crate::k8s::{apply_resource, delete_resource, get_resource};
52use mz_cloud_provider::CloudProvider;
53use mz_cloud_resources::crd::generated::cert_manager::certificates::{
54    Certificate, CertificatePrivateKeyAlgorithm,
55};
56use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
57use mz_orchestrator_tracing::TracingCliArgs;
58use mz_ore::instrument;
59
60static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
61    major: 0,
62    minor: 140,
63    patch: 0,
64    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
65    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
66});
67const V143: Version = Version::new(0, 143, 0);
68const V144: Version = Version::new(0, 144, 0);
69static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
70    major: 0,
71    minor: 147,
72    patch: 0,
73    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
74    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
75});
76const V153: Version = Version::new(0, 153, 0);
77static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
78    major: 0,
79    minor: 154,
80    patch: 0,
81    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
82    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
83});
84
85/// Describes the status of a deployment.
86///
87/// This is a simplified representation of `DeploymentState`, suitable for
88/// announcement to the external orchestrator.
89#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
90pub enum DeploymentStatus {
91    /// This deployment is not the leader. It is initializing and is not yet
92    /// ready to become the leader.
93    Initializing,
94    /// This deployment is not the leader, but it is ready to become the leader.
95    ReadyToPromote,
96    /// This deployment is in the process of becoming the leader.
97    Promoting,
98    /// This deployment is the leader.
99    IsLeader,
100}
101
102#[derive(Deserialize, Serialize)]
103pub struct LoginCredentials {
104    username: String,
105    // TODO Password type?
106    password: String,
107}
108
109#[derive(Debug, Serialize)]
110pub struct ConnectionInfo {
111    pub environmentd_url: String,
112    pub mz_system_secret_name: Option<String>,
113    pub listeners_configmap: ConfigMap,
114}
115
116#[derive(Debug, Serialize)]
117pub struct Resources {
118    pub generation: u64,
119    pub environmentd_network_policies: Vec<NetworkPolicy>,
120    pub service_account: Box<Option<ServiceAccount>>,
121    pub role: Box<Role>,
122    pub role_binding: Box<RoleBinding>,
123    pub public_service: Box<Service>,
124    pub generation_service: Box<Service>,
125    pub persist_pubsub_service: Box<Service>,
126    pub environmentd_certificate: Box<Option<Certificate>>,
127    pub environmentd_statefulset: Box<StatefulSet>,
128    pub connection_info: Box<ConnectionInfo>,
129}
130
131impl Resources {
132    pub fn new(
133        config: &super::MaterializeControllerArgs,
134        tracing: &TracingCliArgs,
135        orchestratord_namespace: &str,
136        mz: &Materialize,
137        generation: u64,
138    ) -> Self {
139        let environmentd_network_policies =
140            create_environmentd_network_policies(config, mz, orchestratord_namespace);
141
142        let service_account = Box::new(create_service_account_object(config, mz));
143        let role = Box::new(create_role_object(mz));
144        let role_binding = Box::new(create_role_binding_object(mz));
145        let public_service = Box::new(create_public_service_object(config, mz, generation));
146        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
147        let persist_pubsub_service =
148            Box::new(create_persist_pubsub_service(config, mz, generation));
149        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
150        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
151            config, tracing, mz, generation,
152        ));
153        let connection_info = Box::new(create_connection_info(config, mz, generation));
154
155        Self {
156            generation,
157            environmentd_network_policies,
158            service_account,
159            role,
160            role_binding,
161            public_service,
162            generation_service,
163            persist_pubsub_service,
164            environmentd_certificate,
165            environmentd_statefulset,
166            connection_info,
167        }
168    }
169
170    #[instrument]
171    pub async fn apply(
172        &self,
173        client: &Client,
174        force_promote: bool,
175        namespace: &str,
176    ) -> Result<Option<Action>, anyhow::Error> {
177        let environmentd_network_policy_api: Api<NetworkPolicy> =
178            Api::namespaced(client.clone(), namespace);
179        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
180        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
181        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
182        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
183        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
184        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
185        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
186        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
187
188        for policy in &self.environmentd_network_policies {
189            trace!("applying network policy {}", policy.name_unchecked());
190            apply_resource(&environmentd_network_policy_api, policy).await?;
191        }
192
193        if let Some(service_account) = &*self.service_account {
194            trace!("applying environmentd service account");
195            apply_resource(&service_account_api, service_account).await?;
196        }
197
198        trace!("applying environmentd role");
199        apply_resource(&role_api, &*self.role).await?;
200
201        trace!("applying environmentd role binding");
202        apply_resource(&role_binding_api, &*self.role_binding).await?;
203
204        trace!("applying environmentd per-generation service");
205        apply_resource(&service_api, &*self.generation_service).await?;
206
207        trace!("creating persist pubsub service");
208        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
209
210        if let Some(certificate) = &*self.environmentd_certificate {
211            trace!("creating new environmentd certificate");
212            apply_resource(&certificate_api, certificate).await?;
213        }
214
215        trace!("applying listeners configmap");
216        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
217
218        trace!("creating new environmentd statefulset");
219        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
220
221        let retry_action = Action::requeue(Duration::from_secs(thread_rng().gen_range(5..10)));
222
223        let statefulset = get_resource(
224            &statefulset_api,
225            &self.environmentd_statefulset.name_unchecked(),
226        )
227        .await?;
228        if statefulset
229            .and_then(|statefulset| statefulset.status)
230            .and_then(|status| status.ready_replicas)
231            .unwrap_or(0)
232            == 0
233        {
234            trace!("environmentd statefulset is not ready yet...");
235            return Ok(Some(retry_action));
236        }
237
238        let http_client = match &self.connection_info.mz_system_secret_name {
239            Some(mz_system_secret_name) => {
240                let http_client = reqwest::Client::builder()
241                    .timeout(std::time::Duration::from_secs(10))
242                    .cookie_store(true)
243                    // TODO add_root_certificate instead
244                    .danger_accept_invalid_certs(true)
245                    .build()
246                    .unwrap();
247                if let Some(data) = secret_api.get(mz_system_secret_name).await?.data {
248                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
249                        let password = String::from_utf8_lossy(&password.0).to_string();
250                        let login_url = reqwest::Url::parse(&format!(
251                            "{}/api/login",
252                            self.connection_info.environmentd_url,
253                        ))
254                        .unwrap();
255                        match http_client
256                            .post(login_url)
257                            .body(serde_json::to_string(&LoginCredentials {
258                                username: "mz_system".to_owned(),
259                                password,
260                            })?)
261                            .header("Content-Type", "application/json")
262                            .send()
263                            .await
264                        {
265                            Ok(response) => {
266                                if let Err(e) = response.error_for_status() {
267                                    trace!("failed to login to environmentd, retrying... ({e})");
268                                    return Ok(Some(retry_action));
269                                }
270                            }
271                            Err(e) => {
272                                trace!("failed to connect to environmentd, retrying... ({e})");
273                                return Ok(Some(retry_action));
274                            }
275                        };
276                    }
277                };
278                http_client
279            }
280            None => reqwest::Client::builder()
281                .timeout(std::time::Duration::from_secs(10))
282                .build()
283                .unwrap(),
284        };
285        let status_url = reqwest::Url::parse(&format!(
286            "{}/api/leader/status",
287            self.connection_info.environmentd_url,
288        ))
289        .unwrap();
290
291        match http_client.get(status_url.clone()).send().await {
292            Ok(response) => {
293                let response: BTreeMap<String, DeploymentStatus> = match response.error_for_status()
294                {
295                    Ok(response) => response.json().await?,
296                    Err(e) => {
297                        trace!("failed to get status of environmentd, retrying... ({e})");
298                        return Ok(Some(retry_action));
299                    }
300                };
301                if force_promote {
302                    trace!("skipping cluster catchup");
303                    let skip_catchup_url = reqwest::Url::parse(&format!(
304                        "{}/api/leader/skip-catchup",
305                        self.connection_info.environmentd_url,
306                    ))
307                    .unwrap();
308                    let response = http_client.post(skip_catchup_url).send().await?;
309                    if response.status() == StatusCode::BAD_REQUEST {
310                        let err: SkipCatchupError = response.json().await?;
311                        bail!("failed to skip catchup: {}", err.message);
312                    }
313                } else if response["status"] == DeploymentStatus::Initializing {
314                    trace!("environmentd is still initializing, retrying...");
315                    return Ok(Some(retry_action));
316                } else {
317                    trace!("environmentd is ready");
318                }
319            }
320            Err(e) => {
321                trace!("failed to connect to environmentd, retrying... ({e})");
322                return Ok(Some(retry_action));
323            }
324        }
325
326        let promote_url = reqwest::Url::parse(&format!(
327            "{}/api/leader/promote",
328            self.connection_info.environmentd_url,
329        ))
330        .unwrap();
331
332        // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
333        // It is absolutely critical that this promotion is done last!
334        //
335        // If there are any failures in this method, the error handler in
336        // the caller will attempt to revert and delete the new environmentd.
337        // After promotion, the new environmentd is active, so that would
338        // cause an outage!
339        // !!!!!!!!!!!!!!!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
340        trace!("promoting new environmentd to leader");
341        let response = http_client.post(promote_url).send().await?;
342        let response: BecomeLeaderResponse = match response.error_for_status() {
343            Ok(response) => response.json().await?,
344            Err(e) => {
345                trace!("failed to promote environmentd, retrying... ({e})");
346                return Ok(Some(retry_action));
347            }
348        };
349        if let BecomeLeaderResult::Failure { message } = response.result {
350            bail!("failed to promote new environmentd: {message}");
351        }
352
353        // A successful POST to the promotion endpoint only indicates
354        // that the promotion process was kicked off. It does not
355        // guarantee that the environment will be successfully promoted
356        // (e.g., if the environment crashes immediately after responding
357        // to the request, but before executing the takeover, the
358        // promotion will be lost).
359        //
360        // To guarantee the environment has been promoted successfully,
361        // we must wait to see at least one `IsLeader` status returned
362        // from the environment.
363
364        match http_client.get(status_url.clone()).send().await {
365            Ok(response) => {
366                let response: BTreeMap<String, DeploymentStatus> = response.json().await?;
367                if response["status"] != DeploymentStatus::IsLeader {
368                    trace!(
369                        "environmentd is still promoting (status: {:?}), retrying...",
370                        response["status"]
371                    );
372                    return Ok(Some(retry_action));
373                } else {
374                    trace!("environmentd is ready");
375                }
376            }
377            Err(e) => {
378                trace!("failed to connect to environmentd, retrying... ({e})");
379                return Ok(Some(retry_action));
380            }
381        }
382
383        Ok(None)
384    }
385
386    #[instrument]
387    pub async fn promote_services(
388        &self,
389        client: &Client,
390        namespace: &str,
391    ) -> Result<(), anyhow::Error> {
392        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
393
394        trace!("applying environmentd public service");
395        apply_resource(&service_api, &*self.public_service).await?;
396
397        Ok(())
398    }
399
400    #[instrument]
401    pub async fn teardown_generation(
402        &self,
403        client: &Client,
404        mz: &Materialize,
405        generation: u64,
406    ) -> Result<(), anyhow::Error> {
407        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
408        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
409        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
410
411        trace!("deleting environmentd statefulset for generation {generation}");
412        delete_resource(
413            &statefulset_api,
414            &mz.environmentd_statefulset_name(generation),
415        )
416        .await?;
417
418        trace!("deleting persist pubsub service for generation {generation}");
419        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
420
421        trace!("deleting environmentd per-generation service for generation {generation}");
422        delete_resource(
423            &service_api,
424            &mz.environmentd_generation_service_name(generation),
425        )
426        .await?;
427
428        trace!("deleting listeners configmap for generation {generation}");
429        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
430
431        Ok(())
432    }
433
434    // ideally we would just be able to hash the objects directly, but the
435    // generated kubernetes objects don't implement the Hash trait
436    pub fn generate_hash(&self) -> String {
437        let mut hasher = Sha256::new();
438        hasher.update(&serde_json::to_string(self).unwrap());
439        format!("{:x}", hasher.finalize())
440    }
441}
442
443fn create_environmentd_network_policies(
444    config: &super::MaterializeControllerArgs,
445    mz: &Materialize,
446    orchestratord_namespace: &str,
447) -> Vec<NetworkPolicy> {
448    let mut network_policies = Vec::new();
449    if config.network_policies.internal_enabled {
450        let environmentd_label_selector = LabelSelector {
451            match_labels: Some(
452                mz.default_labels()
453                    .into_iter()
454                    .chain([(
455                        "materialize.cloud/app".to_owned(),
456                        mz.environmentd_app_name(),
457                    )])
458                    .collect(),
459            ),
460            ..Default::default()
461        };
462        let orchestratord_label_selector = LabelSelector {
463            match_labels: Some(
464                config
465                    .orchestratord_pod_selector_labels
466                    .iter()
467                    .cloned()
468                    .map(|kv| (kv.key, kv.value))
469                    .collect(),
470            ),
471            ..Default::default()
472        };
473        // TODO (Alex) filter to just clusterd and environmentd,
474        // once we get a consistent set of labels for both.
475        let all_pods_label_selector = LabelSelector {
476            match_labels: Some(mz.default_labels()),
477            ..Default::default()
478        };
479        network_policies.extend([
480            // Allow all clusterd/environmentd traffic (between pods in the
481            // same environment)
482            NetworkPolicy {
483                metadata: mz
484                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
485                spec: Some(NetworkPolicySpec {
486                    egress: Some(vec![NetworkPolicyEgressRule {
487                        to: Some(vec![NetworkPolicyPeer {
488                            pod_selector: Some(all_pods_label_selector.clone()),
489                            ..Default::default()
490                        }]),
491                        ..Default::default()
492                    }]),
493                    ingress: Some(vec![NetworkPolicyIngressRule {
494                        from: Some(vec![NetworkPolicyPeer {
495                            pod_selector: Some(all_pods_label_selector.clone()),
496                            ..Default::default()
497                        }]),
498                        ..Default::default()
499                    }]),
500                    pod_selector: Some(all_pods_label_selector.clone()),
501                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
502                    ..Default::default()
503                }),
504            },
505            // Allow traffic from orchestratord to environmentd in order to hit
506            // the promotion endpoints during upgrades
507            NetworkPolicy {
508                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
509                spec: Some(NetworkPolicySpec {
510                    ingress: Some(vec![NetworkPolicyIngressRule {
511                        from: Some(vec![NetworkPolicyPeer {
512                            namespace_selector: Some(LabelSelector {
513                                match_labels: Some(btreemap! {
514                                    "kubernetes.io/metadata.name".into()
515                                        => orchestratord_namespace.into(),
516                                }),
517                                ..Default::default()
518                            }),
519                            pod_selector: Some(orchestratord_label_selector),
520                            ..Default::default()
521                        }]),
522                        ports: Some(vec![
523                            NetworkPolicyPort {
524                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
525                                protocol: Some("TCP".to_string()),
526                                ..Default::default()
527                            },
528                            NetworkPolicyPort {
529                                port: Some(IntOrString::Int(
530                                    config.environmentd_internal_http_port.into(),
531                                )),
532                                protocol: Some("TCP".to_string()),
533                                ..Default::default()
534                            },
535                        ]),
536                        ..Default::default()
537                    }]),
538                    pod_selector: Some(environmentd_label_selector),
539                    policy_types: Some(vec!["Ingress".to_owned()]),
540                    ..Default::default()
541                }),
542            },
543        ]);
544    }
545    if config.network_policies.ingress_enabled {
546        let mut ingress_label_selector = mz.default_labels();
547        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
548        network_policies.extend([NetworkPolicy {
549            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
550            spec: Some(NetworkPolicySpec {
551                ingress: Some(vec![NetworkPolicyIngressRule {
552                    from: Some(
553                        config
554                            .network_policies
555                            .ingress_cidrs
556                            .iter()
557                            .map(|cidr| NetworkPolicyPeer {
558                                ip_block: Some(IPBlock {
559                                    cidr: cidr.to_owned(),
560                                    except: None,
561                                }),
562                                ..Default::default()
563                            })
564                            .collect(),
565                    ),
566                    ports: Some(vec![
567                        NetworkPolicyPort {
568                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
569                            protocol: Some("TCP".to_string()),
570                            ..Default::default()
571                        },
572                        NetworkPolicyPort {
573                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
574                            protocol: Some("TCP".to_string()),
575                            ..Default::default()
576                        },
577                    ]),
578                    ..Default::default()
579                }]),
580                pod_selector: Some(LabelSelector {
581                    match_expressions: None,
582                    match_labels: Some(ingress_label_selector),
583                }),
584                policy_types: Some(vec!["Ingress".to_owned()]),
585                ..Default::default()
586            }),
587        }]);
588    }
589    if config.network_policies.egress_enabled {
590        network_policies.extend([NetworkPolicy {
591            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
592            spec: Some(NetworkPolicySpec {
593                egress: Some(vec![NetworkPolicyEgressRule {
594                    to: Some(
595                        config
596                            .network_policies
597                            .egress_cidrs
598                            .iter()
599                            .map(|cidr| NetworkPolicyPeer {
600                                ip_block: Some(IPBlock {
601                                    cidr: cidr.to_owned(),
602                                    except: None,
603                                }),
604                                ..Default::default()
605                            })
606                            .collect(),
607                    ),
608                    ..Default::default()
609                }]),
610                pod_selector: Some(LabelSelector {
611                    match_expressions: None,
612                    match_labels: Some(mz.default_labels()),
613                }),
614                policy_types: Some(vec!["Egress".to_owned()]),
615                ..Default::default()
616            }),
617        }]);
618    }
619    network_policies
620}
621
622fn create_service_account_object(
623    config: &super::MaterializeControllerArgs,
624    mz: &Materialize,
625) -> Option<ServiceAccount> {
626    if mz.create_service_account() {
627        let mut annotations: BTreeMap<String, String> = mz
628            .spec
629            .service_account_annotations
630            .clone()
631            .unwrap_or_default();
632        if let (CloudProvider::Aws, Some(role_arn)) = (
633            config.cloud_provider,
634            mz.spec
635                .environmentd_iam_role_arn
636                .as_deref()
637                .or(config.aws_info.environmentd_iam_role_arn.as_deref()),
638        ) {
639            warn!(
640                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
641            );
642            annotations.insert(
643                "eks.amazonaws.com/role-arn".to_string(),
644                role_arn.to_string(),
645            );
646        };
647
648        let mut labels = mz.default_labels();
649        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
650
651        Some(ServiceAccount {
652            metadata: ObjectMeta {
653                annotations: Some(annotations),
654                labels: Some(labels),
655                ..mz.managed_resource_meta(mz.service_account_name())
656            },
657            ..Default::default()
658        })
659    } else {
660        None
661    }
662}
663
664fn create_role_object(mz: &Materialize) -> Role {
665    Role {
666        metadata: mz.managed_resource_meta(mz.role_name()),
667        rules: Some(vec![
668            PolicyRule {
669                api_groups: Some(vec!["apps".to_string()]),
670                resources: Some(vec!["statefulsets".to_string()]),
671                verbs: vec![
672                    "get".to_string(),
673                    "list".to_string(),
674                    "watch".to_string(),
675                    "create".to_string(),
676                    "update".to_string(),
677                    "patch".to_string(),
678                    "delete".to_string(),
679                ],
680                ..Default::default()
681            },
682            PolicyRule {
683                api_groups: Some(vec!["".to_string()]),
684                resources: Some(vec![
685                    "persistentvolumeclaims".to_string(),
686                    "pods".to_string(),
687                    "secrets".to_string(),
688                    "services".to_string(),
689                ]),
690                verbs: vec![
691                    "get".to_string(),
692                    "list".to_string(),
693                    "watch".to_string(),
694                    "create".to_string(),
695                    "update".to_string(),
696                    "patch".to_string(),
697                    "delete".to_string(),
698                ],
699                ..Default::default()
700            },
701            PolicyRule {
702                api_groups: Some(vec!["".to_string()]),
703                resources: Some(vec!["configmaps".to_string()]),
704                verbs: vec!["get".to_string()],
705                ..Default::default()
706            },
707            PolicyRule {
708                api_groups: Some(vec!["materialize.cloud".to_string()]),
709                resources: Some(vec!["vpcendpoints".to_string()]),
710                verbs: vec![
711                    "get".to_string(),
712                    "list".to_string(),
713                    "watch".to_string(),
714                    "create".to_string(),
715                    "update".to_string(),
716                    "patch".to_string(),
717                    "delete".to_string(),
718                ],
719                ..Default::default()
720            },
721            PolicyRule {
722                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
723                resources: Some(vec!["pods".to_string()]),
724                verbs: vec!["get".to_string(), "list".to_string()],
725                ..Default::default()
726            },
727            PolicyRule {
728                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
729                resources: Some(vec![
730                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
731                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
732                ]),
733                verbs: vec!["get".to_string()],
734                ..Default::default()
735            },
736        ]),
737    }
738}
739
740fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
741    RoleBinding {
742        metadata: mz.managed_resource_meta(mz.role_binding_name()),
743        role_ref: RoleRef {
744            api_group: "".to_string(),
745            kind: "Role".to_string(),
746            name: mz.role_name(),
747        },
748        subjects: Some(vec![Subject {
749            api_group: Some("".to_string()),
750            kind: "ServiceAccount".to_string(),
751            name: mz.service_account_name(),
752            namespace: Some(mz.namespace()),
753        }]),
754    }
755}
756
757fn create_public_service_object(
758    config: &super::MaterializeControllerArgs,
759    mz: &Materialize,
760    generation: u64,
761) -> Service {
762    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
763}
764
765fn create_generation_service_object(
766    config: &super::MaterializeControllerArgs,
767    mz: &Materialize,
768    generation: u64,
769) -> Service {
770    create_base_service_object(
771        config,
772        mz,
773        generation,
774        &mz.environmentd_generation_service_name(generation),
775    )
776}
777
778fn create_base_service_object(
779    config: &super::MaterializeControllerArgs,
780    mz: &Materialize,
781    generation: u64,
782    service_name: &str,
783) -> Service {
784    let ports = vec![
785        ServicePort {
786            port: config.environmentd_sql_port.into(),
787            protocol: Some("TCP".to_string()),
788            name: Some("sql".to_string()),
789            ..Default::default()
790        },
791        ServicePort {
792            port: config.environmentd_http_port.into(),
793            protocol: Some("TCP".to_string()),
794            name: Some("https".to_string()),
795            ..Default::default()
796        },
797        ServicePort {
798            port: config.environmentd_internal_sql_port.into(),
799            protocol: Some("TCP".to_string()),
800            name: Some("internal-sql".to_string()),
801            ..Default::default()
802        },
803        ServicePort {
804            port: config.environmentd_internal_http_port.into(),
805            protocol: Some("TCP".to_string()),
806            name: Some("internal-http".to_string()),
807            ..Default::default()
808        },
809    ];
810
811    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
812
813    let spec = ServiceSpec {
814        type_: Some("ClusterIP".to_string()),
815        cluster_ip: Some("None".to_string()),
816        selector: Some(selector),
817        ports: Some(ports),
818        ..Default::default()
819    };
820
821    Service {
822        metadata: mz.managed_resource_meta(service_name.to_string()),
823        spec: Some(spec),
824        status: None,
825    }
826}
827
828fn create_persist_pubsub_service(
829    config: &super::MaterializeControllerArgs,
830    mz: &Materialize,
831    generation: u64,
832) -> Service {
833    Service {
834        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
835        spec: Some(ServiceSpec {
836            type_: Some("ClusterIP".to_string()),
837            cluster_ip: Some("None".to_string()),
838            selector: Some(btreemap! {
839                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
840            }),
841            ports: Some(vec![ServicePort {
842                name: Some("grpc".to_string()),
843                protocol: Some("TCP".to_string()),
844                port: config.environmentd_internal_persist_pubsub_port.into(),
845                ..Default::default()
846            }]),
847            ..Default::default()
848        }),
849        status: None,
850    }
851}
852
853fn create_environmentd_certificate(
854    config: &super::MaterializeControllerArgs,
855    mz: &Materialize,
856) -> Option<Certificate> {
857    create_certificate(
858        config.default_certificate_specs.internal.clone(),
859        mz,
860        mz.spec.internal_certificate_spec.clone(),
861        mz.environmentd_certificate_name(),
862        mz.environmentd_certificate_secret_name(),
863        Some(vec![
864            mz.environmentd_service_name(),
865            mz.environmentd_service_internal_fqdn(),
866        ]),
867        CertificatePrivateKeyAlgorithm::Ed25519,
868        None,
869    )
870}
871
872fn create_environmentd_statefulset_object(
873    config: &super::MaterializeControllerArgs,
874    tracing: &TracingCliArgs,
875    mz: &Materialize,
876    generation: u64,
877) -> StatefulSet {
878    // IMPORTANT: Only pass secrets via environment variables. All other
879    // parameters should be passed as command line arguments, possibly gated
880    // with a `meets_minimum_version` call. This ensures typos cause
881    // `environmentd` to fail to start, rather than silently ignoring the
882    // configuration.
883    //
884    // When passing a secret, use a `SecretKeySelector` to forward a secret into
885    // the pod. Do *not* hardcode a secret as the value directly, as doing so
886    // will leak the secret to anyone with permission to describe the pod.
887    let mut env = vec![
888        EnvVar {
889            name: "MZ_METADATA_BACKEND_URL".to_string(),
890            value_from: Some(EnvVarSource {
891                secret_key_ref: Some(SecretKeySelector {
892                    name: mz.backend_secret_name(),
893                    key: "metadata_backend_url".to_string(),
894                    optional: Some(false),
895                }),
896                ..Default::default()
897            }),
898            ..Default::default()
899        },
900        EnvVar {
901            name: "MZ_PERSIST_BLOB_URL".to_string(),
902            value_from: Some(EnvVarSource {
903                secret_key_ref: Some(SecretKeySelector {
904                    name: mz.backend_secret_name(),
905                    key: "persist_backend_url".to_string(),
906                    optional: Some(false),
907                }),
908                ..Default::default()
909            }),
910            ..Default::default()
911        },
912    ];
913
914    env.push(EnvVar {
915        name: "AWS_REGION".to_string(),
916        value: Some(config.region.clone()),
917        ..Default::default()
918    });
919
920    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
921
922    let mut args = vec![];
923
924    if let Some(helm_chart_version) = &config.helm_chart_version {
925        args.push(format!("--helm-chart-version={helm_chart_version}"));
926    }
927
928    // Add environment ID argument.
929    args.push(format!(
930        "--environment-id={}",
931        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
932    ));
933
934    // Add clusterd image argument based on environmentd tag.
935    args.push(format!(
936        "--clusterd-image={}",
937        matching_image_from_environmentd_image_ref(
938            &mz.spec.environmentd_image_ref,
939            "clusterd",
940            None
941        )
942    ));
943
944    // Add cluster and storage host size arguments.
945    args.extend(
946        [
947            config
948                .environmentd_cluster_replica_sizes
949                .as_ref()
950                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
951            config
952                .bootstrap_default_cluster_replica_size
953                .as_ref()
954                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
955            config
956                .bootstrap_builtin_system_cluster_replica_size
957                .as_ref()
958                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
959            config
960                .bootstrap_builtin_probe_cluster_replica_size
961                .as_ref()
962                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
963            config
964                .bootstrap_builtin_support_cluster_replica_size
965                .as_ref()
966                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
967            config
968                .bootstrap_builtin_catalog_server_cluster_replica_size
969                .as_ref()
970                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
971            config
972                .bootstrap_builtin_analytics_cluster_replica_size
973                .as_ref()
974                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
975            config
976                .bootstrap_builtin_system_cluster_replication_factor
977                .as_ref()
978                .map(|replication_factor| {
979                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
980                }),
981            config
982                .bootstrap_builtin_probe_cluster_replication_factor
983                .as_ref()
984                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
985            config
986                .bootstrap_builtin_support_cluster_replication_factor
987                .as_ref()
988                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
989            config
990                .bootstrap_builtin_analytics_cluster_replication_factor
991                .as_ref()
992                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
993        ]
994        .into_iter()
995        .flatten(),
996    );
997
998    args.extend(
999        config
1000            .environmentd_allowed_origins
1001            .iter()
1002            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
1003    );
1004
1005    args.push(format!(
1006        "--secrets-controller={}",
1007        config.secrets_controller
1008    ));
1009
1010    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
1011        if let Ok(cluster_replica_sizes) =
1012            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
1013        {
1014            let cluster_replica_sizes: Vec<_> =
1015                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
1016            args.push(format!(
1017                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
1018                cluster_replica_sizes.join("', '")
1019            ));
1020        }
1021    }
1022    if !config.cloud_provider.is_cloud() {
1023        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
1024    }
1025
1026    if config.enable_internal_statement_logging {
1027        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
1028    }
1029
1030    if config.disable_statement_logging {
1031        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
1032    }
1033
1034    if !mz.spec.enable_rbac {
1035        args.push("--system-parameter-default=enable_rbac_checks=false".into());
1036    }
1037
1038    // Add persist arguments.
1039
1040    // Configure the Persist Isolated Runtime to use one less thread than the total available.
1041    args.push("--persist-isolated-runtime-threads=-1".to_string());
1042
1043    // Add AWS arguments.
1044    if config.cloud_provider == CloudProvider::Aws {
1045        if let Some(azs) = config.aws_info.environmentd_availability_zones.as_ref() {
1046            for az in azs {
1047                args.push(format!("--availability-zone={az}"));
1048            }
1049        }
1050
1051        if let Some(environmentd_connection_role_arn) = mz
1052            .spec
1053            .environmentd_connection_role_arn
1054            .as_deref()
1055            .or(config.aws_info.environmentd_connection_role_arn.as_deref())
1056        {
1057            args.push(format!(
1058                "--aws-connection-role-arn={}",
1059                environmentd_connection_role_arn
1060            ));
1061        }
1062        if let Some(account_id) = &config.aws_info.aws_account_id {
1063            args.push(format!("--aws-account-id={account_id}"));
1064        }
1065
1066        args.extend([format!(
1067            "--aws-secrets-controller-tags=Environment={}",
1068            mz.name_unchecked()
1069        )]);
1070        args.extend_from_slice(&config.aws_info.aws_secrets_controller_tags);
1071    }
1072
1073    // Add Kubernetes arguments.
1074    args.extend([
1075        "--orchestrator=kubernetes".into(),
1076        format!(
1077            "--orchestrator-kubernetes-service-account={}",
1078            &mz.service_account_name()
1079        ),
1080        format!(
1081            "--orchestrator-kubernetes-image-pull-policy={}",
1082            config.image_pull_policy.as_kebab_case_str(),
1083        ),
1084    ]);
1085    for selector in &config.clusterd_node_selector {
1086        args.push(format!(
1087            "--orchestrator-kubernetes-service-node-selector={}={}",
1088            selector.key, selector.value,
1089        ));
1090    }
1091    if mz.meets_minimum_version(&V144) {
1092        if let Some(affinity) = &config.clusterd_affinity {
1093            let affinity = serde_json::to_string(affinity).unwrap();
1094            args.push(format!(
1095                "--orchestrator-kubernetes-service-affinity={affinity}"
1096            ))
1097        }
1098        if let Some(tolerations) = &config.clusterd_tolerations {
1099            let tolerations = serde_json::to_string(tolerations).unwrap();
1100            args.push(format!(
1101                "--orchestrator-kubernetes-service-tolerations={tolerations}"
1102            ))
1103        }
1104    }
1105    if let Some(scheduler_name) = &config.scheduler_name {
1106        args.push(format!(
1107            "--orchestrator-kubernetes-scheduler-name={}",
1108            scheduler_name
1109        ));
1110    }
1111    if mz.meets_minimum_version(&V154_DEV0) {
1112        args.extend(
1113            mz.spec
1114                .pod_annotations
1115                .as_ref()
1116                .map(|annotations| annotations.iter())
1117                .unwrap_or_default()
1118                .map(|(key, val)| {
1119                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
1120                }),
1121        );
1122    }
1123    args.extend(
1124        mz.default_labels()
1125            .iter()
1126            .chain(
1127                mz.spec
1128                    .pod_labels
1129                    .as_ref()
1130                    .map(|labels| labels.iter())
1131                    .unwrap_or_default(),
1132            )
1133            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
1134    );
1135    if let Some(status) = &mz.status {
1136        args.push(format!(
1137            "--orchestrator-kubernetes-name-prefix=mz{}-",
1138            status.resource_id
1139        ));
1140    }
1141
1142    // Add logging and tracing arguments.
1143    args.extend(["--log-format=json".into()]);
1144    if let Some(endpoint) = &tracing.opentelemetry_endpoint {
1145        args.push(format!("--opentelemetry-endpoint={}", endpoint));
1146    }
1147    // --opentelemetry-resource also configures sentry tags
1148    args.extend([
1149        format!(
1150            "--opentelemetry-resource=organization_id={}",
1151            mz.spec.environment_id
1152        ),
1153        format!(
1154            "--opentelemetry-resource=environment_name={}",
1155            mz.name_unchecked()
1156        ),
1157    ]);
1158
1159    if let Some(segment_api_key) = &config.segment_api_key {
1160        args.push(format!("--segment-api-key={}", segment_api_key));
1161        if config.segment_client_side {
1162            args.push("--segment-client-side".into());
1163        }
1164    }
1165
1166    let mut volumes = Vec::new();
1167    let mut volume_mounts = Vec::new();
1168    if issuer_ref_defined(
1169        &config.default_certificate_specs.internal,
1170        &mz.spec.internal_certificate_spec,
1171    ) {
1172        volumes.push(Volume {
1173            name: "certificate".to_owned(),
1174            secret: Some(SecretVolumeSource {
1175                default_mode: Some(0o400),
1176                secret_name: Some(mz.environmentd_certificate_secret_name()),
1177                items: None,
1178                optional: Some(false),
1179            }),
1180            ..Default::default()
1181        });
1182        volume_mounts.push(VolumeMount {
1183            name: "certificate".to_owned(),
1184            mount_path: "/etc/materialized".to_owned(),
1185            read_only: Some(true),
1186            ..Default::default()
1187        });
1188        args.extend([
1189            "--tls-mode=require".into(),
1190            "--tls-cert=/etc/materialized/tls.crt".into(),
1191            "--tls-key=/etc/materialized/tls.key".into(),
1192        ]);
1193    } else {
1194        args.push("--tls-mode=disable".to_string());
1195    }
1196    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
1197        args.push(format!(
1198            "--orchestrator-kubernetes-ephemeral-volume-class={}",
1199            ephemeral_volume_class
1200        ));
1201    }
1202    // The `materialize` user used by clusterd always has gid 999.
1203    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
1204
1205    // Add Sentry arguments.
1206    if let Some(sentry_dsn) = &tracing.sentry_dsn {
1207        args.push(format!("--sentry-dsn={}", sentry_dsn));
1208        if let Some(sentry_environment) = &tracing.sentry_environment {
1209            args.push(format!("--sentry-environment={}", sentry_environment));
1210        }
1211        args.push(format!("--sentry-tag=region={}", config.region));
1212    }
1213
1214    // Add Persist PubSub arguments
1215    args.push(format!(
1216        "--persist-pubsub-url=http://{}:{}",
1217        mz.persist_pubsub_service_name(generation),
1218        config.environmentd_internal_persist_pubsub_port,
1219    ));
1220    args.push(format!(
1221        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
1222        config.environmentd_internal_persist_pubsub_port
1223    ));
1224
1225    args.push(format!("--deploy-generation={}", generation));
1226
1227    // Add URL for internal user impersonation endpoint
1228    args.push(format!(
1229        "--internal-console-redirect-url={}",
1230        &config.internal_console_proxy_url,
1231    ));
1232
1233    if !config.collect_pod_metrics {
1234        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
1235    }
1236    if config.enable_prometheus_scrape_annotations {
1237        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
1238    }
1239
1240    // the --disable-license-key-checks environmentd flag only existed
1241    // between these versions
1242    if config.disable_license_key_checks {
1243        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
1244            args.push("--disable-license-key-checks".into());
1245        }
1246    }
1247
1248    // as of version 0.153, the ability to disable license key checks was
1249    // removed, so we should always set up license keys in that case
1250    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
1251        || mz.meets_minimum_version(&V153)
1252    {
1253        volume_mounts.push(VolumeMount {
1254            name: "license-key".to_string(),
1255            mount_path: "/license_key".to_string(),
1256            ..Default::default()
1257        });
1258        volumes.push(Volume {
1259            name: "license-key".to_string(),
1260            secret: Some(SecretVolumeSource {
1261                default_mode: Some(256),
1262                optional: Some(false),
1263                secret_name: Some(mz.backend_secret_name()),
1264                items: Some(vec![KeyToPath {
1265                    key: "license_key".to_string(),
1266                    path: "license_key".to_string(),
1267                    ..Default::default()
1268                }]),
1269                ..Default::default()
1270            }),
1271            ..Default::default()
1272        });
1273        env.push(EnvVar {
1274            name: "MZ_LICENSE_KEY".to_string(),
1275            value: Some("/license_key/license_key".to_string()),
1276            ..Default::default()
1277        });
1278    }
1279
1280    // Add user-specified extra arguments.
1281    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
1282        args.extend(extra_args.iter().cloned());
1283    }
1284
1285    let probe = Probe {
1286        initial_delay_seconds: Some(1),
1287        failure_threshold: Some(12),
1288        tcp_socket: Some(TCPSocketAction {
1289            host: None,
1290            port: IntOrString::Int(config.environmentd_sql_port.into()),
1291        }),
1292        ..Default::default()
1293    };
1294
1295    let security_context = if config.enable_security_context {
1296        // Since we want to adhere to the most restrictive security context, all
1297        // of these fields have to be set how they are.
1298        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1299        Some(SecurityContext {
1300            run_as_non_root: Some(true),
1301            capabilities: Some(Capabilities {
1302                drop: Some(vec!["ALL".to_string()]),
1303                ..Default::default()
1304            }),
1305            seccomp_profile: Some(SeccompProfile {
1306                type_: "RuntimeDefault".to_string(),
1307                ..Default::default()
1308            }),
1309            allow_privilege_escalation: Some(false),
1310            ..Default::default()
1311        })
1312    } else {
1313        None
1314    };
1315
1316    let ports = vec![
1317        ContainerPort {
1318            container_port: config.environmentd_sql_port.into(),
1319            name: Some("sql".to_owned()),
1320            ..Default::default()
1321        },
1322        ContainerPort {
1323            container_port: config.environmentd_internal_sql_port.into(),
1324            name: Some("internal-sql".to_owned()),
1325            ..Default::default()
1326        },
1327        ContainerPort {
1328            container_port: config.environmentd_http_port.into(),
1329            name: Some("http".to_owned()),
1330            ..Default::default()
1331        },
1332        ContainerPort {
1333            container_port: config.environmentd_internal_http_port.into(),
1334            name: Some("internal-http".to_owned()),
1335            ..Default::default()
1336        },
1337        ContainerPort {
1338            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1339            name: Some("persist-pubsub".to_owned()),
1340            ..Default::default()
1341        },
1342    ];
1343
1344    // Add networking arguments.
1345    if mz.meets_minimum_version(&V147_DEV0) {
1346        volume_mounts.push(VolumeMount {
1347            name: "listeners-configmap".to_string(),
1348            mount_path: "/listeners".to_string(),
1349            ..Default::default()
1350        });
1351        volumes.push(Volume {
1352            name: "listeners-configmap".to_string(),
1353            config_map: Some(ConfigMapVolumeSource {
1354                name: mz.listeners_configmap_name(generation),
1355                default_mode: Some(256),
1356                optional: Some(false),
1357                items: Some(vec![KeyToPath {
1358                    key: "listeners.json".to_string(),
1359                    path: "listeners.json".to_string(),
1360                    ..Default::default()
1361                }]),
1362            }),
1363            ..Default::default()
1364        });
1365        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1366        if mz.spec.authenticator_kind == AuthenticatorKind::Password {
1367            args.push("--system-parameter-default=enable_password_auth=true".into());
1368            env.push(EnvVar {
1369                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1370                value_from: Some(EnvVarSource {
1371                    secret_key_ref: Some(SecretKeySelector {
1372                        name: mz.backend_secret_name(),
1373                        key: "external_login_password_mz_system".to_string(),
1374                        optional: Some(false),
1375                    }),
1376                    ..Default::default()
1377                }),
1378                ..Default::default()
1379            })
1380        }
1381    } else {
1382        args.extend([
1383            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1384            format!(
1385                "--http-listen-addr=0.0.0.0:{}",
1386                config.environmentd_http_port
1387            ),
1388            format!(
1389                "--internal-sql-listen-addr=0.0.0.0:{}",
1390                config.environmentd_internal_sql_port
1391            ),
1392            format!(
1393                "--internal-http-listen-addr=0.0.0.0:{}",
1394                config.environmentd_internal_http_port
1395            ),
1396        ]);
1397    }
1398
1399    let container = Container {
1400        name: "environmentd".to_owned(),
1401        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1402        image_pull_policy: Some(config.image_pull_policy.to_string()),
1403        ports: Some(ports),
1404        args: Some(args),
1405        env: Some(env),
1406        volume_mounts: Some(volume_mounts),
1407        liveness_probe: Some(probe.clone()),
1408        readiness_probe: Some(probe),
1409        resources: mz
1410            .spec
1411            .environmentd_resource_requirements
1412            .clone()
1413            .or_else(|| config.environmentd_default_resources.clone()),
1414        security_context: security_context.clone(),
1415        ..Default::default()
1416    };
1417
1418    let mut pod_template_labels = mz.default_labels();
1419    pod_template_labels.insert(
1420        "materialize.cloud/name".to_owned(),
1421        mz.environmentd_statefulset_name(generation),
1422    );
1423    pod_template_labels.insert(
1424        "materialize.cloud/app".to_owned(),
1425        mz.environmentd_app_name(),
1426    );
1427    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1428    pod_template_labels.extend(
1429        mz.spec
1430            .pod_labels
1431            .as_ref()
1432            .map(|labels| labels.iter())
1433            .unwrap_or_default()
1434            .map(|(key, value)| (key.clone(), value.clone())),
1435    );
1436
1437    let mut pod_template_annotations = btreemap! {
1438        // We can re-enable eviction once we have HA
1439        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1440
1441        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1442        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1443        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1444        "materialize.cloud/generation".to_owned() => generation.to_string(),
1445    };
1446    if config.enable_prometheus_scrape_annotations {
1447        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1448        pod_template_annotations.insert(
1449            "prometheus.io/port".to_owned(),
1450            config.environmentd_internal_http_port.to_string(),
1451        );
1452        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1453        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1454        pod_template_annotations.insert(
1455            "materialize.prometheus.io/mz_usage_path".to_owned(),
1456            "/metrics/mz_usage".to_string(),
1457        );
1458        pod_template_annotations.insert(
1459            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1460            "/metrics/mz_frontier".to_string(),
1461        );
1462        pod_template_annotations.insert(
1463            "materialize.prometheus.io/mz_compute_path".to_owned(),
1464            "/metrics/mz_compute".to_string(),
1465        );
1466        pod_template_annotations.insert(
1467            "materialize.prometheus.io/mz_storage_path".to_owned(),
1468            "/metrics/mz_storage".to_string(),
1469        );
1470    }
1471    pod_template_annotations.extend(
1472        mz.spec
1473            .pod_annotations
1474            .as_ref()
1475            .map(|annotations| annotations.iter())
1476            .unwrap_or_default()
1477            .map(|(key, value)| (key.clone(), value.clone())),
1478    );
1479
1480    let mut tolerations = vec![
1481        // When the node becomes `NotReady` it indicates there is a problem with the node,
1482        // By default kubernetes waits 300s (5 minutes) before doing anything in this case,
1483        // But we want to limit this to 30s for faster recovery
1484        Toleration {
1485            effect: Some("NoExecute".into()),
1486            key: Some("node.kubernetes.io/not-ready".into()),
1487            operator: Some("Exists".into()),
1488            toleration_seconds: Some(30),
1489            value: None,
1490        },
1491        Toleration {
1492            effect: Some("NoExecute".into()),
1493            key: Some("node.kubernetes.io/unreachable".into()),
1494            operator: Some("Exists".into()),
1495            toleration_seconds: Some(30),
1496            value: None,
1497        },
1498    ];
1499    if let Some(user_tolerations) = &config.environmentd_tolerations {
1500        tolerations.extend(user_tolerations.iter().cloned());
1501    }
1502    let tolerations = Some(tolerations);
1503
1504    let pod_template_spec = PodTemplateSpec {
1505        // not using managed_resource_meta because the pod should be owned
1506        // by the statefulset, not the materialize instance
1507        metadata: Some(ObjectMeta {
1508            labels: Some(pod_template_labels),
1509            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1510            ..Default::default()
1511        }),
1512        spec: Some(PodSpec {
1513            containers: vec![container],
1514            node_selector: Some(
1515                config
1516                    .environmentd_node_selector
1517                    .iter()
1518                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1519                    .collect(),
1520            ),
1521            affinity: config.environmentd_affinity.clone(),
1522            scheduler_name: config.scheduler_name.clone(),
1523            service_account_name: Some(mz.service_account_name()),
1524            volumes: Some(volumes),
1525            security_context: Some(PodSecurityContext {
1526                fs_group: Some(999),
1527                run_as_user: Some(999),
1528                run_as_group: Some(999),
1529                ..Default::default()
1530            }),
1531            tolerations,
1532            // This (apparently) has the side effect of automatically starting a new pod
1533            // when the previous pod is currently terminating. This side steps the statefulset fencing
1534            // but allows for quicker recovery from a failed node
1535            // The Kubernetes documentation strongly advises against this
1536            // setting, as StatefulSets attempt to provide "at most once"
1537            // semantics [0]-- that is, the guarantee that for a given pod in a
1538            // StatefulSet there is *at most* one pod with that identity running
1539            // in the cluster
1540            //
1541            // Materialize, however, has been carefully designed to *not* rely
1542            // on this guarantee. (In fact, we do not believe that correct
1543            // distributed systems can meaningfully rely on Kubernetes's
1544            // guarantee--network packets from a pod can be arbitrarily delayed,
1545            // long past that pod's termination.) Running two `environmentd`
1546            // processes is safe: the newer process will safely and correctly
1547            // fence out the older process via CockroachDB. In the future,
1548            // we'll support running multiple `environmentd` processes in
1549            // parallel for high availability.
1550            //
1551            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1552            termination_grace_period_seconds: Some(0),
1553            ..Default::default()
1554        }),
1555    };
1556
1557    let mut match_labels = BTreeMap::new();
1558    match_labels.insert(
1559        "materialize.cloud/name".to_owned(),
1560        mz.environmentd_statefulset_name(generation),
1561    );
1562
1563    let statefulset_spec = StatefulSetSpec {
1564        replicas: Some(1),
1565        template: pod_template_spec,
1566        update_strategy: Some(StatefulSetUpdateStrategy {
1567            rolling_update: None,
1568            type_: Some("OnDelete".to_owned()),
1569        }),
1570        service_name: Some(mz.environmentd_service_name()),
1571        selector: LabelSelector {
1572            match_expressions: None,
1573            match_labels: Some(match_labels),
1574        },
1575        ..Default::default()
1576    };
1577
1578    StatefulSet {
1579        metadata: ObjectMeta {
1580            annotations: Some(btreemap! {
1581                "materialize.cloud/generation".to_owned() => generation.to_string(),
1582                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1583            }),
1584            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1585        },
1586        spec: Some(statefulset_spec),
1587        status: None,
1588    }
1589}
1590
1591fn create_connection_info(
1592    config: &super::MaterializeControllerArgs,
1593    mz: &Materialize,
1594    generation: u64,
1595) -> ConnectionInfo {
1596    let external_enable_tls = issuer_ref_defined(
1597        &config.default_certificate_specs.internal,
1598        &mz.spec.internal_certificate_spec,
1599    );
1600    let authenticator_kind = mz.spec.authenticator_kind;
1601    let mut listeners_config = ListenersConfig {
1602        sql: btreemap! {
1603            "external".to_owned() => SqlListenerConfig{
1604                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1605                authenticator_kind,
1606                allowed_roles: AllowedRoles::Normal,
1607                enable_tls: external_enable_tls,
1608            },
1609            "internal".to_owned() => SqlListenerConfig{
1610                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1611                authenticator_kind: AuthenticatorKind::None,
1612                // Should this just be Internal?
1613                allowed_roles: AllowedRoles::NormalAndInternal,
1614                enable_tls: false,
1615            },
1616        },
1617        http: btreemap! {
1618            "external".to_owned() => HttpListenerConfig{
1619                base: BaseListenerConfig {
1620                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1621                    authenticator_kind,
1622                    allowed_roles: AllowedRoles::Normal,
1623                    enable_tls: external_enable_tls,
1624                },
1625                routes: HttpRoutesEnabled{
1626                    base: true,
1627                    webhook: true,
1628                    internal: false,
1629                    metrics: false,
1630                    profiling: false,
1631                }
1632            },
1633            "internal".to_owned() => HttpListenerConfig{
1634                base: BaseListenerConfig {
1635                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1636                    authenticator_kind: AuthenticatorKind::None,
1637                    // Should this just be Internal?
1638                    allowed_roles: AllowedRoles::NormalAndInternal,
1639                    enable_tls: false,
1640                },
1641                routes: HttpRoutesEnabled{
1642                    base: true,
1643                    webhook: true,
1644                    internal: true,
1645                    metrics: true,
1646                    profiling: true,
1647                }
1648            },
1649        },
1650    };
1651    if authenticator_kind == AuthenticatorKind::Password {
1652        listeners_config.sql.remove("internal");
1653        listeners_config.http.remove("internal");
1654
1655        listeners_config.sql.get_mut("external").map(|listener| {
1656            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1657            listener
1658        });
1659        listeners_config.http.get_mut("external").map(|listener| {
1660            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1661            listener.routes.internal = true;
1662            listener.routes.profiling = true;
1663            listener
1664        });
1665
1666        listeners_config.http.insert(
1667            "metrics".to_owned(),
1668            HttpListenerConfig {
1669                base: BaseListenerConfig {
1670                    addr: SocketAddr::new(
1671                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1672                        config.environmentd_internal_http_port,
1673                    ),
1674                    authenticator_kind: AuthenticatorKind::None,
1675                    allowed_roles: AllowedRoles::NormalAndInternal,
1676                    enable_tls: false,
1677                },
1678                routes: HttpRoutesEnabled {
1679                    base: false,
1680                    webhook: false,
1681                    internal: false,
1682                    metrics: true,
1683                    profiling: false,
1684                },
1685            },
1686        );
1687    }
1688
1689    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1690    let listeners_configmap = ConfigMap {
1691        binary_data: None,
1692        data: Some(btreemap! {
1693            "listeners.json".to_owned() => listeners_json,
1694        }),
1695        immutable: None,
1696        metadata: ObjectMeta {
1697            annotations: Some(btreemap! {
1698                "materialize.cloud/generation".to_owned() => generation.to_string(),
1699            }),
1700            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1701        },
1702    };
1703
1704    let (scheme, leader_api_port, mz_system_secret_name) = match mz.spec.authenticator_kind {
1705        AuthenticatorKind::Password => {
1706            let scheme = if external_enable_tls { "https" } else { "http" };
1707            (
1708                scheme,
1709                config.environmentd_http_port,
1710                Some(mz.spec.backend_secret_name.clone()),
1711            )
1712        }
1713        _ => ("http", config.environmentd_internal_http_port, None),
1714    };
1715    let environmentd_url = format!(
1716        "{}://{}.{}.svc.cluster.local:{}",
1717        scheme,
1718        mz.environmentd_generation_service_name(generation),
1719        mz.namespace(),
1720        leader_api_port,
1721    );
1722    ConnectionInfo {
1723        environmentd_url,
1724        listeners_configmap,
1725        mz_system_secret_name,
1726    }
1727}
1728
1729// see materialize/src/environmentd/src/http.rs
1730#[derive(Debug, Deserialize, PartialEq, Eq)]
1731struct BecomeLeaderResponse {
1732    result: BecomeLeaderResult,
1733}
1734
1735#[derive(Debug, Deserialize, PartialEq, Eq)]
1736enum BecomeLeaderResult {
1737    Success,
1738    Failure { message: String },
1739}
1740
1741#[derive(Debug, Deserialize, PartialEq, Eq)]
1742struct SkipCatchupError {
1743    message: String,
1744}