Skip to main content

mz_orchestratord/controller/materialize/
generation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeMap,
12    net::{IpAddr, Ipv4Addr, SocketAddr},
13    sync::LazyLock,
14    time::Duration,
15};
16
17use k8s_openapi::{
18    api::{
19        apps::v1::{StatefulSet, StatefulSetSpec},
20        core::v1::{
21            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22            EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23            SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24            Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25        },
26    },
27    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33    ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51    major: 0,
52    minor: 140,
53    patch: 0,
54    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
57const V143: Version = Version::new(0, 143, 0);
58const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60    major: 0,
61    minor: 147,
62    patch: 0,
63    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68    major: 0,
69    minor: 154,
70    patch: 0,
71    pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77    major: 26,
78    minor: 1,
79    patch: 0,
80    pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81    build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
84/// Describes the status of a deployment.
85///
86/// This is a simplified representation of `DeploymentState`, suitable for
87/// announcement to the external orchestrator.
88#[derive(
89    Debug,
90    Serialize,
91    Deserialize,
92    Clone,
93    Copy,
94    PartialEq,
95    Eq,
96    PartialOrd,
97    Ord
98)]
99pub enum DeploymentStatus {
100    /// This deployment is not the leader. It is initializing and is not yet
101    /// ready to become the leader.
102    Initializing,
103    /// This deployment is not the leader, but it is ready to become the leader.
104    ReadyToPromote,
105    /// This deployment is in the process of becoming the leader.
106    Promoting,
107    /// This deployment is the leader.
108    IsLeader,
109}
110
111#[derive(
112    Debug,
113    Serialize,
114    Deserialize,
115    Clone,
116    Copy,
117    PartialEq,
118    Eq,
119    PartialOrd,
120    Ord
121)]
122pub struct GetLeaderStatusResponse {
123    status: DeploymentStatus,
124}
125
126#[derive(Deserialize, Serialize)]
127pub struct LoginCredentials {
128    username: String,
129    // TODO Password type?
130    password: String,
131}
132
133#[derive(Debug, Serialize)]
134pub struct ConnectionInfo {
135    pub environmentd_url: String,
136    pub mz_system_secret_name: Option<String>,
137    pub listeners_configmap: ConfigMap,
138}
139
140#[derive(Debug, Serialize)]
141pub struct Resources {
142    pub generation: u64,
143    pub public_service: Box<Service>,
144    pub generation_service: Box<Service>,
145    pub persist_pubsub_service: Box<Service>,
146    pub environmentd_statefulset: Box<StatefulSet>,
147    pub connection_info: Box<ConnectionInfo>,
148}
149
150impl Resources {
151    pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
152        let public_service = Box::new(create_public_service_object(config, mz, generation));
153        let generation_service = Box::new(create_generation_service_object(config, mz, generation));
154        let persist_pubsub_service =
155            Box::new(create_persist_pubsub_service(config, mz, generation));
156        let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157            config, mz, generation,
158        ));
159        let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161        Self {
162            generation,
163            public_service,
164            generation_service,
165            persist_pubsub_service,
166            environmentd_statefulset,
167            connection_info,
168        }
169    }
170
171    #[instrument]
172    pub async fn apply(
173        &self,
174        client: &Client,
175        force_promote: bool,
176        namespace: &str,
177    ) -> Result<Option<Action>, Error> {
178        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
179        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
180        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
181
182        trace!("applying environmentd per-generation service");
183        apply_resource(&service_api, &*self.generation_service).await?;
184
185        trace!("creating persist pubsub service");
186        apply_resource(&service_api, &*self.persist_pubsub_service).await?;
187
188        trace!("applying listeners configmap");
189        apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
190
191        trace!("creating new environmentd statefulset");
192        apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
193
194        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
195
196        let statefulset = get_resource(
197            &statefulset_api,
198            &self.environmentd_statefulset.name_unchecked(),
199        )
200        .await?;
201        if statefulset
202            .and_then(|statefulset| statefulset.status)
203            .and_then(|status| status.ready_replicas)
204            .unwrap_or(0)
205            == 0
206        {
207            trace!("environmentd statefulset is not ready yet...");
208            return Ok(Some(retry_action));
209        }
210
211        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
212            return Ok(Some(retry_action));
213        };
214        let status_url = reqwest::Url::parse(&format!(
215            "{}/api/leader/status",
216            self.connection_info.environmentd_url,
217        ))
218        .unwrap();
219
220        match http_client.get(status_url.clone()).send().await {
221            Ok(response) => {
222                let response: GetLeaderStatusResponse = match response.error_for_status() {
223                    Ok(response) => response.json().await?,
224                    Err(e) => {
225                        trace!("failed to get status of environmentd, retrying... ({e})");
226                        return Ok(Some(retry_action));
227                    }
228                };
229                if force_promote {
230                    trace!("skipping cluster catchup");
231                    let skip_catchup_url = reqwest::Url::parse(&format!(
232                        "{}/api/leader/skip-catchup",
233                        self.connection_info.environmentd_url,
234                    ))
235                    .unwrap();
236                    let response = http_client.post(skip_catchup_url).send().await?;
237                    if response.status() == StatusCode::BAD_REQUEST {
238                        let err: SkipCatchupError = response.json().await?;
239                        return Err(
240                            anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
241                        );
242                    }
243                } else {
244                    match response.status {
245                        DeploymentStatus::Initializing => {
246                            trace!("environmentd is still initializing, retrying...");
247                            return Ok(Some(retry_action));
248                        }
249                        DeploymentStatus::ReadyToPromote
250                        | DeploymentStatus::Promoting
251                        | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
252                    }
253                }
254            }
255            Err(e) => {
256                trace!("failed to connect to environmentd, retrying... ({e})");
257                return Ok(Some(retry_action));
258            }
259        }
260
261        Ok(None)
262    }
263
264    #[instrument]
265    async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
266        let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
267        Some(match &self.connection_info.mz_system_secret_name {
268            Some(mz_system_secret_name) => {
269                let http_client = reqwest::Client::builder()
270                    .timeout(std::time::Duration::from_secs(10))
271                    .cookie_store(true)
272                    // TODO add_root_certificate instead
273                    .danger_accept_invalid_certs(true)
274                    .build()
275                    .unwrap();
276
277                let secret = secret_api
278                    .get(mz_system_secret_name)
279                    .await
280                    .map_err(|e| {
281                        error!("Failed to get backend secret: {:?}", e);
282                        e
283                    })
284                    .ok()?;
285                if let Some(data) = secret.data {
286                    if let Some(password) = data.get("external_login_password_mz_system").cloned() {
287                        let password = String::from_utf8_lossy(&password.0).to_string();
288                        let login_url = reqwest::Url::parse(&format!(
289                            "{}/api/login",
290                            self.connection_info.environmentd_url,
291                        ))
292                        .unwrap();
293                        match http_client
294                            .post(login_url)
295                            .body(
296                                serde_json::to_string(&LoginCredentials {
297                                    username: "mz_system".to_owned(),
298                                    password,
299                                })
300                                .expect(
301                                    "Serializing a simple struct with utf8 strings doesn't fail.",
302                                ),
303                            )
304                            .header("Content-Type", "application/json")
305                            .send()
306                            .await
307                        {
308                            Ok(response) => {
309                                if let Err(e) = response.error_for_status() {
310                                    trace!("failed to login to environmentd, retrying... ({e})");
311                                    return None;
312                                }
313                            }
314                            Err(e) => {
315                                trace!("failed to connect to environmentd, retrying... ({e})");
316                                return None;
317                            }
318                        };
319                    }
320                };
321                http_client
322            }
323            None => reqwest::Client::builder()
324                .timeout(std::time::Duration::from_secs(10))
325                .build()
326                .unwrap(),
327        })
328    }
329
330    #[instrument]
331    pub async fn promote_services(
332        &self,
333        client: &Client,
334        namespace: &str,
335    ) -> Result<Option<Action>, Error> {
336        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
337        let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
338
339        let promote_url = reqwest::Url::parse(&format!(
340            "{}/api/leader/promote",
341            self.connection_info.environmentd_url,
342        ))
343        .unwrap();
344
345        let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
346            return Ok(Some(retry_action));
347        };
348
349        trace!("promoting new environmentd to leader");
350        let response = http_client.post(promote_url).send().await?;
351        let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
352        if let BecomeLeaderResult::Failure { message } = response.result {
353            return Err(Error::Anyhow(anyhow::anyhow!(
354                "failed to promote new environmentd: {message}"
355            )));
356        }
357
358        // A successful POST to the promotion endpoint only indicates
359        // that the promotion process was kicked off. It does not
360        // guarantee that the environment will be successfully promoted
361        // (e.g., if the environment crashes immediately after responding
362        // to the request, but before executing the takeover, the
363        // promotion will be lost).
364        //
365        // To guarantee the environment has been promoted successfully,
366        // we must wait to see at least one `IsLeader` status returned
367        // from the environment.
368
369        let status_url = reqwest::Url::parse(&format!(
370            "{}/api/leader/status",
371            self.connection_info.environmentd_url,
372        ))
373        .unwrap();
374        match http_client.get(status_url.clone()).send().await {
375            Ok(response) => {
376                let response: GetLeaderStatusResponse = response.json().await?;
377                if response.status != DeploymentStatus::IsLeader {
378                    trace!(
379                        "environmentd is still promoting (status: {:?}), retrying...",
380                        response.status
381                    );
382                    return Ok(Some(retry_action));
383                } else {
384                    trace!("environmentd is ready");
385                }
386            }
387            Err(e) => {
388                trace!("failed to connect to environmentd, retrying... ({e})");
389                return Ok(Some(retry_action));
390            }
391        }
392
393        trace!("applying environmentd public service");
394        apply_resource(&service_api, &*self.public_service).await?;
395
396        Ok(None)
397    }
398
399    #[instrument]
400    pub async fn teardown_generation(
401        &self,
402        client: &Client,
403        mz: &Materialize,
404        generation: u64,
405    ) -> Result<(), anyhow::Error> {
406        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
407        let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
408        let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
409
410        trace!("deleting environmentd statefulset for generation {generation}");
411        delete_resource(
412            &statefulset_api,
413            &mz.environmentd_statefulset_name(generation),
414        )
415        .await?;
416
417        trace!("deleting persist pubsub service for generation {generation}");
418        delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
419
420        trace!("deleting environmentd per-generation service for generation {generation}");
421        delete_resource(
422            &service_api,
423            &mz.environmentd_generation_service_name(generation),
424        )
425        .await?;
426
427        trace!("deleting listeners configmap for generation {generation}");
428        delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
429
430        Ok(())
431    }
432
433    // ideally we would just be able to hash the objects directly, but the
434    // generated kubernetes objects don't implement the Hash trait
435    pub fn generate_hash(&self) -> String {
436        let mut hasher = Sha256::new();
437        hasher.update(&serde_json::to_string(self).unwrap());
438        format!("{:x}", hasher.finalize())
439    }
440}
441
442fn create_public_service_object(
443    config: &super::Config,
444    mz: &Materialize,
445    generation: u64,
446) -> Service {
447    create_base_service_object(
448        config,
449        mz,
450        generation,
451        &mz.environmentd_service_name(),
452        true,
453    )
454}
455
456fn create_generation_service_object(
457    config: &super::Config,
458    mz: &Materialize,
459    generation: u64,
460) -> Service {
461    create_base_service_object(
462        config,
463        mz,
464        generation,
465        &mz.environmentd_generation_service_name(generation),
466        false,
467    )
468}
469
470fn create_base_service_object(
471    config: &super::Config,
472    mz: &Materialize,
473    generation: u64,
474    service_name: &str,
475    headless: bool,
476) -> Service {
477    let ports = vec![
478        ServicePort {
479            port: config.environmentd_sql_port.into(),
480            protocol: Some("TCP".to_string()),
481            name: Some("sql".to_string()),
482            ..Default::default()
483        },
484        ServicePort {
485            port: config.environmentd_http_port.into(),
486            protocol: Some("TCP".to_string()),
487            name: Some("https".to_string()),
488            ..Default::default()
489        },
490        ServicePort {
491            port: config.environmentd_internal_sql_port.into(),
492            protocol: Some("TCP".to_string()),
493            name: Some("internal-sql".to_string()),
494            ..Default::default()
495        },
496        ServicePort {
497            port: config.environmentd_internal_http_port.into(),
498            protocol: Some("TCP".to_string()),
499            name: Some("internal-http".to_string()),
500            ..Default::default()
501        },
502    ];
503
504    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
505
506    let spec = ServiceSpec {
507        type_: Some("ClusterIP".to_string()),
508        cluster_ip: if headless {
509            Some("None".to_string())
510        } else {
511            None
512        },
513        selector: Some(selector),
514        ports: Some(ports),
515        ..Default::default()
516    };
517
518    Service {
519        metadata: mz.managed_resource_meta(service_name.to_string()),
520        spec: Some(spec),
521        status: None,
522    }
523}
524
525fn create_persist_pubsub_service(
526    config: &super::Config,
527    mz: &Materialize,
528    generation: u64,
529) -> Service {
530    Service {
531        metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
532        spec: Some(ServiceSpec {
533            type_: Some("ClusterIP".to_string()),
534            cluster_ip: Some("None".to_string()),
535            selector: Some(btreemap! {
536                "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
537            }),
538            ports: Some(vec![ServicePort {
539                name: Some("grpc".to_string()),
540                protocol: Some("TCP".to_string()),
541                port: config.environmentd_internal_persist_pubsub_port.into(),
542                ..Default::default()
543            }]),
544            ..Default::default()
545        }),
546        status: None,
547    }
548}
549
550fn create_environmentd_statefulset_object(
551    config: &super::Config,
552    mz: &Materialize,
553    generation: u64,
554) -> StatefulSet {
555    // IMPORTANT: Only pass secrets via environment variables. All other
556    // parameters should be passed as command line arguments, possibly gated
557    // with a `meets_minimum_version` call. This ensures typos cause
558    // `environmentd` to fail to start, rather than silently ignoring the
559    // configuration.
560    //
561    // When passing a secret, use a `SecretKeySelector` to forward a secret into
562    // the pod. Do *not* hardcode a secret as the value directly, as doing so
563    // will leak the secret to anyone with permission to describe the pod.
564    let mut env = vec![
565        EnvVar {
566            name: "MZ_METADATA_BACKEND_URL".to_string(),
567            value_from: Some(EnvVarSource {
568                secret_key_ref: Some(SecretKeySelector {
569                    name: mz.backend_secret_name(),
570                    key: "metadata_backend_url".to_string(),
571                    optional: Some(false),
572                }),
573                ..Default::default()
574            }),
575            ..Default::default()
576        },
577        EnvVar {
578            name: "MZ_PERSIST_BLOB_URL".to_string(),
579            value_from: Some(EnvVarSource {
580                secret_key_ref: Some(SecretKeySelector {
581                    name: mz.backend_secret_name(),
582                    key: "persist_backend_url".to_string(),
583                    optional: Some(false),
584                }),
585                ..Default::default()
586            }),
587            ..Default::default()
588        },
589    ];
590
591    env.push(EnvVar {
592        name: "AWS_REGION".to_string(),
593        value: Some(config.region.clone()),
594        ..Default::default()
595    });
596
597    env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
598
599    let mut args = vec![];
600
601    if let Some(helm_chart_version) = &config.helm_chart_version {
602        args.push(format!("--helm-chart-version={helm_chart_version}"));
603    }
604
605    // Add environment ID argument.
606    args.push(format!(
607        "--environment-id={}",
608        mz.environment_id(&config.cloud_provider.to_string(), &config.region)
609    ));
610
611    // Add clusterd image argument based on environmentd tag.
612    args.push(format!(
613        "--clusterd-image={}",
614        matching_image_from_environmentd_image_ref(
615            &mz.spec.environmentd_image_ref,
616            "clusterd",
617            None
618        )
619    ));
620
621    // Add cluster and storage host size arguments.
622    args.extend(
623        [
624            config
625                .environmentd_cluster_replica_sizes
626                .as_ref()
627                .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
628            config
629                .bootstrap_default_cluster_replica_size
630                .as_ref()
631                .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
632            config
633                .bootstrap_builtin_system_cluster_replica_size
634                .as_ref()
635                .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
636            config
637                .bootstrap_builtin_probe_cluster_replica_size
638                .as_ref()
639                .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
640            config
641                .bootstrap_builtin_support_cluster_replica_size
642                .as_ref()
643                .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
644            config
645                .bootstrap_builtin_catalog_server_cluster_replica_size
646                .as_ref()
647                .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
648            config
649                .bootstrap_builtin_analytics_cluster_replica_size
650                .as_ref()
651                .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
652            config
653                .bootstrap_builtin_system_cluster_replication_factor
654                .as_ref()
655                .map(|replication_factor| {
656                    format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
657                }),
658            config
659                .bootstrap_builtin_probe_cluster_replication_factor
660                .as_ref()
661                .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
662            config
663                .bootstrap_builtin_support_cluster_replication_factor
664                .as_ref()
665                .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
666            config
667                .bootstrap_builtin_analytics_cluster_replication_factor
668                .as_ref()
669                .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
670        ]
671        .into_iter()
672        .flatten(),
673    );
674
675    args.extend(
676        config
677            .environmentd_allowed_origins
678            .iter()
679            .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
680    );
681
682    args.push(format!(
683        "--secrets-controller={}",
684        config.secrets_controller
685    ));
686
687    if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
688        if let Ok(cluster_replica_sizes) =
689            serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
690        {
691            let cluster_replica_sizes: Vec<_> =
692                cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
693            args.push(format!(
694                "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
695                cluster_replica_sizes.join("', '")
696            ));
697        }
698    }
699    if !config.cloud_provider.is_cloud() {
700        args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
701    }
702
703    if config.enable_internal_statement_logging {
704        args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
705    }
706
707    if config.disable_statement_logging {
708        args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
709    }
710
711    if !mz.spec.enable_rbac {
712        args.push("--system-parameter-default=enable_rbac_checks=false".into());
713    }
714
715    // Add persist arguments.
716
717    // Configure the Persist Isolated Runtime to use one less thread than the total available.
718    args.push("--persist-isolated-runtime-threads=-1".to_string());
719
720    // Add AWS arguments.
721    if config.cloud_provider == CloudProvider::Aws {
722        if let Some(azs) = config.environmentd_availability_zones.as_ref() {
723            for az in azs {
724                args.push(format!("--availability-zone={az}"));
725            }
726        }
727
728        if let Some(environmentd_connection_role_arn) = mz
729            .spec
730            .environmentd_connection_role_arn
731            .as_deref()
732            .or(config.environmentd_connection_role_arn.as_deref())
733        {
734            args.push(format!(
735                "--aws-connection-role-arn={}",
736                environmentd_connection_role_arn
737            ));
738        }
739        if let Some(account_id) = &config.aws_account_id {
740            args.push(format!("--aws-account-id={account_id}"));
741        }
742
743        args.extend([format!(
744            "--aws-secrets-controller-tags=Environment={}",
745            mz.name_unchecked()
746        )]);
747        args.extend_from_slice(&config.aws_secrets_controller_tags);
748    }
749
750    // Add Kubernetes arguments.
751    args.extend([
752        "--orchestrator=kubernetes".into(),
753        format!(
754            "--orchestrator-kubernetes-service-account={}",
755            &mz.service_account_name()
756        ),
757        format!(
758            "--orchestrator-kubernetes-image-pull-policy={}",
759            config.image_pull_policy.as_kebab_case_str(),
760        ),
761    ]);
762    for selector in &config.clusterd_node_selector {
763        args.push(format!(
764            "--orchestrator-kubernetes-service-node-selector={}={}",
765            selector.key, selector.value,
766        ));
767    }
768    if mz.meets_minimum_version(&V144) {
769        if let Some(affinity) = &config.clusterd_affinity {
770            let affinity = serde_json::to_string(affinity).unwrap();
771            args.push(format!(
772                "--orchestrator-kubernetes-service-affinity={affinity}"
773            ))
774        }
775        if let Some(tolerations) = &config.clusterd_tolerations {
776            let tolerations = serde_json::to_string(tolerations).unwrap();
777            args.push(format!(
778                "--orchestrator-kubernetes-service-tolerations={tolerations}"
779            ))
780        }
781    }
782    if let Some(scheduler_name) = &config.scheduler_name {
783        args.push(format!(
784            "--orchestrator-kubernetes-scheduler-name={}",
785            scheduler_name
786        ));
787    }
788    if mz.meets_minimum_version(&V154_DEV0) {
789        args.extend(
790            mz.spec
791                .pod_annotations
792                .as_ref()
793                .map(|annotations| annotations.iter())
794                .unwrap_or_default()
795                .map(|(key, val)| {
796                    format!("--orchestrator-kubernetes-service-annotation={key}={val}")
797                }),
798        );
799    }
800    args.extend(
801        mz.default_labels()
802            .iter()
803            .chain(
804                mz.spec
805                    .pod_labels
806                    .as_ref()
807                    .map(|labels| labels.iter())
808                    .unwrap_or_default(),
809            )
810            .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
811    );
812    if let Some(status) = &mz.status {
813        args.push(format!(
814            "--orchestrator-kubernetes-name-prefix=mz{}-",
815            status.resource_id
816        ));
817    }
818
819    // Add logging and tracing arguments.
820    args.extend(["--log-format=json".into()]);
821    if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
822        args.push(format!("--opentelemetry-endpoint={}", endpoint));
823    }
824    // --opentelemetry-resource also configures sentry tags
825    args.extend([
826        format!(
827            "--opentelemetry-resource=organization_id={}",
828            mz.spec.environment_id
829        ),
830        format!(
831            "--opentelemetry-resource=environment_name={}",
832            mz.name_unchecked()
833        ),
834    ]);
835
836    if let Some(segment_api_key) = &config.segment_api_key {
837        args.push(format!("--segment-api-key={}", segment_api_key));
838        if config.segment_client_side {
839            args.push("--segment-client-side".into());
840        }
841    }
842
843    let mut volumes = Vec::new();
844    let mut volume_mounts = Vec::new();
845    if issuer_ref_defined(
846        &config.default_certificate_specs.internal,
847        &mz.spec.internal_certificate_spec,
848    ) {
849        volumes.push(Volume {
850            name: "certificate".to_owned(),
851            secret: Some(SecretVolumeSource {
852                default_mode: Some(0o400),
853                secret_name: Some(mz.environmentd_certificate_secret_name()),
854                items: None,
855                optional: Some(false),
856            }),
857            ..Default::default()
858        });
859        volume_mounts.push(VolumeMount {
860            name: "certificate".to_owned(),
861            mount_path: "/etc/materialized".to_owned(),
862            read_only: Some(true),
863            ..Default::default()
864        });
865        args.extend([
866            "--tls-mode=require".into(),
867            "--tls-cert=/etc/materialized/tls.crt".into(),
868            "--tls-key=/etc/materialized/tls.key".into(),
869        ]);
870    } else {
871        args.push("--tls-mode=disable".to_string());
872    }
873    if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
874        args.push(format!(
875            "--orchestrator-kubernetes-ephemeral-volume-class={}",
876            ephemeral_volume_class
877        ));
878    }
879    // The `materialize` user used by clusterd always has gid 999.
880    args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
881
882    // Add system_param configmap
883    // This feature was enabled in 0.163 but did not have testing until after 0.164.
884    // 0.165 should work with anything greater than 0.164 including v26 and v25.
885    if mz.meets_minimum_version(&V26_1_0) {
886        if let Some(ref name) = mz.spec.system_parameter_configmap_name {
887            volumes.push(Volume {
888                name: "system-params".to_string(),
889                config_map: Some(ConfigMapVolumeSource {
890                    default_mode: Some(0o400),
891                    name: name.to_owned(),
892                    items: None,
893                    optional: Some(true),
894                }),
895                ..Default::default()
896            });
897            volume_mounts.push(VolumeMount {
898                name: "system-params".to_string(),
899                // The user must write to the `system-params.json` entry in the config map
900                mount_path: "/system-params".to_owned(),
901                read_only: Some(true),
902                ..Default::default()
903            });
904            args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
905            args.push("--config-sync-loop-interval=1s".to_string());
906        }
907    }
908
909    // Add Sentry arguments.
910    if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
911        args.push(format!("--sentry-dsn={}", sentry_dsn));
912        if let Some(sentry_environment) = &config.tracing.sentry_environment {
913            args.push(format!("--sentry-environment={}", sentry_environment));
914        }
915        args.push(format!("--sentry-tag=region={}", config.region));
916    }
917
918    // Add Persist PubSub arguments
919    args.push(format!(
920        "--persist-pubsub-url=http://{}:{}",
921        mz.persist_pubsub_service_name(generation),
922        config.environmentd_internal_persist_pubsub_port,
923    ));
924    args.push(format!(
925        "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
926        config.environmentd_internal_persist_pubsub_port
927    ));
928
929    args.push(format!("--deploy-generation={}", generation));
930
931    // Add URL for internal user impersonation endpoint
932    args.push(format!(
933        "--internal-console-redirect-url={}",
934        &config.internal_console_proxy_url,
935    ));
936
937    if !config.collect_pod_metrics {
938        args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
939    }
940    if config.enable_prometheus_scrape_annotations {
941        args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
942    }
943
944    // the --disable-license-key-checks environmentd flag only existed
945    // between these versions
946    if config.disable_license_key_checks {
947        if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
948            args.push("--disable-license-key-checks".into());
949        }
950    }
951
952    // as of version 0.153, the ability to disable license key checks was
953    // removed, so we should always set up license keys in that case
954    if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
955        || mz.meets_minimum_version(&V153)
956    {
957        volume_mounts.push(VolumeMount {
958            name: "license-key".to_string(),
959            mount_path: "/license_key".to_string(),
960            ..Default::default()
961        });
962        volumes.push(Volume {
963            name: "license-key".to_string(),
964            secret: Some(SecretVolumeSource {
965                default_mode: Some(256),
966                optional: Some(false),
967                secret_name: Some(mz.backend_secret_name()),
968                items: Some(vec![KeyToPath {
969                    key: "license_key".to_string(),
970                    path: "license_key".to_string(),
971                    ..Default::default()
972                }]),
973                ..Default::default()
974            }),
975            ..Default::default()
976        });
977        env.push(EnvVar {
978            name: "MZ_LICENSE_KEY".to_string(),
979            value: Some("/license_key/license_key".to_string()),
980            ..Default::default()
981        });
982    }
983
984    // Add user-specified extra arguments.
985    if let Some(extra_args) = &mz.spec.environmentd_extra_args {
986        args.extend(extra_args.iter().cloned());
987    }
988
989    let probe = Probe {
990        initial_delay_seconds: Some(1),
991        failure_threshold: Some(12),
992        tcp_socket: Some(TCPSocketAction {
993            host: None,
994            port: IntOrString::Int(config.environmentd_sql_port.into()),
995        }),
996        ..Default::default()
997    };
998
999    let security_context = if config.enable_security_context {
1000        // Since we want to adhere to the most restrictive security context, all
1001        // of these fields have to be set how they are.
1002        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
1003        Some(SecurityContext {
1004            run_as_non_root: Some(true),
1005            capabilities: Some(Capabilities {
1006                drop: Some(vec!["ALL".to_string()]),
1007                ..Default::default()
1008            }),
1009            seccomp_profile: Some(SeccompProfile {
1010                type_: "RuntimeDefault".to_string(),
1011                ..Default::default()
1012            }),
1013            allow_privilege_escalation: Some(false),
1014            ..Default::default()
1015        })
1016    } else {
1017        None
1018    };
1019
1020    let ports = vec![
1021        ContainerPort {
1022            container_port: config.environmentd_sql_port.into(),
1023            name: Some("sql".to_owned()),
1024            ..Default::default()
1025        },
1026        ContainerPort {
1027            container_port: config.environmentd_internal_sql_port.into(),
1028            name: Some("internal-sql".to_owned()),
1029            ..Default::default()
1030        },
1031        ContainerPort {
1032            container_port: config.environmentd_http_port.into(),
1033            name: Some("http".to_owned()),
1034            ..Default::default()
1035        },
1036        ContainerPort {
1037            container_port: config.environmentd_internal_http_port.into(),
1038            name: Some("internal-http".to_owned()),
1039            ..Default::default()
1040        },
1041        ContainerPort {
1042            container_port: config.environmentd_internal_persist_pubsub_port.into(),
1043            name: Some("persist-pubsub".to_owned()),
1044            ..Default::default()
1045        },
1046    ];
1047
1048    // Add networking arguments.
1049    if mz.meets_minimum_version(&V147_DEV0) {
1050        volume_mounts.push(VolumeMount {
1051            name: "listeners-configmap".to_string(),
1052            mount_path: "/listeners".to_string(),
1053            ..Default::default()
1054        });
1055        volumes.push(Volume {
1056            name: "listeners-configmap".to_string(),
1057            config_map: Some(ConfigMapVolumeSource {
1058                name: mz.listeners_configmap_name(generation),
1059                default_mode: Some(256),
1060                optional: Some(false),
1061                items: Some(vec![KeyToPath {
1062                    key: "listeners.json".to_string(),
1063                    path: "listeners.json".to_string(),
1064                    ..Default::default()
1065                }]),
1066            }),
1067            ..Default::default()
1068        });
1069        args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1070        if matches!(
1071            mz.spec.authenticator_kind,
1072            AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc
1073        ) {
1074            args.push("--system-parameter-default=enable_password_auth=true".into());
1075            env.push(EnvVar {
1076                name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1077                value_from: Some(EnvVarSource {
1078                    secret_key_ref: Some(SecretKeySelector {
1079                        name: mz.backend_secret_name(),
1080                        key: "external_login_password_mz_system".to_string(),
1081                        optional: Some(false),
1082                    }),
1083                    ..Default::default()
1084                }),
1085                ..Default::default()
1086            })
1087        }
1088    } else {
1089        args.extend([
1090            format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1091            format!(
1092                "--http-listen-addr=0.0.0.0:{}",
1093                config.environmentd_http_port
1094            ),
1095            format!(
1096                "--internal-sql-listen-addr=0.0.0.0:{}",
1097                config.environmentd_internal_sql_port
1098            ),
1099            format!(
1100                "--internal-http-listen-addr=0.0.0.0:{}",
1101                config.environmentd_internal_http_port
1102            ),
1103        ]);
1104    }
1105
1106    let container = Container {
1107        name: "environmentd".to_owned(),
1108        image: Some(mz.spec.environmentd_image_ref.to_owned()),
1109        image_pull_policy: Some(config.image_pull_policy.to_string()),
1110        ports: Some(ports),
1111        args: Some(args),
1112        env: Some(env),
1113        volume_mounts: Some(volume_mounts),
1114        liveness_probe: Some(probe.clone()),
1115        readiness_probe: Some(probe),
1116        resources: mz
1117            .spec
1118            .environmentd_resource_requirements
1119            .clone()
1120            .or_else(|| config.environmentd_default_resources.clone()),
1121        security_context: security_context.clone(),
1122        ..Default::default()
1123    };
1124
1125    let mut pod_template_labels = mz.default_labels();
1126    pod_template_labels.insert(
1127        "materialize.cloud/name".to_owned(),
1128        mz.environmentd_statefulset_name(generation),
1129    );
1130    pod_template_labels.insert(
1131        "materialize.cloud/app".to_owned(),
1132        mz.environmentd_app_name(),
1133    );
1134    pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1135    pod_template_labels.extend(
1136        mz.spec
1137            .pod_labels
1138            .as_ref()
1139            .map(|labels| labels.iter())
1140            .unwrap_or_default()
1141            .map(|(key, value)| (key.clone(), value.clone())),
1142    );
1143
1144    let mut pod_template_annotations = btreemap! {
1145        // We can re-enable eviction once we have HA
1146        "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1147
1148        // Prevents old (< 0.30) and new versions of karpenter from evicting database pods
1149        "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1150        "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1151        "materialize.cloud/generation".to_owned() => generation.to_string(),
1152    };
1153    if config.enable_prometheus_scrape_annotations {
1154        pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1155        pod_template_annotations.insert(
1156            "prometheus.io/port".to_owned(),
1157            config.environmentd_internal_http_port.to_string(),
1158        );
1159        pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1160        pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1161        pod_template_annotations.insert(
1162            "materialize.prometheus.io/mz_usage_path".to_owned(),
1163            "/metrics/mz_usage".to_string(),
1164        );
1165        pod_template_annotations.insert(
1166            "materialize.prometheus.io/mz_frontier_path".to_owned(),
1167            "/metrics/mz_frontier".to_string(),
1168        );
1169        pod_template_annotations.insert(
1170            "materialize.prometheus.io/mz_compute_path".to_owned(),
1171            "/metrics/mz_compute".to_string(),
1172        );
1173        pod_template_annotations.insert(
1174            "materialize.prometheus.io/mz_storage_path".to_owned(),
1175            "/metrics/mz_storage".to_string(),
1176        );
1177    }
1178    pod_template_annotations.extend(
1179        mz.spec
1180            .pod_annotations
1181            .as_ref()
1182            .map(|annotations| annotations.iter())
1183            .unwrap_or_default()
1184            .map(|(key, value)| (key.clone(), value.clone())),
1185    );
1186
1187    let mut tolerations = vec![
1188        // When the node becomes `NotReady` it indicates there is a problem with the node,
1189        // By default kubernetes waits 300s (5 minutes) before doing anything in this case,
1190        // But we want to limit this to 30s for faster recovery
1191        Toleration {
1192            effect: Some("NoExecute".into()),
1193            key: Some("node.kubernetes.io/not-ready".into()),
1194            operator: Some("Exists".into()),
1195            toleration_seconds: Some(30),
1196            value: None,
1197        },
1198        Toleration {
1199            effect: Some("NoExecute".into()),
1200            key: Some("node.kubernetes.io/unreachable".into()),
1201            operator: Some("Exists".into()),
1202            toleration_seconds: Some(30),
1203            value: None,
1204        },
1205    ];
1206    if let Some(user_tolerations) = &config.environmentd_tolerations {
1207        tolerations.extend(user_tolerations.iter().cloned());
1208    }
1209    let tolerations = Some(tolerations);
1210
1211    let pod_template_spec = PodTemplateSpec {
1212        // not using managed_resource_meta because the pod should be owned
1213        // by the statefulset, not the materialize instance
1214        metadata: Some(ObjectMeta {
1215            labels: Some(pod_template_labels),
1216            annotations: Some(pod_template_annotations), // This is inserted into later, do not delete.
1217            ..Default::default()
1218        }),
1219        spec: Some(PodSpec {
1220            containers: vec![container],
1221            node_selector: Some(
1222                config
1223                    .environmentd_node_selector
1224                    .iter()
1225                    .map(|selector| (selector.key.clone(), selector.value.clone()))
1226                    .collect(),
1227            ),
1228            affinity: config.environmentd_affinity.clone(),
1229            scheduler_name: config.scheduler_name.clone(),
1230            service_account_name: Some(mz.service_account_name()),
1231            volumes: Some(volumes),
1232            security_context: Some(PodSecurityContext {
1233                fs_group: Some(999),
1234                run_as_user: Some(999),
1235                run_as_group: Some(999),
1236                ..Default::default()
1237            }),
1238            tolerations,
1239            // This (apparently) has the side effect of automatically starting a new pod
1240            // when the previous pod is currently terminating. This side steps the statefulset fencing
1241            // but allows for quicker recovery from a failed node
1242            // The Kubernetes documentation strongly advises against this
1243            // setting, as StatefulSets attempt to provide "at most once"
1244            // semantics [0]-- that is, the guarantee that for a given pod in a
1245            // StatefulSet there is *at most* one pod with that identity running
1246            // in the cluster
1247            //
1248            // Materialize, however, has been carefully designed to *not* rely
1249            // on this guarantee. (In fact, we do not believe that correct
1250            // distributed systems can meaningfully rely on Kubernetes's
1251            // guarantee--network packets from a pod can be arbitrarily delayed,
1252            // long past that pod's termination.) Running two `environmentd`
1253            // processes is safe: the newer process will safely and correctly
1254            // fence out the older process via CockroachDB. In the future,
1255            // we'll support running multiple `environmentd` processes in
1256            // parallel for high availability.
1257            //
1258            // [0]: https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#statefulset-considerations
1259            termination_grace_period_seconds: Some(0),
1260            ..Default::default()
1261        }),
1262    };
1263
1264    let mut match_labels = BTreeMap::new();
1265    match_labels.insert(
1266        "materialize.cloud/name".to_owned(),
1267        mz.environmentd_statefulset_name(generation),
1268    );
1269
1270    let statefulset_spec = StatefulSetSpec {
1271        replicas: Some(1),
1272        template: pod_template_spec,
1273        service_name: Some(mz.environmentd_service_name()),
1274        selector: LabelSelector {
1275            match_expressions: None,
1276            match_labels: Some(match_labels),
1277        },
1278        ..Default::default()
1279    };
1280
1281    StatefulSet {
1282        metadata: ObjectMeta {
1283            annotations: Some(btreemap! {
1284                "materialize.cloud/generation".to_owned() => generation.to_string(),
1285                "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1286            }),
1287            ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1288        },
1289        spec: Some(statefulset_spec),
1290        status: None,
1291    }
1292}
1293
1294fn create_connection_info(
1295    config: &super::Config,
1296    mz: &Materialize,
1297    generation: u64,
1298) -> ConnectionInfo {
1299    let external_enable_tls = issuer_ref_defined(
1300        &config.default_certificate_specs.internal,
1301        &mz.spec.internal_certificate_spec,
1302    );
1303    let authenticator_kind = mz.spec.authenticator_kind;
1304
1305    let mut listeners_config = ListenersConfig {
1306        sql: btreemap! {
1307            "external".to_owned() => SqlListenerConfig{
1308                addr: SocketAddr::new(
1309                    IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1310                    config.environmentd_sql_port,
1311                ),
1312                authenticator_kind,
1313                allowed_roles: AllowedRoles::Normal,
1314                enable_tls: external_enable_tls,
1315            },
1316            "internal".to_owned() => SqlListenerConfig{
1317                addr: SocketAddr::new(
1318                    IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1319                    config.environmentd_internal_sql_port,
1320                ),
1321                authenticator_kind: AuthenticatorKind::None,
1322                // Should this just be Internal?
1323                allowed_roles: AllowedRoles::NormalAndInternal,
1324                enable_tls: false,
1325            },
1326        },
1327        http: btreemap! {
1328            "external".to_owned() => HttpListenerConfig{
1329                base: BaseListenerConfig {
1330                    addr: SocketAddr::new(
1331                        IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1332                        config.environmentd_http_port,
1333                    ),
1334                    // SASL authentication is only supported for SQL (PostgreSQL wire protocol).
1335                    // HTTP listeners must use Password authentication when SASL is enabled.
1336                    // This is validated at environmentd startup via ListenerConfig::validate().
1337                    authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1338                        AuthenticatorKind::Password
1339                    } else {
1340                        authenticator_kind
1341                    },
1342                    allowed_roles: AllowedRoles::Normal,
1343                    enable_tls: external_enable_tls,
1344                },
1345                routes: HttpRoutesEnabled{
1346                    base: true,
1347                    webhook: true,
1348                    internal: false,
1349                    metrics: false,
1350                    profiling: false,
1351                    mcp_agent: true,
1352                    mcp_developer: true,
1353                    console_config: true,
1354                }
1355            },
1356            "internal".to_owned() => HttpListenerConfig{
1357                base: BaseListenerConfig {
1358                    addr: SocketAddr::new(
1359                        IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1360                        config.environmentd_internal_http_port,
1361                    ),
1362                    authenticator_kind: AuthenticatorKind::None,
1363                    // Should this just be Internal?
1364                    allowed_roles: AllowedRoles::NormalAndInternal,
1365                    enable_tls: false,
1366                },
1367                routes: HttpRoutesEnabled{
1368                    base: true,
1369                    webhook: true,
1370                    internal: true,
1371                    metrics: true,
1372                    profiling: true,
1373                    mcp_agent: true,
1374                    mcp_developer: true,
1375                    console_config: false,
1376                }
1377            },
1378        },
1379    };
1380
1381    if matches!(
1382        authenticator_kind,
1383        AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc
1384    ) {
1385        listeners_config.sql.remove("internal");
1386        listeners_config.http.remove("internal");
1387
1388        listeners_config.sql.get_mut("external").map(|listener| {
1389            listener.allowed_roles = AllowedRoles::NormalAndInternal;
1390            listener
1391        });
1392        listeners_config.http.get_mut("external").map(|listener| {
1393            listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1394            listener.routes.internal = true;
1395            listener.routes.profiling = true;
1396            listener
1397        });
1398
1399        listeners_config.http.insert(
1400            "metrics".to_owned(),
1401            HttpListenerConfig {
1402                base: BaseListenerConfig {
1403                    addr: SocketAddr::new(
1404                        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1405                        config.environmentd_internal_http_port,
1406                    ),
1407                    authenticator_kind: AuthenticatorKind::None,
1408                    allowed_roles: AllowedRoles::NormalAndInternal,
1409                    enable_tls: false,
1410                },
1411                routes: HttpRoutesEnabled {
1412                    base: false,
1413                    webhook: false,
1414                    internal: false,
1415                    metrics: true,
1416                    profiling: false,
1417                    mcp_agent: false,
1418                    mcp_developer: false,
1419                    console_config: false,
1420                },
1421            },
1422        );
1423    }
1424
1425    let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1426    let listeners_configmap = ConfigMap {
1427        binary_data: None,
1428        data: Some(btreemap! {
1429            "listeners.json".to_owned() => listeners_json,
1430        }),
1431        immutable: None,
1432        metadata: ObjectMeta {
1433            annotations: Some(btreemap! {
1434                "materialize.cloud/generation".to_owned() => generation.to_string(),
1435            }),
1436            ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1437        },
1438    };
1439
1440    let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1441        AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc => {
1442            let scheme = if external_enable_tls { "https" } else { "http" };
1443            (
1444                scheme,
1445                config.environmentd_http_port,
1446                Some(mz.spec.backend_secret_name.clone()),
1447            )
1448        }
1449        _ => ("http", config.environmentd_internal_http_port, None),
1450    };
1451    let environmentd_url = format!(
1452        "{}://{}.{}.svc.cluster.local:{}",
1453        scheme,
1454        mz.environmentd_generation_service_name(generation),
1455        mz.namespace(),
1456        leader_api_port,
1457    );
1458    ConnectionInfo {
1459        environmentd_url,
1460        listeners_configmap,
1461        mz_system_secret_name,
1462    }
1463}
1464
1465// see materialize/src/environmentd/src/http.rs
1466#[derive(Debug, Deserialize, PartialEq, Eq)]
1467struct BecomeLeaderResponse {
1468    result: BecomeLeaderResult,
1469}
1470
1471#[derive(Debug, Deserialize, PartialEq, Eq)]
1472enum BecomeLeaderResult {
1473    Success,
1474    Failure { message: String },
1475}
1476
1477#[derive(Debug, Deserialize, PartialEq, Eq)]
1478struct SkipCatchupError {
1479    message: String,
1480}