mz_orchestratord/controller/materialize/generation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use k8s_openapi::{
18    api::{
19        apps::v1::{StatefulSet, StatefulSetSpec},
20        core::v1::{
21            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24            Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25        },
26    },
27    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33    ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51    major: 0,
52    minor: 140,
53    patch: 0,
54    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
57const V143: Version = Version::new(0, 143, 0);
58const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60    major: 0,
61    minor: 147,
62    patch: 0,
63    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68    major: 0,
69    minor: 154,
70    patch: 0,
71    pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77    major: 26,
78    minor: 1,
79    patch: 0,
80    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
84/// Describes the status of a deployment.
85///
86/// This is a simplified representation of `DeploymentState`, suitable for
87/// announcement to the external orchestrator.
88#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
89pub enum DeploymentStatus {
90    /// This deployment is not the leader. It is initializing and is not yet
91    /// ready to become the leader.
92    Initializing,
93    /// This deployment is not the leader, but it is ready to become the leader.
94    ReadyToPromote,
95    /// This deployment is in the process of becoming the leader.
96    Promoting,
97    /// This deployment is the leader.
98    IsLeader,
99}
100
101#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
102pub struct GetLeaderStatusResponse {
103    status: DeploymentStatus,
104}
105
106#[derive(Deserialize, Serialize)]
107pub struct LoginCredentials {
108    username: String,
109    // TODO Password type?
110    password: String,
111}
112
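/// Connection details and listener configuration for a given generation's
/// environmentd.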
113#[derive(Debug, Serialize)]
114pub struct ConnectionInfo {
115    pub environmentd_url: String,
116    pub mz_system_secret_name: Option<String>,
117    pub listeners_configmap: ConfigMap,
118}
119
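/// The per-generation Kubernetes objects managed for an environmentd
/// deployment, along with the information needed to connect to it.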
120#[derive(Debug, Serialize)]
121pub struct Resources {
122    pub generation: u64,
123    pub public_service: Box<Service>,
124    pub generation_service: Box<Service>,
125    pub persist_pubsub_service: Box<Service>,
126    pub environmentd_statefulset: Box<StatefulSet>,
127    pub connection_info: Box<ConnectionInfo>,
128}
129
130impl Resources {
131    pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
132        let public_service = Box::new(create_public_service_object(config, mz, generation));
133        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
134        let persist_pubsub_service =
135            Box::new(create_persist_pubsub_service(config, mz, generation));
136        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
137            config, mz, generation,
138        ));
139        let connection_info = Box::new(create_connection_info(config, mz, generation));
140
141        Self {
142            generation,
143            public_service,
144            generation_service,
145            persist_pubsub_service,
146            environmentd_statefulset,
147            connection_info,
148        }
149    }
150
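    /// Applies the per-generation resources to the cluster and polls the new
    /// environmentd's leader status endpoint, returning a requeue `Action`
    /// while the deployment is not yet ready. When `force_promote` is set,
    /// instructs environmentd to skip cluster catchup.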
151    #[instrument]
152    pub async fn apply(
153        &self,
154        client: &Client,
155        force_promote: bool,
156        namespace: &str,
157    ) -> Result<Option<Action>, Error> {
158        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
159        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
160        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
161
162        trace!("applying environmentd per-generation service");
163        apply_resource(&service_api, &*self.generation_service).await?;
164
165        trace!("creating persist pubsub service");
166        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
167
168        trace!("applying listeners configmap");
169        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
170
171        trace!("creating new environmentd statefulset");
172        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
173
174        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
175
176        let statefulset = get_resource(
177            &statefulset_api,
178            &self.environmentd_statefulset.name_unchecked(),
179        )
180        .await?;
181        if statefulset
182            .and_then(|statefulset| statefulset.status)
183            .and_then(|status| status.ready_replicas)
184            .unwrap_or(0)
185            == 0
186        {
187            trace!("environmentd statefulset is not ready yet...");
188            return Ok(Some(retry_action));
189        }
190
191        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
192            return Ok(Some(retry_action));
193        };
194        let status_url = reqwest::Url::parse(&format!(
195            "{}/api/leader/status",
196            self.connection_info.environmentd_url,
197        ))
198        .unwrap();
199
200        match http_client.get(status_url.clone()).send().await {
201            Ok(response) => {
202                let response: GetLeaderStatusResponse = match response.error_for_status() {
203                    Ok(response) => response.json().await?,
204                    Err(e) => {
205                        trace!("failed to get status of environmentd, retrying... ({e})");
206                        return Ok(Some(retry_action));
207                    }
208                };
209                if force_promote {
210                    trace!("skipping cluster catchup");
211                    let skip_catchup_url = reqwest::Url::parse(&format!(
212                        "{}/api/leader/skip-catchup",
213                        self.connection_info.environmentd_url,
214                    ))
215                    .unwrap();
216                    let response = http_client.post(skip_catchup_url).send().await?;
217                    if response.status() == StatusCode::BAD_REQUEST {
218                        let err: SkipCatchupError = response.json().await?;
219                        return Err(
220                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
221                        );
222                    }
223                } else {
224                    match response.status {
225                        DeploymentStatus::Initializing => {
226                            trace!("environmentd is still initializing, retrying...");
227                            return Ok(Some(retry_action));
228                        }
229                        DeploymentStatus::ReadyToPromote
230                        | DeploymentStatus::Promoting
231                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
232                    }
233                }
234            }
235            Err(e) => {
236                trace!("failed to connect to environmentd, retrying... ({e})");
237                return Ok(Some(retry_action));
238            }
239        }
240
241        Ok(None)
242    }
243
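    /// Builds an HTTP client for talking to environmentd. When password
    /// authentication is configured, logs in as `mz_system` using the password
    /// from the backend secret so the client carries a valid session cookie.
    /// Returns `None` if fetching the secret or logging in fails, which
    /// callers treat as a signal to retry.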
244    #[instrument]
245    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
246        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
247        Some(match &self.connection_info.mz_system_secret_name {
248            Some(mz_system_secret_name) => {
249                let http_client = reqwest::Client::builder()
250                    .timeout(std::time::Duration::from_secs(10))
251                    .cookie_store(true)
252                    // TODO add_root_certificate instead
253                    .danger_accept_invalid_certs(true)
254                    .build()
255                    .unwrap();
256
257                let secret = secret_api
258                    .get(mz_system_secret_name)
259                    .await
260                    .map_err(|e| {
261                        error!("Failed to get backend secret: {:?}", e);
262                        e
263                    })
264                    .ok()?;
265                if let Some(data) = secret.data {
266                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
267                        let password = String::from_utf8_lossy(&password.0).to_string();
268                        let login_url = reqwest::Url::parse(&format!(
269                            "{}/api/login",
270                            self.connection_info.environmentd_url,
271                        ))
272                        .unwrap();
273                        match http_client
274                            .post(login_url)
275                            .body(
276                                serde_json::to_string(&LoginCredentials {
277                                    username: "mz_system".to_owned(),
278                                    password,
279                                })
280                                .expect(
281                                    "Serializing a simple struct with utf8 strings doesn't fail.",
282                                ),
283                            )
284                            .header("Content-Type", "application/json")
285                            .send()
286                            .await
287                        {
288                            Ok(response) => {
289                                if let Err(e) = response.error_for_status() {
290                                    trace!("failed to login to environmentd, retrying... ({e})");
291                                    return None;
292                                }
293                            }
294                            Err(e) => {
295                                trace!("failed to connect to environmentd, retrying... ({e})");
296                                return None;
297                            }
298                        };
299                    }
300                };
301                http_client
302            }
303            None => reqwest::Client::builder()
304                .timeout(std::time::Duration::from_secs(10))
305                .build()
306                .unwrap(),
307        })
308    }
309
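    /// Promotes this generation's environmentd to leader and, once it reports
    /// `IsLeader`, points the public service at it.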
310    #[instrument]
311    pub async fn promote_services(
312        &self,
313        client: &Client,
314        namespace: &str,
315    ) -> Result<Option<Action>, Error> {
316        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
317        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
318
319        let promote_url = reqwest::Url::parse(&format!(
320            "{}/api/leader/promote",
321            self.connection_info.environmentd_url,
322        ))
323        .unwrap();
324
325        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
326            return Ok(Some(retry_action));
327        };
328
329        trace!("promoting new environmentd to leader");
330        let response = http_client.post(promote_url).send().await?;
331        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
332        if let BecomeLeaderResult::Failure { message } = response.result {
333            return Err(Error::Anyhow(anyhow::anyhow!(
334                "failed to promote new environmentd: {message}"
335            )));
336        }
337
338        // A successful POST to the promotion endpoint only indicates
339        // that the promotion process was kicked off. It does not
340        // guarantee that the environment will be successfully promoted
341        // (e.g., if the environment crashes immediately after responding
342        // to the request, but before executing the takeover, the
343        // promotion will be lost).
344        //
345        // To guarantee the environment has been promoted successfully,
346        // we must wait to see at least one `IsLeader` status returned
347        // from the environment.
348
349        let status_url = reqwest::Url::parse(&format!(
350            "{}/api/leader/status",
351            self.connection_info.environmentd_url,
352        ))
353        .unwrap();
354        match http_client.get(status_url.clone()).send().await {
355            Ok(response) => {
356                let response: GetLeaderStatusResponse = response.json().await?;
357                if response.status != DeploymentStatus::IsLeader {
358                    trace!(
359                        "environmentd is still promoting (status: {:?}), retrying...",
360                        response.status
361                    );
362                    return Ok(Some(retry_action));
363                } else {
364                    trace!("environmentd is ready");
365                }
366            }
367            Err(e) => {
368                trace!("failed to connect to environmentd, retrying... ({e})");
369                return Ok(Some(retry_action));
370            }
371        }
372
373        trace!("applying environmentd public service");
374        apply_resource(&service_api, &*self.public_service).await?;
375
376        Ok(None)
377    }
378
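    /// Deletes the per-generation resources (statefulset, services, and
    /// listeners configmap) for the given generation.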
379    #[instrument]
380    pub async fn teardown_generation(
381        &self,
382        client: &Client,
383        mz: &Materialize,
384        generation: u64,
385    ) -> Result<(), anyhow::Error> {
386        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
387        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
388        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
389
390        trace!("deleting environmentd statefulset for generation {generation}");
391        delete_resource(
392            &statefulset_api,
393            &mz.environmentd_statefulset_name(generation),
394        )
395        .await?;
396
397        trace!("deleting persist pubsub service for generation {generation}");
398        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
399
400        trace!("deleting environmentd per-generation service for generation {generation}");
401        delete_resource(
402            &service_api,
403            &mz.environmentd_generation_service_name(generation),
404        )
405        .await?;
406
407        trace!("deleting listeners configmap for generation {generation}");
408        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
409
410        Ok(())
411    }
412
413    // ideally we would just be able to hash the objects directly, but the
414    // generated kubernetes objects don't implement the Hash trait
415    pub fn generate_hash(&self) -> String {
416        let mut hasher = Sha256::new();
417        hasher.update(&serde_json::to_string(self).unwrap());
418        format!("{:x}", hasher.finalize())
419    }
420}
421
422fn create_public_service_object(
423    config: &super::Config,
424    mz: &Materialize,
425    generation: u64,
426) -> Service {
427    create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
428}
429
430fn create_generation_service_object(
431    config: &super::Config,
432    mz: &Materialize,
433    generation: u64,
434) -> Service {
435    create_base_service_object(
436        config,
437        mz,
438        generation,
439        &mz.environmentd_generation_service_name(generation),
440    )
441}
442
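/// Creates a headless `ClusterIP` service exposing the environmentd SQL and
/// HTTP ports, selecting the pods of the given generation's statefulset.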
443fn create_base_service_object(
444    config: &super::Config,
445    mz: &Materialize,
446    generation: u64,
447    service_name: &str,
448) -> Service {
449    let ports = vec![
450        ServicePort {
451            port: config.environmentd_sql_port.into(),
452            protocol: Some("TCP".to_string()),
453            name: Some("sql".to_string()),
454            ..Default::default()
455        },
456        ServicePort {
457            port: config.environmentd_http_port.into(),
458            protocol: Some("TCP".to_string()),
459            name: Some("https".to_string()),
460            ..Default::default()
461        },
462        ServicePort {
463            port: config.environmentd_internal_sql_port.into(),
464            protocol: Some("TCP".to_string()),
465            name: Some("internal-sql".to_string()),
466            ..Default::default()
467        },
468        ServicePort {
469            port: config.environmentd_internal_http_port.into(),
470            protocol: Some("TCP".to_string()),
471            name: Some("internal-http".to_string()),
472            ..Default::default()
473        },
474    ];
475
476    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
477
478    let spec = ServiceSpec {
479        type_: Some("ClusterIP".to_string()),
480        cluster_ip: Some("None".to_string()),
481        selector: Some(selector),
482        ports: Some(ports),
483        ..Default::default()
484    };
485
486    Service {
487        metadata: mz.managed_resource_meta(service_name.to_string()),
488        spec: Some(spec),
489        status: None,
490    }
491}
492
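/// Creates the headless service that exposes environmentd's Persist PubSub
/// port for this generation.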
493fn create_persist_pubsub_service(
494    config: &super::Config,
495    mz: &Materialize,
496    generation: u64,
497) -> Service {
498    Service {
499        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
500        spec: Some(ServiceSpec {
501            type_: Some("ClusterIP".to_string()),
502            cluster_ip: Some("None".to_string()),
503            selector: Some(btreemap! {
504                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
505            }),
506            ports: Some(vec![ServicePort {
507                name: Some("grpc".to_string()),
508                protocol: Some("TCP".to_string()),
509                port: config.environmentd_internal_persist_pubsub_port.into(),
510                ..Default::default()
511            }]),
512            ..Default::default()
513        }),
514        status: None,
515    }
516}
517
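/// Creates the single-replica environmentd statefulset for the given
/// generation.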
518fn create_environmentd_statefulset_object(
519    config: &super::Config,
520    mz: &Materialize,
521    generation: u64,
522) -> StatefulSet {
523    // IMPORTANT: Only pass secrets via environment variables. All other
524    // parameters should be passed as command line arguments, possibly gated
525    // with a `meets_minimum_version` call. This ensures typos cause
526    // `environmentd` to fail to start, rather than silently ignoring the
527    // configuration.
528    //
529    // When passing a secret, use a `SecretKeySelector` to forward a secret into
530    // the pod. Do *not* hardcode a secret as the value directly, as doing so
531    // will leak the secret to anyone with permission to describe the pod.
532    let mut env = vec![
533        EnvVar {
534            name: "MZ_METADATA_BACKEND_URL".to_string(),
535            value_from: Some(EnvVarSource {
536                secret_key_ref: Some(SecretKeySelector {
537                    name: mz.backend_secret_name(),
538                    key: "metadata_backend_url".to_string(),
539                    optional: Some(false),
540                }),
541                ..Default::default()
542            }),
543            ..Default::default()
544        },
545        EnvVar {
546            name: "MZ_PERSIST_BLOB_URL".to_string(),
547            value_from: Some(EnvVarSource {
548                secret_key_ref: Some(SecretKeySelector {
549                    name: mz.backend_secret_name(),
550                    key: "persist_backend_url".to_string(),
551                    optional: Some(false),
552                }),
553                ..Default::default()
554            }),
555            ..Default::default()
556        },
557    ];
558
559    env.push(EnvVar {
560        name: "AWS_REGION".to_string(),
561        value: Some(config.region.clone()),
562        ..Default::default()
563    });
564
565    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
566
567    let mut args = vec![];
568
569    if let Some(helm_chart_version) = &config.helm_chart_version {
570        args.push(format!("--helm-chart-version={helm_chart_version}"));
571    }
572
573    // Add environment ID argument.
574    args.push(format!(
575        "--environment-id={}",
576        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
577    ));
578
579    // Add clusterd image argument based on environmentd tag.
580    args.push(format!(
581        "--clusterd-image={}",
582        matching_image_from_environmentd_image_ref(
583            &mz.spec.environmentd_image_ref,
584            "clusterd",
585            None
586        )
587    ));
588
589    // Add cluster and storage host size arguments.
590    args.extend(
591        [
592            config
593                .environmentd_cluster_replica_sizes
594                .as_ref()
595                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
596            config
597                .bootstrap_default_cluster_replica_size
598                .as_ref()
599                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
600            config
601                .bootstrap_builtin_system_cluster_replica_size
602                .as_ref()
603                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
604            config
605                .bootstrap_builtin_probe_cluster_replica_size
606                .as_ref()
607                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
608            config
609                .bootstrap_builtin_support_cluster_replica_size
610                .as_ref()
611                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
612            config
613                .bootstrap_builtin_catalog_server_cluster_replica_size
614                .as_ref()
615                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
616            config
617                .bootstrap_builtin_analytics_cluster_replica_size
618                .as_ref()
619                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
620            config
621                .bootstrap_builtin_system_cluster_replication_factor
622                .as_ref()
623                .map(|replication_factor| {
624                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
625                }),
626            config
627                .bootstrap_builtin_probe_cluster_replication_factor
628                .as_ref()
629                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
630            config
631                .bootstrap_builtin_support_cluster_replication_factor
632                .as_ref()
633                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
634            config
635                .bootstrap_builtin_analytics_cluster_replication_factor
636                .as_ref()
637                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
638        ]
639        .into_iter()
640        .flatten(),
641    );
642
643    args.extend(
644        config
645            .environmentd_allowed_origins
646            .iter()
647            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
648    );
649
650    args.push(format!(
651        "--secrets-controller={}",
652        config.secrets_controller
653    ));
654
655    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
656        if let Ok(cluster_replica_sizes) =
657            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
658        {
659            let cluster_replica_sizes: Vec<_> =
660                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
661            args.push(format!(
662                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
663                cluster_replica_sizes.join("', '")
664            ));
665        }
666    }
667    if !config.cloud_provider.is_cloud() {
668        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
669    }
670
671    if config.enable_internal_statement_logging {
672        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
673    }
674
675    if config.disable_statement_logging {
676        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
677    }
678
679    if !mz.spec.enable_rbac {
680        args.push("--system-parameter-default=enable_rbac_checks=false".into());
681    }
682
683    // Add persist arguments.
684
685    // Configure the Persist Isolated Runtime to use one less thread than the total available.
686    args.push("--persist-isolated-runtime-threads=-1".to_string());
687
688    // Add AWS arguments.
689    if config.cloud_provider == CloudProvider::Aws {
690        if let Some(azs) = config.environmentd_availability_zones.as_ref() {
691            for az in azs {
692                args.push(format!("--availability-zone={az}"));
693            }
694        }
695
696        if let Some(environmentd_connection_role_arn) = mz
697            .spec
698            .environmentd_connection_role_arn
699            .as_deref()
700            .or(config.environmentd_connection_role_arn.as_deref())
701        {
702            args.push(format!(
703                "--aws-connection-role-arn={}",
704                environmentd_connection_role_arn
705            ));
706        }
707        if let Some(account_id) = &config.aws_account_id {
708            args.push(format!("--aws-account-id={account_id}"));
709        }
710
711        args.extend([format!(
712            "--aws-secrets-controller-tags=Environment={}",
713            mz.name_unchecked()
714        )]);
715        args.extend_from_slice(&config.aws_secrets_controller_tags);
716    }
717
718    // Add Kubernetes arguments.
719    args.extend([
720        "--orchestrator=kubernetes".into(),
721        format!(
722            "--orchestrator-kubernetes-service-account={}",
723            &mz.service_account_name()
724        ),
725        format!(
726            "--orchestrator-kubernetes-image-pull-policy={}",
727            config.image_pull_policy.as_kebab_case_str(),
728        ),
729    ]);
730    for selector in &config.clusterd_node_selector {
731        args.push(format!(
732            "--orchestrator-kubernetes-service-node-selector={}={}",
733            selector.key, selector.value,
734        ));
735    }
736    if mz.meets_minimum_version(&V144) {
737        if let Some(affinity) = &config.clusterd_affinity {
738            let affinity = serde_json::to_string(affinity).unwrap();
739            args.push(format!(
740                "--orchestrator-kubernetes-service-affinity={affinity}"
741            ))
742        }
743        if let Some(tolerations) = &config.clusterd_tolerations {
744            let tolerations = serde_json::to_string(tolerations).unwrap();
745            args.push(format!(
746                "--orchestrator-kubernetes-service-tolerations={tolerations}"
747            ))
748        }
749    }
750    if let Some(scheduler_name) = &config.scheduler_name {
751        args.push(format!(
752            "--orchestrator-kubernetes-scheduler-name={}",
753            scheduler_name
754        ));
755    }
756    if mz.meets_minimum_version(&V154_DEV0) {
757        args.extend(
758            mz.spec
759                .pod_annotations
760                .as_ref()
761                .map(|annotations| annotations.iter())
762                .unwrap_or_default()
763                .map(|(key, val)| {
764                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
765                }),
766        );
767    }
768    args.extend(
769        mz.default_labels()
770            .iter()
771            .chain(
772                mz.spec
773                    .pod_labels
774                    .as_ref()
775                    .map(|labels| labels.iter())
776                    .unwrap_or_default(),
777            )
778            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
779    );
780    if let Some(status) = &mz.status {
781        args.push(format!(
782            "--orchestrator-kubernetes-name-prefix=mz{}-",
783            status.resource_id
784        ));
785    }
786
787    // Add logging and tracing arguments.
788    args.extend(["--log-format=json".into()]);
789    if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
790        args.push(format!("--opentelemetry-endpoint={}", endpoint));
791    }
792    // --opentelemetry-resource also configures sentry tags
793    args.extend([
794        format!(
795            "--opentelemetry-resource=organization_id={}",
796            mz.spec.environment_id
797        ),
798        format!(
799            "--opentelemetry-resource=environment_name={}",
800            mz.name_unchecked()
801        ),
802    ]);
803
804    if let Some(segment_api_key) = &config.segment_api_key {
805        args.push(format!("--segment-api-key={}", segment_api_key));
806        if config.segment_client_side {
807            args.push("--segment-client-side".into());
808        }
809    }
810
811    let mut volumes = Vec::new();
812    let mut volume_mounts = Vec::new();
813    if issuer_ref_defined(
814        &config.default_certificate_specs.internal,
815        &mz.spec.internal_certificate_spec,
816    ) {
817        volumes.push(Volume {
818            name: "certificate".to_owned(),
819            secret: Some(SecretVolumeSource {
820                default_mode: Some(0o400),
821                secret_name: Some(mz.environmentd_certificate_secret_name()),
822                items: None,
823                optional: Some(false),
824            }),
825            ..Default::default()
826        });
827        volume_mounts.push(VolumeMount {
828            name: "certificate".to_owned(),
829            mount_path: "/etc/materialized".to_owned(),
830            read_only: Some(true),
831            ..Default::default()
832        });
833        args.extend([
834            "--tls-mode=require".into(),
835            "--tls-cert=/etc/materialized/tls.crt".into(),
836            "--tls-key=/etc/materialized/tls.key".into(),
837        ]);
838    } else {
839        args.push("--tls-mode=disable".to_string());
840    }
841    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
842        args.push(format!(
843            "--orchestrator-kubernetes-ephemeral-volume-class={}",
844            ephemeral_volume_class
845        ));
846    }
847    // The `materialize` user used by clusterd always has gid 999.
848    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
849
850    // Add the system parameters configmap.
851    // This feature was enabled in 0.163 but was not tested until after 0.164;
852    // 0.165 should work with anything greater than 0.164, including v26 and v25.
853    if mz.meets_minimum_version(&V26_1_0) {
854        if let Some(ref name) = mz.spec.system_parameter_configmap_name {
855            volumes.push(Volume {
856                name: "system-params".to_string(),
857                config_map: Some(ConfigMapVolumeSource {
858                    default_mode: Some(0o400),
859                    name: name.to_owned(),
860                    items: None,
861                    optional: Some(true),
862                }),
863                ..Default::default()
864            });
865            volume_mounts.push(VolumeMount {
866                name: "system-params".to_string(),
867                // The user must write to the `system-params.json` entry in the config map
868                mount_path: "/system-params".to_owned(),
869                read_only: Some(true),
870                ..Default::default()
871            });
872            args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
873            args.push("--config-sync-loop-interval=1s".to_string());
874        }
875    }
876
877    // Add Sentry arguments.
878    if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
879        args.push(format!("--sentry-dsn={}", sentry_dsn));
880        if let Some(sentry_environment) = &config.tracing.sentry_environment {
881            args.push(format!("--sentry-environment={}", sentry_environment));
882        }
883        args.push(format!("--sentry-tag=region={}", config.region));
884    }
885
886    // Add Persist PubSub arguments
887    args.push(format!(
888        "--persist-pubsub-url=http://{}:{}",
889        mz.persist_pubsub_service_name(generation),
890        config.environmentd_internal_persist_pubsub_port,
891    ));
892    args.push(format!(
893        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
894        config.environmentd_internal_persist_pubsub_port
895    ));
896
897    args.push(format!("--deploy-generation={}", generation));
898
899    // Add URL for internal user impersonation endpoint
900    args.push(format!(
901        "--internal-console-redirect-url={}",
902        &config.internal_console_proxy_url,
903    ));
904
905    if !config.collect_pod_metrics {
906        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
907    }
908    if config.enable_prometheus_scrape_annotations {
909        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
910    }
911
912    // The --disable-license-key-checks environmentd flag only existed
913    // between these two versions.
914    if config.disable_license_key_checks {
915        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
916            args.push("--disable-license-key-checks".into());
917        }
918    }
919
920    // As of version 0.153, the ability to disable license key checks was
921    // removed, so we should always set up license keys in that case.
922    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
923        || mz.meets_minimum_version(&V153)
924    {
925        volume_mounts.push(VolumeMount {
926            name: "license-key".to_string(),
927            mount_path: "/license_key".to_string(),
928            ..Default::default()
929        });
930        volumes.push(Volume {
931            name: "license-key".to_string(),
932            secret: Some(SecretVolumeSource {
933                default_mode: Some(256),
934                optional: Some(false),
935                secret_name: Some(mz.backend_secret_name()),
936                items: Some(vec![KeyToPath {
937                    key: "license_key".to_string(),
938                    path: "license_key".to_string(),
939                    ..Default::default()
940                }]),
941                ..Default::default()
942            }),
943            ..Default::default()
944        });
945        env.push(EnvVar {
946            name: "MZ_LICENSE_KEY".to_string(),
947            value: Some("/license_key/license_key".to_string()),
948            ..Default::default()
949        });
950    }
951
952    // Add user-specified extra arguments.
953    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
954        args.extend(extra_args.iter().cloned());
955    }
956
957    let probe = Probe {
958        initial_delay_seconds: Some(1),
959        failure_threshold: Some(12),
960        tcp_socket: Some(TCPSocketAction {
961            host: None,
962            port: IntOrString::Int(config.environmentd_sql_port.into()),
963        }),
964        ..Default::default()
965    };
966
967    let security_context = if config.enable_security_context {
968        // Since we want to adhere to the most restrictive security context, all
969        // of these fields have to be set how they are.
970        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
971        Some(SecurityContext {
972            run_as_non_root: Some(true),
973            capabilities: Some(Capabilities {
974                drop: Some(vec!["ALL".to_string()]),
975                ..Default::default()
976            }),
977            seccomp_profile: Some(SeccompProfile {
978                type_: "RuntimeDefault".to_string(),
979                ..Default::default()
980            }),
981            allow_privilege_escalation: Some(false),
982            ..Default::default()
983        })
984    } else {
985        None
986    };
987
988    let ports = vec![
989        ContainerPort {
990            container_port: config.environmentd_sql_port.into(),
991            name: Some("sql".to_owned()),
992            ..Default::default()
993        },
994        ContainerPort {
995            container_port: config.environmentd_internal_sql_port.into(),
996            name: Some("internal-sql".to_owned()),
997            ..Default::default()
998        },
999        ContainerPort {
1000            container_port: config.environmentd_http_port.into(),
1001            name: Some("http".to_owned()),
1002            ..Default::default()
1003        },
1004        ContainerPort {
1005            container_port: config.environmentd_internal_http_port.into(),
1006            name: Some("internal-http".to_owned()),
1007            ..Default::default()
1008        },
1009        ContainerPort {
1010            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1011            name: Some("persist-pubsub".to_owned()),
1012            ..Default::default()
1013        },
1014    ];
1015
1016    // Add networking arguments.
1017    if mz.meets_minimum_version(&V147_DEV0) {
1018        volume_mounts.push(VolumeMount {
1019            name: "listeners-configmap".to_string(),
1020            mount_path: "/listeners".to_string(),
1021            ..Default::default()
1022        });
1023        volumes.push(Volume {
1024            name: "listeners-configmap".to_string(),
1025            config_map: Some(ConfigMapVolumeSource {
1026                name: mz.listeners_configmap_name(generation),
1027                default_mode: Some(256),
1028                optional: Some(false),
1029                items: Some(vec![KeyToPath {
1030                    key: "listeners.json".to_string(),
1031                    path: "listeners.json".to_string(),
1032                    ..Default::default()
1033                }]),
1034            }),
1035            ..Default::default()
1036        });
1037        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1038        if matches!(
1039            mz.spec.authenticator_kind,
1040            AuthenticatorKind::Password | AuthenticatorKind::Sasl
1041        ) {
1042            args.push("--system-parameter-default=enable_password_auth=true".into());
1043            env.push(EnvVar {
1044                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1045                value_from: Some(EnvVarSource {
1046                    secret_key_ref: Some(SecretKeySelector {
1047                        name: mz.backend_secret_name(),
1048                        key: "external_login_password_mz_system".to_string(),
1049                        optional: Some(false),
1050                    }),
1051                    ..Default::default()
1052                }),
1053                ..Default::default()
1054            })
1055        }
1056    } else {
1057        args.extend([
1058            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1059            format!(
1060                "--http-listen-addr=0.0.0.0:{}",
1061                config.environmentd_http_port
1062            ),
1063            format!(
1064                "--internal-sql-listen-addr=0.0.0.0:{}",
1065                config.environmentd_internal_sql_port
1066            ),
1067            format!(
1068                "--internal-http-listen-addr=0.0.0.0:{}",
1069                config.environmentd_internal_http_port
1070            ),
1071        ]);
1072    }
1073
1074    let container = Container {
1075        name: "environmentd".to_owned(),
1076        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1077        image_pull_policy: Some(config.image_pull_policy.to_string()),
1078        ports: Some(ports),
1079        args: Some(args),
1080        env: Some(env),
1081        volume_mounts: Some(volume_mounts),
1082        liveness_probe: Some(probe.clone()),
1083        readiness_probe: Some(probe),
1084        resources: mz
1085            .spec
1086            .environmentd_resource_requirements
1087            .clone()
1088            .or_else(|| config.environmentd_default_resources.clone()),
1089        security_context: security_context.clone(),
1090        ..Default::default()
1091    };
1092
1093    let mut pod_template_labels = mz.default_labels();
1094    pod_template_labels.insert(
1095        "materialize.cloud/name".to_owned(),
1096        mz.environmentd_statefulset_name(generation),
1097    );
1098    pod_template_labels.insert(
1099        "materialize.cloud/app".to_owned(),
1100        mz.environmentd_app_name(),
1101    );
1102    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1103    pod_template_labels.extend(
1104        mz.spec
1105            .pod_labels
1106            .as_ref()
1107            .map(|labels| labels.iter())
1108            .unwrap_or_default()
1109            .map(|(key, value)| (key.clone(), value.clone())),
1110    );
1111
1112    let mut pod_template_annotations = btreemap! {
1113        // We can re-enable eviction once we have HA
1114        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1115
1116        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1117        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1118        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1119        "materialize.cloud/generation".to_owned() => generation.to_string(),
1120    };
1121    if config.enable_prometheus_scrape_annotations {
1122        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1123        pod_template_annotations.insert(
1124            "prometheus.io/port".to_owned(),
1125            config.environmentd_internal_http_port.to_string(),
1126        );
1127        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1128        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1129        pod_template_annotations.insert(
1130            "materialize.prometheus.io/mz_usage_path".to_owned(),
1131            "/metrics/mz_usage".to_string(),
1132        );
1133        pod_template_annotations.insert(
1134            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1135            "/metrics/mz_frontier".to_string(),
1136        );
1137        pod_template_annotations.insert(
1138            "materialize.prometheus.io/mz_compute_path".to_owned(),
1139            "/metrics/mz_compute".to_string(),
1140        );
1141        pod_template_annotations.insert(
1142            "materialize.prometheus.io/mz_storage_path".to_owned(),
1143            "/metrics/mz_storage".to_string(),
1144        );
1145    }
1146    pod_template_annotations.extend(
1147        mz.spec
1148            .pod_annotations
1149            .as_ref()
1150            .map(|annotations| annotations.iter())
1151            .unwrap_or_default()
1152            .map(|(key, value)| (key.clone(), value.clone())),
1153    );
1154
1155    let mut tolerations = vec![
1156        // When the node becomes `NotReady`, it indicates there is a problem with the node.
1157        // By default, Kubernetes waits 300s (5 minutes) before doing anything in this case,
1158        // but we want to limit this to 30s for faster recovery.
1159        Toleration {
1160            effect: Some("NoExecute".into()),
1161            key: Some("node.kubernetes.io/not-ready".into()),
1162            operator: Some("Exists".into()),
1163            toleration_seconds: Some(30),
1164            value: None,
1165        },
1166        Toleration {
1167            effect: Some("NoExecute".into()),
1168            key: Some("node.kubernetes.io/unreachable".into()),
1169            operator: Some("Exists".into()),
1170            toleration_seconds: Some(30),
1171            value: None,
1172        },
1173    ];
1174    if let Some(user_tolerations) = &config.environmentd_tolerations {
1175        tolerations.extend(user_tolerations.iter().cloned());
1176    }
1177    let tolerations = Some(tolerations);
1178
1179    let pod_template_spec = PodTemplateSpec {
1180        // not using managed_resource_meta because the pod should be owned
1181        // by the statefulset, not the materialize instance
1182        metadata: Some(ObjectMeta {
1183            labels: Some(pod_template_labels),
1184            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1185            ..Default::default()
1186        }),
1187        spec: Some(PodSpec {
1188            containers: vec![container],
1189            node_selector: Some(
1190                config
1191                    .environmentd_node_selector
1192                    .iter()
1193                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1194                    .collect(),
1195            ),
1196            affinity: config.environmentd_affinity.clone(),
1197            scheduler_name: config.scheduler_name.clone(),
1198            service_account_name: Some(mz.service_account_name()),
1199            volumes: Some(volumes),
1200            security_context: Some(PodSecurityContext {
1201                fs_group: Some(999),
1202                run_as_user: Some(999),
1203                run_as_group: Some(999),
1204                ..Default::default()
1205            }),
1206            tolerations,
1207            // This (apparently) has the side effect of automatically starting a new pod
1208            // when the previous pod is still terminating. This sidesteps the statefulset fencing,
1209            // but allows for quicker recovery from a failed node.
1210            // The Kubernetes documentation strongly advises against this
1211            // setting, as StatefulSets attempt to provide "at most once"
1212            // semantics [0] -- that is, the guarantee that for a given pod in a
1213            // StatefulSet there is *at most* one pod with that identity running
1214            // in the cluster.
1215            //
1216            // Materialize, however, has been carefully designed to *not* rely
1217            // on this guarantee. (In fact, we do not believe that correct
1218            // distributed systems can meaningfully rely on Kubernetes's
1219            // guarantee--network packets from a pod can be arbitrarily delayed,
1220            // long past that pod's termination.) Running two `environmentd`
1221            // processes is safe: the newer process will safely and correctly
1222            // fence out the older process via CockroachDB. In the future,
1223            // we'll support running multiple `environmentd` processes in
1224            // parallel for high availability.
1225            //
1226            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1227            termination_grace_period_seconds: Some(0),
1228            ..Default::default()
1229        }),
1230    };
1231
1232    let mut match_labels = BTreeMap::new();
1233    match_labels.insert(
1234        "materialize.cloud/name".to_owned(),
1235        mz.environmentd_statefulset_name(generation),
1236    );
1237
1238    let statefulset_spec = StatefulSetSpec {
1239        replicas: Some(1),
1240        template: pod_template_spec,
1241        service_name: Some(mz.environmentd_service_name()),
1242        selector: LabelSelector {
1243            match_expressions: None,
1244            match_labels: Some(match_labels),
1245        },
1246        ..Default::default()
1247    };
1248
1249    StatefulSet {
1250        metadata: ObjectMeta {
1251            annotations: Some(btreemap! {
1252                "materialize.cloud/generation".to_owned() => generation.to_string(),
1253                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1254            }),
1255            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1256        },
1257        spec: Some(statefulset_spec),
1258        status: None,
1259    }
1260}
1261
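/// Builds the listeners configmap for this generation and derives the
/// in-cluster URL (and, when password-based authentication is enabled, the
/// secret holding the `mz_system` password) that the orchestrator uses to
/// reach environmentd's leader endpoints.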
1262fn create_connection_info(
1263    config: &super::Config,
1264    mz: &Materialize,
1265    generation: u64,
1266) -> ConnectionInfo {
1267    let external_enable_tls = issuer_ref_defined(
1268        &config.default_certificate_specs.internal,
1269        &mz.spec.internal_certificate_spec,
1270    );
1271    let authenticator_kind = mz.spec.authenticator_kind;
1272
1273    let mut listeners_config = ListenersConfig {
1274        sql: btreemap! {
1275            "external".to_owned() => SqlListenerConfig{
1276                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1277                authenticator_kind,
1278                allowed_roles: AllowedRoles::Normal,
1279                enable_tls: external_enable_tls,
1280            },
1281            "internal".to_owned() => SqlListenerConfig{
1282                addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1283                authenticator_kind: AuthenticatorKind::None,
1284                // Should this just be Internal?
1285                allowed_roles: AllowedRoles::NormalAndInternal,
1286                enable_tls: false,
1287            },
1288        },
1289        http: btreemap! {
1290            "external".to_owned() => HttpListenerConfig{
1291                base: BaseListenerConfig {
1292                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1293                    // SASL authentication is only supported for SQL (PostgreSQL wire protocol).
1294                    // HTTP listeners must use Password authentication when SASL is enabled.
1295                    // This is validated at environmentd startup via ListenerConfig::validate().
1296                    authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1297                        AuthenticatorKind::Password
1298                    } else {
1299                        authenticator_kind
1300                    },
1301                    allowed_roles: AllowedRoles::Normal,
1302                    enable_tls: external_enable_tls,
1303                },
1304                routes: HttpRoutesEnabled{
1305                    base: true,
1306                    webhook: true,
1307                    internal: false,
1308                    metrics: false,
1309                    profiling: false,
1310                }
1311            },
1312            "internal".to_owned() => HttpListenerConfig{
1313                base: BaseListenerConfig {
1314                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1315                    authenticator_kind: AuthenticatorKind::None,
1316                    // Should this just be Internal?
1317                    allowed_roles: AllowedRoles::NormalAndInternal,
1318                    enable_tls: false,
1319                },
1320                routes: HttpRoutesEnabled{
1321                    base: true,
1322                    webhook: true,
1323                    internal: true,
1324                    metrics: true,
1325                    profiling: true,
1326                }
1327            },
1328        },
1329    };
1330
1331    if matches!(
1332        authenticator_kind,
1333        AuthenticatorKind::Password | AuthenticatorKind::Sasl
1334    ) {
1335        listeners_config.sql.remove("internal");
1336        listeners_config.http.remove("internal");
1337
1338        listeners_config.sql.get_mut("external").map(|listener| {
1339            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1340            listener
1341        });
1342        listeners_config.http.get_mut("external").map(|listener| {
1343            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1344            listener.routes.internal = true;
1345            listener.routes.profiling = true;
1346            listener
1347        });
1348
1349        listeners_config.http.insert(
1350            "metrics".to_owned(),
1351            HttpListenerConfig {
1352                base: BaseListenerConfig {
1353                    addr: SocketAddr::new(
1354                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1355                        config.environmentd_internal_http_port,
1356                    ),
1357                    authenticator_kind: AuthenticatorKind::None,
1358                    allowed_roles: AllowedRoles::NormalAndInternal,
1359                    enable_tls: false,
1360                },
1361                routes: HttpRoutesEnabled {
1362                    base: false,
1363                    webhook: false,
1364                    internal: false,
1365                    metrics: true,
1366                    profiling: false,
1367                },
1368            },
1369        );
1370    }
1371
1372    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1373    let listeners_configmap = ConfigMap {
1374        binary_data: None,
1375        data: Some(btreemap! {
1376            "listeners.json".to_owned() => listeners_json,
1377        }),
1378        immutable: None,
1379        metadata: ObjectMeta {
1380            annotations: Some(btreemap! {
1381                "materialize.cloud/generation".to_owned() => generation.to_string(),
1382            }),
1383            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1384        },
1385    };
1386
1387    let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1388        AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1389            let scheme = if external_enable_tls { "https" } else { "http" };
1390            (
1391                scheme,
1392                config.environmentd_http_port,
1393                Some(mz.spec.backend_secret_name.clone()),
1394            )
1395        }
1396        _ => ("http", config.environmentd_internal_http_port, None),
1397    };
1398    let environmentd_url = format!(
1399        "{}://{}.{}.svc.cluster.local:{}",
1400        scheme,
1401        mz.environmentd_generation_service_name(generation),
1402        mz.namespace(),
1403        leader_api_port,
1404    );
1405    ConnectionInfo {
1406        environmentd_url,
1407        listeners_configmap,
1408        mz_system_secret_name,
1409    }
1410}
1411
1412// see materialize/src/environmentd/src/http.rs
1413#[derive(Debug, Deserialize, PartialEq, Eq)]
1414struct BecomeLeaderResponse {
1415    result: BecomeLeaderResult,
1416}
1417
1418#[derive(Debug, Deserialize, PartialEq, Eq)]
1419enum BecomeLeaderResult {
1420    Success,
1421    Failure { message: String },
1422}
1423
1424#[derive(Debug, Deserialize, PartialEq, Eq)]
1425struct SkipCatchupError {
1426    message: String,
1427}