Skip to main content

mz_orchestratord/controller/materialize/
generation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use k8s_openapi::{
18    api::{
19        apps::v1::{StatefulSet, StatefulSetSpec},
20        core::v1::{
21            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24            Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25        },
26    },
27    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33    ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51    major: 0,
52    minor: 140,
53    patch: 0,
54    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
57const V143: Version = Version::new(0, 143, 0);
58const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60    major: 0,
61    minor: 147,
62    patch: 0,
63    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68    major: 0,
69    minor: 154,
70    patch: 0,
71    pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77    major: 26,
78    minor: 1,
79    patch: 0,
80    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
84/// Describes the status of a deployment.
85///
86/// This is a simplified representation of `DeploymentState`, suitable for
87/// announcement to the external orchestrator.
88#[derive(
89    Debug,
90    Serialize,
91    Deserialize,
92    Clone,
93    Copy,
94    PartialEq,
95    Eq,
96    PartialOrd,
97    Ord
98)]
99pub enum DeploymentStatus {
100    /// This deployment is not the leader. It is initializing and is not yet
101    /// ready to become the leader.
102    Initializing,
103    /// This deployment is not the leader, but it is ready to become the leader.
104    ReadyToPromote,
105    /// This deployment is in the process of becoming the leader.
106    Promoting,
107    /// This deployment is the leader.
108    IsLeader,
109}
110
111#[derive(
112    Debug,
113    Serialize,
114    Deserialize,
115    Clone,
116    Copy,
117    PartialEq,
118    Eq,
119    PartialOrd,
120    Ord
121)]
122pub struct GetLeaderStatusResponse {
123    status: DeploymentStatus,
124}
125
126#[derive(Deserialize, Serialize)]
127pub struct LoginCredentials {
128    username: String,
129    // TODO Password type?
130    password: String,
131}
132
133#[derive(Debug, Serialize)]
134pub struct ConnectionInfo {
135    pub environmentd_url: String,
136    pub mz_system_secret_name: Option<String>,
137    pub listeners_configmap: ConfigMap,
138}
139
140#[derive(Debug, Serialize)]
141pub struct Resources {
142    pub generation: u64,
143    pub public_service: Box<Service>,
144    pub generation_service: Box<Service>,
145    pub persist_pubsub_service: Box<Service>,
146    pub environmentd_statefulset: Box<StatefulSet>,
147    pub connection_info: Box<ConnectionInfo>,
148}
149
150impl Resources {
151    pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
152        let public_service = Box::new(create_public_service_object(config, mz, generation));
153        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
154        let persist_pubsub_service =
155            Box::new(create_persist_pubsub_service(config, mz, generation));
156        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157            config, mz, generation,
158        ));
159        let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161        Self {
162            generation,
163            public_service,
164            generation_service,
165            persist_pubsub_service,
166            environmentd_statefulset,
167            connection_info,
168        }
169    }
170
171    #[instrument]
172    pub async fn apply(
173        &self,
174        client: &Client,
175        force_promote: bool,
176        namespace: &str,
177    ) -> Result<Option<Action>, Error> {
178        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
179        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
180        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
181
182        trace!("applying environmentd per-generation service");
183        apply_resource(&service_api, &*self.generation_service).await?;
184
185        trace!("creating persist pubsub service");
186        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
187
188        trace!("applying listeners configmap");
189        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
190
191        trace!("creating new environmentd statefulset");
192        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
193
194        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
195
196        let statefulset = get_resource(
197            &statefulset_api,
198            &self.environmentd_statefulset.name_unchecked(),
199        )
200        .await?;
201        if statefulset
202            .and_then(|statefulset| statefulset.status)
203            .and_then(|status| status.ready_replicas)
204            .unwrap_or(0)
205            == 0
206        {
207            trace!("environmentd statefulset is not ready yet...");
208            return Ok(Some(retry_action));
209        }
210
211        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
212            return Ok(Some(retry_action));
213        };
214        let status_url = reqwest::Url::parse(&format!(
215            "{}/api/leader/status",
216            self.connection_info.environmentd_url,
217        ))
218        .unwrap();
219
220        match http_client.get(status_url.clone()).send().await {
221            Ok(response) => {
222                let response: GetLeaderStatusResponse = match response.error_for_status() {
223                    Ok(response) => response.json().await?,
224                    Err(e) => {
225                        trace!("failed to get status of environmentd, retrying... ({e})");
226                        return Ok(Some(retry_action));
227                    }
228                };
229                if force_promote {
230                    trace!("skipping cluster catchup");
231                    let skip_catchup_url = reqwest::Url::parse(&format!(
232                        "{}/api/leader/skip-catchup",
233                        self.connection_info.environmentd_url,
234                    ))
235                    .unwrap();
236                    let response = http_client.post(skip_catchup_url).send().await?;
237                    if response.status() == StatusCode::BAD_REQUEST {
238                        let err: SkipCatchupError = response.json().await?;
239                        return Err(
240                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
241                        );
242                    }
243                } else {
244                    match response.status {
245                        DeploymentStatus::Initializing => {
246                            trace!("environmentd is still initializing, retrying...");
247                            return Ok(Some(retry_action));
248                        }
249                        DeploymentStatus::ReadyToPromote
250                        | DeploymentStatus::Promoting
251                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
252                    }
253                }
254            }
255            Err(e) => {
256                trace!("failed to connect to environmentd, retrying... ({e})");
257                return Ok(Some(retry_action));
258            }
259        }
260
261        Ok(None)
262    }
263
264    #[instrument]
265    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
266        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
267        Some(match &self.connection_info.mz_system_secret_name {
268            Some(mz_system_secret_name) => {
269                let http_client = reqwest::Client::builder()
270                    .timeout(std::time::Duration::from_secs(10))
271                    .cookie_store(true)
272                    // TODO add_root_certificate instead
273                    .danger_accept_invalid_certs(true)
274                    .build()
275                    .unwrap();
276
277                let secret = secret_api
278                    .get(mz_system_secret_name)
279                    .await
280                    .map_err(|e| {
281                        error!("Failed to get backend secret: {:?}", e);
282                        e
283                    })
284                    .ok()?;
285                if let Some(data) = secret.data {
286                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
287                        let password = String::from_utf8_lossy(&password.0).to_string();
288                        let login_url = reqwest::Url::parse(&format!(
289                            "{}/api/login",
290                            self.connection_info.environmentd_url,
291                        ))
292                        .unwrap();
293                        match http_client
294                            .post(login_url)
295                            .body(
296                                serde_json::to_string(&LoginCredentials {
297                                    username: "mz_system".to_owned(),
298                                    password,
299                                })
300                                .expect(
301                                    "Serializing a simple struct with utf8 strings doesn't fail.",
302                                ),
303                            )
304                            .header("Content-Type", "application/json")
305                            .send()
306                            .await
307                        {
308                            Ok(response) => {
309                                if let Err(e) = response.error_for_status() {
310                                    trace!("failed to login to environmentd, retrying... ({e})");
311                                    return None;
312                                }
313                            }
314                            Err(e) => {
315                                trace!("failed to connect to environmentd, retrying... ({e})");
316                                return None;
317                            }
318                        };
319                    }
320                };
321                http_client
322            }
323            None => reqwest::Client::builder()
324                .timeout(std::time::Duration::from_secs(10))
325                .build()
326                .unwrap(),
327        })
328    }
329
330    #[instrument]
331    pub async fn promote_services(
332        &self,
333        client: &Client,
334        namespace: &str,
335    ) -> Result<Option<Action>, Error> {
336        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
337        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
338
339        let promote_url = reqwest::Url::parse(&format!(
340            "{}/api/leader/promote",
341            self.connection_info.environmentd_url,
342        ))
343        .unwrap();
344
345        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
346            return Ok(Some(retry_action));
347        };
348
349        trace!("promoting new environmentd to leader");
350        let response = http_client.post(promote_url).send().await?;
351        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
352        if let BecomeLeaderResult::Failure { message } = response.result {
353            return Err(Error::Anyhow(anyhow::anyhow!(
354                "failed to promote new environmentd: {message}"
355            )));
356        }
357
358        // A successful POST to the promotion endpoint only indicates
359        // that the promotion process was kicked off. It does not
360        // guarantee that the environment will be successfully promoted
361        // (e.g., if the environment crashes immediately after responding
362        // to the request, but before executing the takeover, the
363        // promotion will be lost).
364        //
365        // To guarantee the environment has been promoted successfully,
366        // we must wait to see at least one `IsLeader` status returned
367        // from the environment.
368
369        let status_url = reqwest::Url::parse(&format!(
370            "{}/api/leader/status",
371            self.connection_info.environmentd_url,
372        ))
373        .unwrap();
374        match http_client.get(status_url.clone()).send().await {
375            Ok(response) => {
376                let response: GetLeaderStatusResponse = response.json().await?;
377                if response.status != DeploymentStatus::IsLeader {
378                    trace!(
379                        "environmentd is still promoting (status: {:?}), retrying...",
380                        response.status
381                    );
382                    return Ok(Some(retry_action));
383                } else {
384                    trace!("environmentd is ready");
385                }
386            }
387            Err(e) => {
388                trace!("failed to connect to environmentd, retrying... ({e})");
389                return Ok(Some(retry_action));
390            }
391        }
392
393        trace!("applying environmentd public service");
394        apply_resource(&service_api, &*self.public_service).await?;
395
396        Ok(None)
397    }
398
399    #[instrument]
400    pub async fn teardown_generation(
401        &self,
402        client: &Client,
403        mz: &Materialize,
404        generation: u64,
405    ) -> Result<(), anyhow::Error> {
406        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
407        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
408        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
409
410        trace!("deleting environmentd statefulset for generation {generation}");
411        delete_resource(
412            &statefulset_api,
413            &mz.environmentd_statefulset_name(generation),
414        )
415        .await?;
416
417        trace!("deleting persist pubsub service for generation {generation}");
418        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
419
420        trace!("deleting environmentd per-generation service for generation {generation}");
421        delete_resource(
422            &service_api,
423            &mz.environmentd_generation_service_name(generation),
424        )
425        .await?;
426
427        trace!("deleting listeners configmap for generation {generation}");
428        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
429
430        Ok(())
431    }
432
433    // ideally we would just be able to hash the objects directly, but the
434    // generated kubernetes objects don't implement the Hash trait
435    pub fn generate_hash(&self) -> String {
436        let mut hasher = Sha256::new();
437        hasher.update(&serde_json::to_string(self).unwrap());
438        format!("{:x}", hasher.finalize())
439    }
440}
441
442fn create_public_service_object(
443    config: &super::Config,
444    mz: &Materialize,
445    generation: u64,
446) -> Service {
447    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
448}
449
450fn create_generation_service_object(
451    config: &super::Config,
452    mz: &Materialize,
453    generation: u64,
454) -> Service {
455    create_base_service_object(
456        config,
457        mz,
458        generation,
459        &mz.environmentd_generation_service_name(generation),
460    )
461}
462
463fn create_base_service_object(
464    config: &super::Config,
465    mz: &Materialize,
466    generation: u64,
467    service_name: &str,
468) -> Service {
469    let ports = vec![
470        ServicePort {
471            port: config.environmentd_sql_port.into(),
472            protocol: Some("TCP".to_string()),
473            name: Some("sql".to_string()),
474            ..Default::default()
475        },
476        ServicePort {
477            port: config.environmentd_http_port.into(),
478            protocol: Some("TCP".to_string()),
479            name: Some("https".to_string()),
480            ..Default::default()
481        },
482        ServicePort {
483            port: config.environmentd_internal_sql_port.into(),
484            protocol: Some("TCP".to_string()),
485            name: Some("internal-sql".to_string()),
486            ..Default::default()
487        },
488        ServicePort {
489            port: config.environmentd_internal_http_port.into(),
490            protocol: Some("TCP".to_string()),
491            name: Some("internal-http".to_string()),
492            ..Default::default()
493        },
494    ];
495
496    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
497
498    let spec = ServiceSpec {
499        type_: Some("ClusterIP".to_string()),
500        cluster_ip: Some("None".to_string()),
501        selector: Some(selector),
502        ports: Some(ports),
503        ..Default::default()
504    };
505
506    Service {
507        metadata: mz.managed_resource_meta(service_name.to_string()),
508        spec: Some(spec),
509        status: None,
510    }
511}
512
513fn create_persist_pubsub_service(
514    config: &super::Config,
515    mz: &Materialize,
516    generation: u64,
517) -> Service {
518    Service {
519        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
520        spec: Some(ServiceSpec {
521            type_: Some("ClusterIP".to_string()),
522            cluster_ip: Some("None".to_string()),
523            selector: Some(btreemap! {
524                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
525            }),
526            ports: Some(vec![ServicePort {
527                name: Some("grpc".to_string()),
528                protocol: Some("TCP".to_string()),
529                port: config.environmentd_internal_persist_pubsub_port.into(),
530                ..Default::default()
531            }]),
532            ..Default::default()
533        }),
534        status: None,
535    }
536}
537
538fn create_environmentd_statefulset_object(
539    config: &super::Config,
540    mz: &Materialize,
541    generation: u64,
542) -> StatefulSet {
543    // IMPORTANT: Only pass secrets via environment variables. All other
544    // parameters should be passed as command line arguments, possibly gated
545    // with a `meets_minimum_version` call. This ensures typos cause
546    // `environmentd` to fail to start, rather than silently ignoring the
547    // configuration.
548    //
549    // When passing a secret, use a `SecretKeySelector` to forward a secret into
550    // the pod. Do *not* hardcode a secret as the value directly, as doing so
551    // will leak the secret to anyone with permission to describe the pod.
552    let mut env = vec![
553        EnvVar {
554            name: "MZ_METADATA_BACKEND_URL".to_string(),
555            value_from: Some(EnvVarSource {
556                secret_key_ref: Some(SecretKeySelector {
557                    name: mz.backend_secret_name(),
558                    key: "metadata_backend_url".to_string(),
559                    optional: Some(false),
560                }),
561                ..Default::default()
562            }),
563            ..Default::default()
564        },
565        EnvVar {
566            name: "MZ_PERSIST_BLOB_URL".to_string(),
567            value_from: Some(EnvVarSource {
568                secret_key_ref: Some(SecretKeySelector {
569                    name: mz.backend_secret_name(),
570                    key: "persist_backend_url".to_string(),
571                    optional: Some(false),
572                }),
573                ..Default::default()
574            }),
575            ..Default::default()
576        },
577    ];
578
579    env.push(EnvVar {
580        name: "AWS_REGION".to_string(),
581        value: Some(config.region.clone()),
582        ..Default::default()
583    });
584
585    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
586
587    let mut args = vec![];
588
589    if let Some(helm_chart_version) = &config.helm_chart_version {
590        args.push(format!("--helm-chart-version={helm_chart_version}"));
591    }
592
593    // Add environment ID argument.
594    args.push(format!(
595        "--environment-id={}",
596        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
597    ));
598
599    // Add clusterd image argument based on environmentd tag.
600    args.push(format!(
601        "--clusterd-image={}",
602        matching_image_from_environmentd_image_ref(
603            &mz.spec.environmentd_image_ref,
604            "clusterd",
605            None
606        )
607    ));
608
609    // Add cluster and storage host size arguments.
610    args.extend(
611        [
612            config
613                .environmentd_cluster_replica_sizes
614                .as_ref()
615                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
616            config
617                .bootstrap_default_cluster_replica_size
618                .as_ref()
619                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
620            config
621                .bootstrap_builtin_system_cluster_replica_size
622                .as_ref()
623                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
624            config
625                .bootstrap_builtin_probe_cluster_replica_size
626                .as_ref()
627                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
628            config
629                .bootstrap_builtin_support_cluster_replica_size
630                .as_ref()
631                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
632            config
633                .bootstrap_builtin_catalog_server_cluster_replica_size
634                .as_ref()
635                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
636            config
637                .bootstrap_builtin_analytics_cluster_replica_size
638                .as_ref()
639                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
640            config
641                .bootstrap_builtin_system_cluster_replication_factor
642                .as_ref()
643                .map(|replication_factor| {
644                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
645                }),
646            config
647                .bootstrap_builtin_probe_cluster_replication_factor
648                .as_ref()
649                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
650            config
651                .bootstrap_builtin_support_cluster_replication_factor
652                .as_ref()
653                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
654            config
655                .bootstrap_builtin_analytics_cluster_replication_factor
656                .as_ref()
657                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
658        ]
659        .into_iter()
660        .flatten(),
661    );
662
663    args.extend(
664        config
665            .environmentd_allowed_origins
666            .iter()
667            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
668    );
669
670    args.push(format!(
671        "--secrets-controller={}",
672        config.secrets_controller
673    ));
674
675    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
676        if let Ok(cluster_replica_sizes) =
677            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
678        {
679            let cluster_replica_sizes: Vec<_> =
680                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
681            args.push(format!(
682                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
683                cluster_replica_sizes.join("', '")
684            ));
685        }
686    }
687    if !config.cloud_provider.is_cloud() {
688        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
689    }
690
691    if config.enable_internal_statement_logging {
692        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
693    }
694
695    if config.disable_statement_logging {
696        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
697    }
698
699    if !mz.spec.enable_rbac {
700        args.push("--system-parameter-default=enable_rbac_checks=false".into());
701    }
702
703    // Add persist arguments.
704
705    // Configure the Persist Isolated Runtime to use one less thread than the total available.
706    args.push("--persist-isolated-runtime-threads=-1".to_string());
707
708    // Add AWS arguments.
709    if config.cloud_provider == CloudProvider::Aws {
710        if let Some(azs) = config.environmentd_availability_zones.as_ref() {
711            for az in azs {
712                args.push(format!("--availability-zone={az}"));
713            }
714        }
715
716        if let Some(environmentd_connection_role_arn) = mz
717            .spec
718            .environmentd_connection_role_arn
719            .as_deref()
720            .or(config.environmentd_connection_role_arn.as_deref())
721        {
722            args.push(format!(
723                "--aws-connection-role-arn={}",
724                environmentd_connection_role_arn
725            ));
726        }
727        if let Some(account_id) = &config.aws_account_id {
728            args.push(format!("--aws-account-id={account_id}"));
729        }
730
731        args.extend([format!(
732            "--aws-secrets-controller-tags=Environment={}",
733            mz.name_unchecked()
734        )]);
735        args.extend_from_slice(&config.aws_secrets_controller_tags);
736    }
737
738    // Add Kubernetes arguments.
739    args.extend([
740        "--orchestrator=kubernetes".into(),
741        format!(
742            "--orchestrator-kubernetes-service-account={}",
743            &mz.service_account_name()
744        ),
745        format!(
746            "--orchestrator-kubernetes-image-pull-policy={}",
747            config.image_pull_policy.as_kebab_case_str(),
748        ),
749    ]);
750    for selector in &config.clusterd_node_selector {
751        args.push(format!(
752            "--orchestrator-kubernetes-service-node-selector={}={}",
753            selector.key, selector.value,
754        ));
755    }
756    if mz.meets_minimum_version(&V144) {
757        if let Some(affinity) = &config.clusterd_affinity {
758            let affinity = serde_json::to_string(affinity).unwrap();
759            args.push(format!(
760                "--orchestrator-kubernetes-service-affinity={affinity}"
761            ))
762        }
763        if let Some(tolerations) = &config.clusterd_tolerations {
764            let tolerations = serde_json::to_string(tolerations).unwrap();
765            args.push(format!(
766                "--orchestrator-kubernetes-service-tolerations={tolerations}"
767            ))
768        }
769    }
770    if let Some(scheduler_name) = &config.scheduler_name {
771        args.push(format!(
772            "--orchestrator-kubernetes-scheduler-name={}",
773            scheduler_name
774        ));
775    }
776    if mz.meets_minimum_version(&V154_DEV0) {
777        args.extend(
778            mz.spec
779                .pod_annotations
780                .as_ref()
781                .map(|annotations| annotations.iter())
782                .unwrap_or_default()
783                .map(|(key, val)| {
784                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
785                }),
786        );
787    }
788    args.extend(
789        mz.default_labels()
790            .iter()
791            .chain(
792                mz.spec
793                    .pod_labels
794                    .as_ref()
795                    .map(|labels| labels.iter())
796                    .unwrap_or_default(),
797            )
798            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
799    );
800    if let Some(status) = &mz.status {
801        args.push(format!(
802            "--orchestrator-kubernetes-name-prefix=mz{}-",
803            status.resource_id
804        ));
805    }
806
807    // Add logging and tracing arguments.
808    args.extend(["--log-format=json".into()]);
809    if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
810        args.push(format!("--opentelemetry-endpoint={}", endpoint));
811    }
812    // --opentelemetry-resource also configures sentry tags
813    args.extend([
814        format!(
815            "--opentelemetry-resource=organization_id={}",
816            mz.spec.environment_id
817        ),
818        format!(
819            "--opentelemetry-resource=environment_name={}",
820            mz.name_unchecked()
821        ),
822    ]);
823
824    if let Some(segment_api_key) = &config.segment_api_key {
825        args.push(format!("--segment-api-key={}", segment_api_key));
826        if config.segment_client_side {
827            args.push("--segment-client-side".into());
828        }
829    }
830
831    let mut volumes = Vec::new();
832    let mut volume_mounts = Vec::new();
833    if issuer_ref_defined(
834        &config.default_certificate_specs.internal,
835        &mz.spec.internal_certificate_spec,
836    ) {
837        volumes.push(Volume {
838            name: "certificate".to_owned(),
839            secret: Some(SecretVolumeSource {
840                default_mode: Some(0o400),
841                secret_name: Some(mz.environmentd_certificate_secret_name()),
842                items: None,
843                optional: Some(false),
844            }),
845            ..Default::default()
846        });
847        volume_mounts.push(VolumeMount {
848            name: "certificate".to_owned(),
849            mount_path: "/etc/materialized".to_owned(),
850            read_only: Some(true),
851            ..Default::default()
852        });
853        args.extend([
854            "--tls-mode=require".into(),
855            "--tls-cert=/etc/materialized/tls.crt".into(),
856            "--tls-key=/etc/materialized/tls.key".into(),
857        ]);
858    } else {
859        args.push("--tls-mode=disable".to_string());
860    }
861    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
862        args.push(format!(
863            "--orchestrator-kubernetes-ephemeral-volume-class={}",
864            ephemeral_volume_class
865        ));
866    }
867    // The `materialize` user used by clusterd always has gid 999.
868    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
869
870    // Add system_param configmap
871    // This feature was enabled in 0.163 but did not have testing until after 0.164.
872    // 0.165 should work with anything greater than 0.164 including v26 and v25.
873    if mz.meets_minimum_version(&V26_1_0) {
874        if let Some(ref name) = mz.spec.system_parameter_configmap_name {
875            volumes.push(Volume {
876                name: "system-params".to_string(),
877                config_map: Some(ConfigMapVolumeSource {
878                    default_mode: Some(0o400),
879                    name: name.to_owned(),
880                    items: None,
881                    optional: Some(true),
882                }),
883                ..Default::default()
884            });
885            volume_mounts.push(VolumeMount {
886                name: "system-params".to_string(),
887                // The user must write to the `system-params.json` entry in the config map
888                mount_path: "/system-params".to_owned(),
889                read_only: Some(true),
890                ..Default::default()
891            });
892            args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
893            args.push("--config-sync-loop-interval=1s".to_string());
894        }
895    }
896
897    // Add Sentry arguments.
898    if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
899        args.push(format!("--sentry-dsn={}", sentry_dsn));
900        if let Some(sentry_environment) = &config.tracing.sentry_environment {
901            args.push(format!("--sentry-environment={}", sentry_environment));
902        }
903        args.push(format!("--sentry-tag=region={}", config.region));
904    }
905
906    // Add Persist PubSub arguments
907    args.push(format!(
908        "--persist-pubsub-url=http://{}:{}",
909        mz.persist_pubsub_service_name(generation),
910        config.environmentd_internal_persist_pubsub_port,
911    ));
912    args.push(format!(
913        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
914        config.environmentd_internal_persist_pubsub_port
915    ));
916
917    args.push(format!("--deploy-generation={}", generation));
918
919    // Add URL for internal user impersonation endpoint
920    args.push(format!(
921        "--internal-console-redirect-url={}",
922        &config.internal_console_proxy_url,
923    ));
924
925    if !config.collect_pod_metrics {
926        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
927    }
928    if config.enable_prometheus_scrape_annotations {
929        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
930    }
931
932    // the --disable-license-key-checks environmentd flag only existed
933    // between these versions
934    if config.disable_license_key_checks {
935        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
936            args.push("--disable-license-key-checks".into());
937        }
938    }
939
940    // as of version 0.153, the ability to disable license key checks was
941    // removed, so we should always set up license keys in that case
942    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
943        || mz.meets_minimum_version(&V153)
944    {
945        volume_mounts.push(VolumeMount {
946            name: "license-key".to_string(),
947            mount_path: "/license_key".to_string(),
948            ..Default::default()
949        });
950        volumes.push(Volume {
951            name: "license-key".to_string(),
952            secret: Some(SecretVolumeSource {
953                default_mode: Some(256),
954                optional: Some(false),
955                secret_name: Some(mz.backend_secret_name()),
956                items: Some(vec![KeyToPath {
957                    key: "license_key".to_string(),
958                    path: "license_key".to_string(),
959                    ..Default::default()
960                }]),
961                ..Default::default()
962            }),
963            ..Default::default()
964        });
965        env.push(EnvVar {
966            name: "MZ_LICENSE_KEY".to_string(),
967            value: Some("/license_key/license_key".to_string()),
968            ..Default::default()
969        });
970    }
971
972    // Add user-specified extra arguments.
973    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
974        args.extend(extra_args.iter().cloned());
975    }
976
977    let probe = Probe {
978        initial_delay_seconds: Some(1),
979        failure_threshold: Some(12),
980        tcp_socket: Some(TCPSocketAction {
981            host: None,
982            port: IntOrString::Int(config.environmentd_sql_port.into()),
983        }),
984        ..Default::default()
985    };
986
987    let security_context = if config.enable_security_context {
988        // Since we want to adhere to the most restrictive security context, all
989        // of these fields have to be set how they are.
990        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
991        Some(SecurityContext {
992            run_as_non_root: Some(true),
993            capabilities: Some(Capabilities {
994                drop: Some(vec!["ALL".to_string()]),
995                ..Default::default()
996            }),
997            seccomp_profile: Some(SeccompProfile {
998                type_: "RuntimeDefault".to_string(),
999                ..Default::default()
1000            }),
1001            allow_privilege_escalation: Some(false),
1002            ..Default::default()
1003        })
1004    } else {
1005        None
1006    };
1007
1008    let ports = vec![
1009        ContainerPort {
1010            container_port: config.environmentd_sql_port.into(),
1011            name: Some("sql".to_owned()),
1012            ..Default::default()
1013        },
1014        ContainerPort {
1015            container_port: config.environmentd_internal_sql_port.into(),
1016            name: Some("internal-sql".to_owned()),
1017            ..Default::default()
1018        },
1019        ContainerPort {
1020            container_port: config.environmentd_http_port.into(),
1021            name: Some("http".to_owned()),
1022            ..Default::default()
1023        },
1024        ContainerPort {
1025            container_port: config.environmentd_internal_http_port.into(),
1026            name: Some("internal-http".to_owned()),
1027            ..Default::default()
1028        },
1029        ContainerPort {
1030            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1031            name: Some("persist-pubsub".to_owned()),
1032            ..Default::default()
1033        },
1034    ];
1035
1036    // Add networking arguments.
1037    if mz.meets_minimum_version(&V147_DEV0) {
1038        volume_mounts.push(VolumeMount {
1039            name: "listeners-configmap".to_string(),
1040            mount_path: "/listeners".to_string(),
1041            ..Default::default()
1042        });
1043        volumes.push(Volume {
1044            name: "listeners-configmap".to_string(),
1045            config_map: Some(ConfigMapVolumeSource {
1046                name: mz.listeners_configmap_name(generation),
1047                default_mode: Some(256),
1048                optional: Some(false),
1049                items: Some(vec![KeyToPath {
1050                    key: "listeners.json".to_string(),
1051                    path: "listeners.json".to_string(),
1052                    ..Default::default()
1053                }]),
1054            }),
1055            ..Default::default()
1056        });
1057        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1058        if matches!(
1059            mz.spec.authenticator_kind,
1060            AuthenticatorKind::Password | AuthenticatorKind::Sasl
1061        ) {
1062            args.push("--system-parameter-default=enable_password_auth=true".into());
1063            env.push(EnvVar {
1064                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1065                value_from: Some(EnvVarSource {
1066                    secret_key_ref: Some(SecretKeySelector {
1067                        name: mz.backend_secret_name(),
1068                        key: "external_login_password_mz_system".to_string(),
1069                        optional: Some(false),
1070                    }),
1071                    ..Default::default()
1072                }),
1073                ..Default::default()
1074            })
1075        }
1076    } else {
1077        args.extend([
1078            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1079            format!(
1080                "--http-listen-addr=0.0.0.0:{}",
1081                config.environmentd_http_port
1082            ),
1083            format!(
1084                "--internal-sql-listen-addr=0.0.0.0:{}",
1085                config.environmentd_internal_sql_port
1086            ),
1087            format!(
1088                "--internal-http-listen-addr=0.0.0.0:{}",
1089                config.environmentd_internal_http_port
1090            ),
1091        ]);
1092    }
1093
1094    let container = Container {
1095        name: "environmentd".to_owned(),
1096        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1097        image_pull_policy: Some(config.image_pull_policy.to_string()),
1098        ports: Some(ports),
1099        args: Some(args),
1100        env: Some(env),
1101        volume_mounts: Some(volume_mounts),
1102        liveness_probe: Some(probe.clone()),
1103        readiness_probe: Some(probe),
1104        resources: mz
1105            .spec
1106            .environmentd_resource_requirements
1107            .clone()
1108            .or_else(|| config.environmentd_default_resources.clone()),
1109        security_context: security_context.clone(),
1110        ..Default::default()
1111    };
1112
1113    let mut pod_template_labels = mz.default_labels();
1114    pod_template_labels.insert(
1115        "materialize.cloud/name".to_owned(),
1116        mz.environmentd_statefulset_name(generation),
1117    );
1118    pod_template_labels.insert(
1119        "materialize.cloud/app".to_owned(),
1120        mz.environmentd_app_name(),
1121    );
1122    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1123    pod_template_labels.extend(
1124        mz.spec
1125            .pod_labels
1126            .as_ref()
1127            .map(|labels| labels.iter())
1128            .unwrap_or_default()
1129            .map(|(key, value)| (key.clone(), value.clone())),
1130    );
1131
1132    let mut pod_template_annotations = btreemap! {
1133        // We can re-enable eviction once we have HA
1134        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1135
1136        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1137        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1138        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1139        "materialize.cloud/generation".to_owned() => generation.to_string(),
1140    };
1141    if config.enable_prometheus_scrape_annotations {
1142        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1143        pod_template_annotations.insert(
1144            "prometheus.io/port".to_owned(),
1145            config.environmentd_internal_http_port.to_string(),
1146        );
1147        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1148        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1149        pod_template_annotations.insert(
1150            "materialize.prometheus.io/mz_usage_path".to_owned(),
1151            "/metrics/mz_usage".to_string(),
1152        );
1153        pod_template_annotations.insert(
1154            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1155            "/metrics/mz_frontier".to_string(),
1156        );
1157        pod_template_annotations.insert(
1158            "materialize.prometheus.io/mz_compute_path".to_owned(),
1159            "/metrics/mz_compute".to_string(),
1160        );
1161        pod_template_annotations.insert(
1162            "materialize.prometheus.io/mz_storage_path".to_owned(),
1163            "/metrics/mz_storage".to_string(),
1164        );
1165    }
1166    pod_template_annotations.extend(
1167        mz.spec
1168            .pod_annotations
1169            .as_ref()
1170            .map(|annotations| annotations.iter())
1171            .unwrap_or_default()
1172            .map(|(key, value)| (key.clone(), value.clone())),
1173    );
1174
1175    let mut tolerations = vec![
1176        // When the node becomes `NotReady` it indicates there is a problem with the node,
1177        // By default kubernetes waits 300s (5 minutes) before doing anything in this case,
1178        // But we want to limit this to 30s for faster recovery
1179        Toleration {
1180            effect: Some("NoExecute".into()),
1181            key: Some("node.kubernetes.io/not-ready".into()),
1182            operator: Some("Exists".into()),
1183            toleration_seconds: Some(30),
1184            value: None,
1185        },
1186        Toleration {
1187            effect: Some("NoExecute".into()),
1188            key: Some("node.kubernetes.io/unreachable".into()),
1189            operator: Some("Exists".into()),
1190            toleration_seconds: Some(30),
1191            value: None,
1192        },
1193    ];
1194    if let Some(user_tolerations) = &config.environmentd_tolerations {
1195        tolerations.extend(user_tolerations.iter().cloned());
1196    }
1197    let tolerations = Some(tolerations);
1198
1199    let pod_template_spec = PodTemplateSpec {
1200        // not using managed_resource_meta because the pod should be owned
1201        // by the statefulset, not the materialize instance
1202        metadata: Some(ObjectMeta {
1203            labels: Some(pod_template_labels),
1204            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1205            ..Default::default()
1206        }),
1207        spec: Some(PodSpec {
1208            containers: vec![container],
1209            node_selector: Some(
1210                config
1211                    .environmentd_node_selector
1212                    .iter()
1213                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1214                    .collect(),
1215            ),
1216            affinity: config.environmentd_affinity.clone(),
1217            scheduler_name: config.scheduler_name.clone(),
1218            service_account_name: Some(mz.service_account_name()),
1219            volumes: Some(volumes),
1220            security_context: Some(PodSecurityContext {
1221                fs_group: Some(999),
1222                run_as_user: Some(999),
1223                run_as_group: Some(999),
1224                ..Default::default()
1225            }),
1226            tolerations,
1227            // This (apparently) has the side effect of automatically starting a new pod
1228            // when the previous pod is currently terminating. This side steps the statefulset fencing
1229            // but allows for quicker recovery from a failed node
1230            // The Kubernetes documentation strongly advises against this
1231            // setting, as StatefulSets attempt to provide "at most once"
1232            // semantics [0]-- that is, the guarantee that for a given pod in a
1233            // StatefulSet there is *at most* one pod with that identity running
1234            // in the cluster
1235            //
1236            // Materialize, however, has been carefully designed to *not* rely
1237            // on this guarantee. (In fact, we do not believe that correct
1238            // distributed systems can meaningfully rely on Kubernetes's
1239            // guarantee--network packets from a pod can be arbitrarily delayed,
1240            // long past that pod's termination.) Running two `environmentd`
1241            // processes is safe: the newer process will safely and correctly
1242            // fence out the older process via CockroachDB. In the future,
1243            // we'll support running multiple `environmentd` processes in
1244            // parallel for high availability.
1245            //
1246            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1247            termination_grace_period_seconds: Some(0),
1248            ..Default::default()
1249        }),
1250    };
1251
1252    let mut match_labels = BTreeMap::new();
1253    match_labels.insert(
1254        "materialize.cloud/name".to_owned(),
1255        mz.environmentd_statefulset_name(generation),
1256    );
1257
1258    let statefulset_spec = StatefulSetSpec {
1259        replicas: Some(1),
1260        template: pod_template_spec,
1261        service_name: Some(mz.environmentd_service_name()),
1262        selector: LabelSelector {
1263            match_expressions: None,
1264            match_labels: Some(match_labels),
1265        },
1266        ..Default::default()
1267    };
1268
1269    StatefulSet {
1270        metadata: ObjectMeta {
1271            annotations: Some(btreemap! {
1272                "materialize.cloud/generation".to_owned() => generation.to_string(),
1273                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1274            }),
1275            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1276        },
1277        spec: Some(statefulset_spec),
1278        status: None,
1279    }
1280}
1281
1282fn create_connection_info(
1283    config: &super::Config,
1284    mz: &Materialize,
1285    generation: u64,
1286) -> ConnectionInfo {
1287    let external_enable_tls = issuer_ref_defined(
1288        &config.default_certificate_specs.internal,
1289        &mz.spec.internal_certificate_spec,
1290    );
1291    let authenticator_kind = mz.spec.authenticator_kind;
1292
1293    let mut listeners_config = ListenersConfig {
1294        sql: btreemap! {
1295            "external".to_owned() => SqlListenerConfig{
1296                addr: SocketAddr::new(
1297                    IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1298                    config.environmentd_sql_port,
1299                ),
1300                authenticator_kind,
1301                allowed_roles: AllowedRoles::Normal,
1302                enable_tls: external_enable_tls,
1303            },
1304            "internal".to_owned() => SqlListenerConfig{
1305                addr: SocketAddr::new(
1306                    IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1307                    config.environmentd_internal_sql_port,
1308                ),
1309                authenticator_kind: AuthenticatorKind::None,
1310                // Should this just be Internal?
1311                allowed_roles: AllowedRoles::NormalAndInternal,
1312                enable_tls: false,
1313            },
1314        },
1315        http: btreemap! {
1316            "external".to_owned() => HttpListenerConfig{
1317                base: BaseListenerConfig {
1318                    addr: SocketAddr::new(
1319                        IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1320                        config.environmentd_http_port,
1321                    ),
1322                    // SASL authentication is only supported for SQL (PostgreSQL wire protocol).
1323                    // HTTP listeners must use Password authentication when SASL is enabled.
1324                    // This is validated at environmentd startup via ListenerConfig::validate().
1325                    authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1326                        AuthenticatorKind::Password
1327                    } else {
1328                        authenticator_kind
1329                    },
1330                    allowed_roles: AllowedRoles::Normal,
1331                    enable_tls: external_enable_tls,
1332                },
1333                routes: HttpRoutesEnabled{
1334                    base: true,
1335                    webhook: true,
1336                    internal: false,
1337                    metrics: false,
1338                    profiling: false,
1339                }
1340            },
1341            "internal".to_owned() => HttpListenerConfig{
1342                base: BaseListenerConfig {
1343                    addr: SocketAddr::new(
1344                        IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1345                        config.environmentd_internal_http_port,
1346                    ),
1347                    authenticator_kind: AuthenticatorKind::None,
1348                    // Should this just be Internal?
1349                    allowed_roles: AllowedRoles::NormalAndInternal,
1350                    enable_tls: false,
1351                },
1352                routes: HttpRoutesEnabled{
1353                    base: true,
1354                    webhook: true,
1355                    internal: true,
1356                    metrics: true,
1357                    profiling: true,
1358                }
1359            },
1360        },
1361    };
1362
1363    if matches!(
1364        authenticator_kind,
1365        AuthenticatorKind::Password | AuthenticatorKind::Sasl
1366    ) {
1367        listeners_config.sql.remove("internal");
1368        listeners_config.http.remove("internal");
1369
1370        listeners_config.sql.get_mut("external").map(|listener| {
1371            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1372            listener
1373        });
1374        listeners_config.http.get_mut("external").map(|listener| {
1375            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1376            listener.routes.internal = true;
1377            listener.routes.profiling = true;
1378            listener
1379        });
1380
1381        listeners_config.http.insert(
1382            "metrics".to_owned(),
1383            HttpListenerConfig {
1384                base: BaseListenerConfig {
1385                    addr: SocketAddr::new(
1386                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1387                        config.environmentd_internal_http_port,
1388                    ),
1389                    authenticator_kind: AuthenticatorKind::None,
1390                    allowed_roles: AllowedRoles::NormalAndInternal,
1391                    enable_tls: false,
1392                },
1393                routes: HttpRoutesEnabled {
1394                    base: false,
1395                    webhook: false,
1396                    internal: false,
1397                    metrics: true,
1398                    profiling: false,
1399                },
1400            },
1401        );
1402    }
1403
1404    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1405    let listeners_configmap = ConfigMap {
1406        binary_data: None,
1407        data: Some(btreemap! {
1408            "listeners.json".to_owned() => listeners_json,
1409        }),
1410        immutable: None,
1411        metadata: ObjectMeta {
1412            annotations: Some(btreemap! {
1413                "materialize.cloud/generation".to_owned() => generation.to_string(),
1414            }),
1415            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1416        },
1417    };
1418
1419    let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1420        AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1421            let scheme = if external_enable_tls { "https" } else { "http" };
1422            (
1423                scheme,
1424                config.environmentd_http_port,
1425                Some(mz.spec.backend_secret_name.clone()),
1426            )
1427        }
1428        _ => ("http", config.environmentd_internal_http_port, None),
1429    };
1430    let environmentd_url = format!(
1431        "{}://{}.{}.svc.cluster.local:{}",
1432        scheme,
1433        mz.environmentd_generation_service_name(generation),
1434        mz.namespace(),
1435        leader_api_port,
1436    );
1437    ConnectionInfo {
1438        environmentd_url,
1439        listeners_configmap,
1440        mz_system_secret_name,
1441    }
1442}
1443
// see materialize/src/environmentd/src/http.rs
/// JSON body deserialized from an environmentd HTTP response.
///
/// NOTE(review): by its name this is presumably the response of environmentd's
/// become-leader endpoint (defined in the `http.rs` referenced above); confirm there.
/// Field names and types form the wire contract and must stay in sync with
/// the serializing side.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    /// Outcome reported by environmentd.
    result: BecomeLeaderResult,
}
1449
/// Outcome carried in [`BecomeLeaderResponse::result`].
///
/// There are no `#[serde(...)]` attributes, so serde's default externally
/// tagged enum representation applies. The JSON is either `"Success"` or
/// `{"Failure": {"message": "..."}}`.
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    /// environmentd reported success.
    Success,
    /// environmentd reported failure; `message` is the accompanying text.
    Failure { message: String },
}
1455
/// JSON error body consisting of a single `message` field.
///
/// NOTE(review): by its name this is presumably the error payload returned by
/// environmentd's skip-catchup HTTP endpoint. Its users are outside this
/// chunk, so confirm the shape against environmentd's `http.rs`.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    message: String,
}