1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25 },
26 },
27 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33 ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51 major: 0,
52 minor: 140,
53 patch: 0,
54 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
/// Version threshold `0.143.0`; lower bound for passing
/// `--disable-license-key-checks` to environmentd.
const V143: Version = Version::new(0, 143, 0);
/// Version threshold `0.144.0`; gates the
/// `--orchestrator-kubernetes-service-affinity` and
/// `--orchestrator-kubernetes-service-tolerations` arguments.
const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60 major: 0,
61 minor: 147,
62 patch: 0,
63 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
/// Version threshold `0.153.0`; from this version on the
/// `--disable-license-key-checks` argument is no longer passed and the
/// license key volume is always mounted.
const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68 major: 0,
69 minor: 154,
70 patch: 0,
71 pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
/// Version threshold `0.161.0`, exported for use by other modules.
pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77 major: 26,
78 minor: 1,
79 patch: 0,
80 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
84#[derive(
89 Debug,
90 Serialize,
91 Deserialize,
92 Clone,
93 Copy,
94 PartialEq,
95 Eq,
96 PartialOrd,
97 Ord
98)]
99pub enum DeploymentStatus {
100 Initializing,
103 ReadyToPromote,
105 Promoting,
107 IsLeader,
109}
110
111#[derive(
112 Debug,
113 Serialize,
114 Deserialize,
115 Clone,
116 Copy,
117 PartialEq,
118 Eq,
119 PartialOrd,
120 Ord
121)]
122pub struct GetLeaderStatusResponse {
123 status: DeploymentStatus,
124}
125
126#[derive(Deserialize, Serialize)]
127pub struct LoginCredentials {
128 username: String,
129 password: String,
131}
132
133#[derive(Debug, Serialize)]
134pub struct ConnectionInfo {
135 pub environmentd_url: String,
136 pub mz_system_secret_name: Option<String>,
137 pub listeners_configmap: ConfigMap,
138}
139
140#[derive(Debug, Serialize)]
141pub struct Resources {
142 pub generation: u64,
143 pub public_service: Box<Service>,
144 pub generation_service: Box<Service>,
145 pub persist_pubsub_service: Box<Service>,
146 pub environmentd_statefulset: Box<StatefulSet>,
147 pub connection_info: Box<ConnectionInfo>,
148}
149
150impl Resources {
151 pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
152 let public_service = Box::new(create_public_service_object(config, mz, generation));
153 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
154 let persist_pubsub_service =
155 Box::new(create_persist_pubsub_service(config, mz, generation));
156 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157 config, mz, generation,
158 ));
159 let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161 Self {
162 generation,
163 public_service,
164 generation_service,
165 persist_pubsub_service,
166 environmentd_statefulset,
167 connection_info,
168 }
169 }
170
171 #[instrument]
172 pub async fn apply(
173 &self,
174 client: &Client,
175 force_promote: bool,
176 namespace: &str,
177 ) -> Result<Option<Action>, Error> {
178 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
179 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
180 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
181
182 trace!("applying environmentd per-generation service");
183 apply_resource(&service_api, &*self.generation_service).await?;
184
185 trace!("creating persist pubsub service");
186 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
187
188 trace!("applying listeners configmap");
189 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
190
191 trace!("creating new environmentd statefulset");
192 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
193
194 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
195
196 let statefulset = get_resource(
197 &statefulset_api,
198 &self.environmentd_statefulset.name_unchecked(),
199 )
200 .await?;
201 if statefulset
202 .and_then(|statefulset| statefulset.status)
203 .and_then(|status| status.ready_replicas)
204 .unwrap_or(0)
205 == 0
206 {
207 trace!("environmentd statefulset is not ready yet...");
208 return Ok(Some(retry_action));
209 }
210
211 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
212 return Ok(Some(retry_action));
213 };
214 let status_url = reqwest::Url::parse(&format!(
215 "{}/api/leader/status",
216 self.connection_info.environmentd_url,
217 ))
218 .unwrap();
219
220 match http_client.get(status_url.clone()).send().await {
221 Ok(response) => {
222 let response: GetLeaderStatusResponse = match response.error_for_status() {
223 Ok(response) => response.json().await?,
224 Err(e) => {
225 trace!("failed to get status of environmentd, retrying... ({e})");
226 return Ok(Some(retry_action));
227 }
228 };
229 if force_promote {
230 trace!("skipping cluster catchup");
231 let skip_catchup_url = reqwest::Url::parse(&format!(
232 "{}/api/leader/skip-catchup",
233 self.connection_info.environmentd_url,
234 ))
235 .unwrap();
236 let response = http_client.post(skip_catchup_url).send().await?;
237 if response.status() == StatusCode::BAD_REQUEST {
238 let err: SkipCatchupError = response.json().await?;
239 return Err(
240 anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
241 );
242 }
243 } else {
244 match response.status {
245 DeploymentStatus::Initializing => {
246 trace!("environmentd is still initializing, retrying...");
247 return Ok(Some(retry_action));
248 }
249 DeploymentStatus::ReadyToPromote
250 | DeploymentStatus::Promoting
251 | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
252 }
253 }
254 }
255 Err(e) => {
256 trace!("failed to connect to environmentd, retrying... ({e})");
257 return Ok(Some(retry_action));
258 }
259 }
260
261 Ok(None)
262 }
263
264 #[instrument]
265 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
266 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
267 Some(match &self.connection_info.mz_system_secret_name {
268 Some(mz_system_secret_name) => {
269 let http_client = reqwest::Client::builder()
270 .timeout(std::time::Duration::from_secs(10))
271 .cookie_store(true)
272 .danger_accept_invalid_certs(true)
274 .build()
275 .unwrap();
276
277 let secret = secret_api
278 .get(mz_system_secret_name)
279 .await
280 .map_err(|e| {
281 error!("Failed to get backend secret: {:?}", e);
282 e
283 })
284 .ok()?;
285 if let Some(data) = secret.data {
286 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
287 let password = String::from_utf8_lossy(&password.0).to_string();
288 let login_url = reqwest::Url::parse(&format!(
289 "{}/api/login",
290 self.connection_info.environmentd_url,
291 ))
292 .unwrap();
293 match http_client
294 .post(login_url)
295 .body(
296 serde_json::to_string(&LoginCredentials {
297 username: "mz_system".to_owned(),
298 password,
299 })
300 .expect(
301 "Serializing a simple struct with utf8 strings doesn't fail.",
302 ),
303 )
304 .header("Content-Type", "application/json")
305 .send()
306 .await
307 {
308 Ok(response) => {
309 if let Err(e) = response.error_for_status() {
310 trace!("failed to login to environmentd, retrying... ({e})");
311 return None;
312 }
313 }
314 Err(e) => {
315 trace!("failed to connect to environmentd, retrying... ({e})");
316 return None;
317 }
318 };
319 }
320 };
321 http_client
322 }
323 None => reqwest::Client::builder()
324 .timeout(std::time::Duration::from_secs(10))
325 .build()
326 .unwrap(),
327 })
328 }
329
330 #[instrument]
331 pub async fn promote_services(
332 &self,
333 client: &Client,
334 namespace: &str,
335 ) -> Result<Option<Action>, Error> {
336 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
337 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
338
339 let promote_url = reqwest::Url::parse(&format!(
340 "{}/api/leader/promote",
341 self.connection_info.environmentd_url,
342 ))
343 .unwrap();
344
345 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
346 return Ok(Some(retry_action));
347 };
348
349 trace!("promoting new environmentd to leader");
350 let response = http_client.post(promote_url).send().await?;
351 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
352 if let BecomeLeaderResult::Failure { message } = response.result {
353 return Err(Error::Anyhow(anyhow::anyhow!(
354 "failed to promote new environmentd: {message}"
355 )));
356 }
357
358 let status_url = reqwest::Url::parse(&format!(
370 "{}/api/leader/status",
371 self.connection_info.environmentd_url,
372 ))
373 .unwrap();
374 match http_client.get(status_url.clone()).send().await {
375 Ok(response) => {
376 let response: GetLeaderStatusResponse = response.json().await?;
377 if response.status != DeploymentStatus::IsLeader {
378 trace!(
379 "environmentd is still promoting (status: {:?}), retrying...",
380 response.status
381 );
382 return Ok(Some(retry_action));
383 } else {
384 trace!("environmentd is ready");
385 }
386 }
387 Err(e) => {
388 trace!("failed to connect to environmentd, retrying... ({e})");
389 return Ok(Some(retry_action));
390 }
391 }
392
393 trace!("applying environmentd public service");
394 apply_resource(&service_api, &*self.public_service).await?;
395
396 Ok(None)
397 }
398
399 #[instrument]
400 pub async fn teardown_generation(
401 &self,
402 client: &Client,
403 mz: &Materialize,
404 generation: u64,
405 ) -> Result<(), anyhow::Error> {
406 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
407 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
408 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
409
410 trace!("deleting environmentd statefulset for generation {generation}");
411 delete_resource(
412 &statefulset_api,
413 &mz.environmentd_statefulset_name(generation),
414 )
415 .await?;
416
417 trace!("deleting persist pubsub service for generation {generation}");
418 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
419
420 trace!("deleting environmentd per-generation service for generation {generation}");
421 delete_resource(
422 &service_api,
423 &mz.environmentd_generation_service_name(generation),
424 )
425 .await?;
426
427 trace!("deleting listeners configmap for generation {generation}");
428 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
429
430 Ok(())
431 }
432
433 pub fn generate_hash(&self) -> String {
436 let mut hasher = Sha256::new();
437 hasher.update(&serde_json::to_string(self).unwrap());
438 format!("{:x}", hasher.finalize())
439 }
440}
441
442fn create_public_service_object(
443 config: &super::Config,
444 mz: &Materialize,
445 generation: u64,
446) -> Service {
447 create_base_service_object(
448 config,
449 mz,
450 generation,
451 &mz.environmentd_service_name(),
452 true,
453 )
454}
455
456fn create_generation_service_object(
457 config: &super::Config,
458 mz: &Materialize,
459 generation: u64,
460) -> Service {
461 create_base_service_object(
462 config,
463 mz,
464 generation,
465 &mz.environmentd_generation_service_name(generation),
466 false,
467 )
468}
469
470fn create_base_service_object(
471 config: &super::Config,
472 mz: &Materialize,
473 generation: u64,
474 service_name: &str,
475 headless: bool,
476) -> Service {
477 let ports = vec![
478 ServicePort {
479 port: config.environmentd_sql_port.into(),
480 protocol: Some("TCP".to_string()),
481 name: Some("sql".to_string()),
482 ..Default::default()
483 },
484 ServicePort {
485 port: config.environmentd_http_port.into(),
486 protocol: Some("TCP".to_string()),
487 name: Some("https".to_string()),
488 ..Default::default()
489 },
490 ServicePort {
491 port: config.environmentd_internal_sql_port.into(),
492 protocol: Some("TCP".to_string()),
493 name: Some("internal-sql".to_string()),
494 ..Default::default()
495 },
496 ServicePort {
497 port: config.environmentd_internal_http_port.into(),
498 protocol: Some("TCP".to_string()),
499 name: Some("internal-http".to_string()),
500 ..Default::default()
501 },
502 ];
503
504 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
505
506 let spec = ServiceSpec {
507 type_: Some("ClusterIP".to_string()),
508 cluster_ip: if headless {
509 Some("None".to_string())
510 } else {
511 None
512 },
513 selector: Some(selector),
514 ports: Some(ports),
515 ..Default::default()
516 };
517
518 Service {
519 metadata: mz.managed_resource_meta(service_name.to_string()),
520 spec: Some(spec),
521 status: None,
522 }
523}
524
525fn create_persist_pubsub_service(
526 config: &super::Config,
527 mz: &Materialize,
528 generation: u64,
529) -> Service {
530 Service {
531 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
532 spec: Some(ServiceSpec {
533 type_: Some("ClusterIP".to_string()),
534 cluster_ip: Some("None".to_string()),
535 selector: Some(btreemap! {
536 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
537 }),
538 ports: Some(vec![ServicePort {
539 name: Some("grpc".to_string()),
540 protocol: Some("TCP".to_string()),
541 port: config.environmentd_internal_persist_pubsub_port.into(),
542 ..Default::default()
543 }]),
544 ..Default::default()
545 }),
546 status: None,
547 }
548}
549
550fn create_environmentd_statefulset_object(
551 config: &super::Config,
552 mz: &Materialize,
553 generation: u64,
554) -> StatefulSet {
555 let mut env = vec![
565 EnvVar {
566 name: "MZ_METADATA_BACKEND_URL".to_string(),
567 value_from: Some(EnvVarSource {
568 secret_key_ref: Some(SecretKeySelector {
569 name: mz.backend_secret_name(),
570 key: "metadata_backend_url".to_string(),
571 optional: Some(false),
572 }),
573 ..Default::default()
574 }),
575 ..Default::default()
576 },
577 EnvVar {
578 name: "MZ_PERSIST_BLOB_URL".to_string(),
579 value_from: Some(EnvVarSource {
580 secret_key_ref: Some(SecretKeySelector {
581 name: mz.backend_secret_name(),
582 key: "persist_backend_url".to_string(),
583 optional: Some(false),
584 }),
585 ..Default::default()
586 }),
587 ..Default::default()
588 },
589 ];
590
591 env.push(EnvVar {
592 name: "AWS_REGION".to_string(),
593 value: Some(config.region.clone()),
594 ..Default::default()
595 });
596
597 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
598
599 let mut args = vec![];
600
601 if let Some(helm_chart_version) = &config.helm_chart_version {
602 args.push(format!("--helm-chart-version={helm_chart_version}"));
603 }
604
605 args.push(format!(
607 "--environment-id={}",
608 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
609 ));
610
611 args.push(format!(
613 "--clusterd-image={}",
614 matching_image_from_environmentd_image_ref(
615 &mz.spec.environmentd_image_ref,
616 "clusterd",
617 None
618 )
619 ));
620
621 args.extend(
623 [
624 config
625 .environmentd_cluster_replica_sizes
626 .as_ref()
627 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
628 config
629 .bootstrap_default_cluster_replica_size
630 .as_ref()
631 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
632 config
633 .bootstrap_builtin_system_cluster_replica_size
634 .as_ref()
635 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
636 config
637 .bootstrap_builtin_probe_cluster_replica_size
638 .as_ref()
639 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
640 config
641 .bootstrap_builtin_support_cluster_replica_size
642 .as_ref()
643 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
644 config
645 .bootstrap_builtin_catalog_server_cluster_replica_size
646 .as_ref()
647 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
648 config
649 .bootstrap_builtin_analytics_cluster_replica_size
650 .as_ref()
651 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
652 config
653 .bootstrap_builtin_system_cluster_replication_factor
654 .as_ref()
655 .map(|replication_factor| {
656 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
657 }),
658 config
659 .bootstrap_builtin_probe_cluster_replication_factor
660 .as_ref()
661 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
662 config
663 .bootstrap_builtin_support_cluster_replication_factor
664 .as_ref()
665 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
666 config
667 .bootstrap_builtin_analytics_cluster_replication_factor
668 .as_ref()
669 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
670 ]
671 .into_iter()
672 .flatten(),
673 );
674
675 args.extend(
676 config
677 .environmentd_allowed_origins
678 .iter()
679 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
680 );
681
682 args.push(format!(
683 "--secrets-controller={}",
684 config.secrets_controller
685 ));
686
687 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
688 if let Ok(cluster_replica_sizes) =
689 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
690 {
691 let cluster_replica_sizes: Vec<_> =
692 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
693 args.push(format!(
694 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
695 cluster_replica_sizes.join("', '")
696 ));
697 }
698 }
699 if !config.cloud_provider.is_cloud() {
700 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
701 }
702
703 if config.enable_internal_statement_logging {
704 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
705 }
706
707 if config.disable_statement_logging {
708 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
709 }
710
711 if !mz.spec.enable_rbac {
712 args.push("--system-parameter-default=enable_rbac_checks=false".into());
713 }
714
715 args.push("--persist-isolated-runtime-threads=-1".to_string());
719
720 if config.cloud_provider == CloudProvider::Aws {
722 if let Some(azs) = config.environmentd_availability_zones.as_ref() {
723 for az in azs {
724 args.push(format!("--availability-zone={az}"));
725 }
726 }
727
728 if let Some(environmentd_connection_role_arn) = mz
729 .spec
730 .environmentd_connection_role_arn
731 .as_deref()
732 .or(config.environmentd_connection_role_arn.as_deref())
733 {
734 args.push(format!(
735 "--aws-connection-role-arn={}",
736 environmentd_connection_role_arn
737 ));
738 }
739 if let Some(account_id) = &config.aws_account_id {
740 args.push(format!("--aws-account-id={account_id}"));
741 }
742
743 args.extend([format!(
744 "--aws-secrets-controller-tags=Environment={}",
745 mz.name_unchecked()
746 )]);
747 args.extend_from_slice(&config.aws_secrets_controller_tags);
748 }
749
750 args.extend([
752 "--orchestrator=kubernetes".into(),
753 format!(
754 "--orchestrator-kubernetes-service-account={}",
755 &mz.service_account_name()
756 ),
757 format!(
758 "--orchestrator-kubernetes-image-pull-policy={}",
759 config.image_pull_policy.as_kebab_case_str(),
760 ),
761 ]);
762 for selector in &config.clusterd_node_selector {
763 args.push(format!(
764 "--orchestrator-kubernetes-service-node-selector={}={}",
765 selector.key, selector.value,
766 ));
767 }
768 if mz.meets_minimum_version(&V144) {
769 if let Some(affinity) = &config.clusterd_affinity {
770 let affinity = serde_json::to_string(affinity).unwrap();
771 args.push(format!(
772 "--orchestrator-kubernetes-service-affinity={affinity}"
773 ))
774 }
775 if let Some(tolerations) = &config.clusterd_tolerations {
776 let tolerations = serde_json::to_string(tolerations).unwrap();
777 args.push(format!(
778 "--orchestrator-kubernetes-service-tolerations={tolerations}"
779 ))
780 }
781 }
782 if let Some(scheduler_name) = &config.scheduler_name {
783 args.push(format!(
784 "--orchestrator-kubernetes-scheduler-name={}",
785 scheduler_name
786 ));
787 }
788 if mz.meets_minimum_version(&V154_DEV0) {
789 args.extend(
790 mz.spec
791 .pod_annotations
792 .as_ref()
793 .map(|annotations| annotations.iter())
794 .unwrap_or_default()
795 .map(|(key, val)| {
796 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
797 }),
798 );
799 }
800 args.extend(
801 mz.default_labels()
802 .iter()
803 .chain(
804 mz.spec
805 .pod_labels
806 .as_ref()
807 .map(|labels| labels.iter())
808 .unwrap_or_default(),
809 )
810 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
811 );
812 if let Some(status) = &mz.status {
813 args.push(format!(
814 "--orchestrator-kubernetes-name-prefix=mz{}-",
815 status.resource_id
816 ));
817 }
818
819 args.extend(["--log-format=json".into()]);
821 if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
822 args.push(format!("--opentelemetry-endpoint={}", endpoint));
823 }
824 args.extend([
826 format!(
827 "--opentelemetry-resource=organization_id={}",
828 mz.spec.environment_id
829 ),
830 format!(
831 "--opentelemetry-resource=environment_name={}",
832 mz.name_unchecked()
833 ),
834 ]);
835
836 if let Some(segment_api_key) = &config.segment_api_key {
837 args.push(format!("--segment-api-key={}", segment_api_key));
838 if config.segment_client_side {
839 args.push("--segment-client-side".into());
840 }
841 }
842
843 let mut volumes = Vec::new();
844 let mut volume_mounts = Vec::new();
845 if issuer_ref_defined(
846 &config.default_certificate_specs.internal,
847 &mz.spec.internal_certificate_spec,
848 ) {
849 volumes.push(Volume {
850 name: "certificate".to_owned(),
851 secret: Some(SecretVolumeSource {
852 default_mode: Some(0o400),
853 secret_name: Some(mz.environmentd_certificate_secret_name()),
854 items: None,
855 optional: Some(false),
856 }),
857 ..Default::default()
858 });
859 volume_mounts.push(VolumeMount {
860 name: "certificate".to_owned(),
861 mount_path: "/etc/materialized".to_owned(),
862 read_only: Some(true),
863 ..Default::default()
864 });
865 args.extend([
866 "--tls-mode=require".into(),
867 "--tls-cert=/etc/materialized/tls.crt".into(),
868 "--tls-key=/etc/materialized/tls.key".into(),
869 ]);
870 } else {
871 args.push("--tls-mode=disable".to_string());
872 }
873 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
874 args.push(format!(
875 "--orchestrator-kubernetes-ephemeral-volume-class={}",
876 ephemeral_volume_class
877 ));
878 }
879 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
881
882 if mz.meets_minimum_version(&V26_1_0) {
886 if let Some(ref name) = mz.spec.system_parameter_configmap_name {
887 volumes.push(Volume {
888 name: "system-params".to_string(),
889 config_map: Some(ConfigMapVolumeSource {
890 default_mode: Some(0o400),
891 name: name.to_owned(),
892 items: None,
893 optional: Some(true),
894 }),
895 ..Default::default()
896 });
897 volume_mounts.push(VolumeMount {
898 name: "system-params".to_string(),
899 mount_path: "/system-params".to_owned(),
901 read_only: Some(true),
902 ..Default::default()
903 });
904 args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
905 args.push("--config-sync-loop-interval=1s".to_string());
906 }
907 }
908
909 if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
911 args.push(format!("--sentry-dsn={}", sentry_dsn));
912 if let Some(sentry_environment) = &config.tracing.sentry_environment {
913 args.push(format!("--sentry-environment={}", sentry_environment));
914 }
915 args.push(format!("--sentry-tag=region={}", config.region));
916 }
917
918 args.push(format!(
920 "--persist-pubsub-url=http://{}:{}",
921 mz.persist_pubsub_service_name(generation),
922 config.environmentd_internal_persist_pubsub_port,
923 ));
924 args.push(format!(
925 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
926 config.environmentd_internal_persist_pubsub_port
927 ));
928
929 args.push(format!("--deploy-generation={}", generation));
930
931 args.push(format!(
933 "--internal-console-redirect-url={}",
934 &config.internal_console_proxy_url,
935 ));
936
937 if !config.collect_pod_metrics {
938 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
939 }
940 if config.enable_prometheus_scrape_annotations {
941 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
942 }
943
944 if config.disable_license_key_checks {
947 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
948 args.push("--disable-license-key-checks".into());
949 }
950 }
951
952 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
955 || mz.meets_minimum_version(&V153)
956 {
957 volume_mounts.push(VolumeMount {
958 name: "license-key".to_string(),
959 mount_path: "/license_key".to_string(),
960 ..Default::default()
961 });
962 volumes.push(Volume {
963 name: "license-key".to_string(),
964 secret: Some(SecretVolumeSource {
965 default_mode: Some(256),
966 optional: Some(false),
967 secret_name: Some(mz.backend_secret_name()),
968 items: Some(vec![KeyToPath {
969 key: "license_key".to_string(),
970 path: "license_key".to_string(),
971 ..Default::default()
972 }]),
973 ..Default::default()
974 }),
975 ..Default::default()
976 });
977 env.push(EnvVar {
978 name: "MZ_LICENSE_KEY".to_string(),
979 value: Some("/license_key/license_key".to_string()),
980 ..Default::default()
981 });
982 }
983
984 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
986 args.extend(extra_args.iter().cloned());
987 }
988
989 let probe = Probe {
990 initial_delay_seconds: Some(1),
991 failure_threshold: Some(12),
992 tcp_socket: Some(TCPSocketAction {
993 host: None,
994 port: IntOrString::Int(config.environmentd_sql_port.into()),
995 }),
996 ..Default::default()
997 };
998
999 let security_context = if config.enable_security_context {
1000 Some(SecurityContext {
1004 run_as_non_root: Some(true),
1005 capabilities: Some(Capabilities {
1006 drop: Some(vec!["ALL".to_string()]),
1007 ..Default::default()
1008 }),
1009 seccomp_profile: Some(SeccompProfile {
1010 type_: "RuntimeDefault".to_string(),
1011 ..Default::default()
1012 }),
1013 allow_privilege_escalation: Some(false),
1014 ..Default::default()
1015 })
1016 } else {
1017 None
1018 };
1019
1020 let ports = vec![
1021 ContainerPort {
1022 container_port: config.environmentd_sql_port.into(),
1023 name: Some("sql".to_owned()),
1024 ..Default::default()
1025 },
1026 ContainerPort {
1027 container_port: config.environmentd_internal_sql_port.into(),
1028 name: Some("internal-sql".to_owned()),
1029 ..Default::default()
1030 },
1031 ContainerPort {
1032 container_port: config.environmentd_http_port.into(),
1033 name: Some("http".to_owned()),
1034 ..Default::default()
1035 },
1036 ContainerPort {
1037 container_port: config.environmentd_internal_http_port.into(),
1038 name: Some("internal-http".to_owned()),
1039 ..Default::default()
1040 },
1041 ContainerPort {
1042 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1043 name: Some("persist-pubsub".to_owned()),
1044 ..Default::default()
1045 },
1046 ];
1047
1048 if mz.meets_minimum_version(&V147_DEV0) {
1050 volume_mounts.push(VolumeMount {
1051 name: "listeners-configmap".to_string(),
1052 mount_path: "/listeners".to_string(),
1053 ..Default::default()
1054 });
1055 volumes.push(Volume {
1056 name: "listeners-configmap".to_string(),
1057 config_map: Some(ConfigMapVolumeSource {
1058 name: mz.listeners_configmap_name(generation),
1059 default_mode: Some(256),
1060 optional: Some(false),
1061 items: Some(vec![KeyToPath {
1062 key: "listeners.json".to_string(),
1063 path: "listeners.json".to_string(),
1064 ..Default::default()
1065 }]),
1066 }),
1067 ..Default::default()
1068 });
1069 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1070 if matches!(
1071 mz.spec.authenticator_kind,
1072 AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc
1073 ) {
1074 args.push("--system-parameter-default=enable_password_auth=true".into());
1075 env.push(EnvVar {
1076 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1077 value_from: Some(EnvVarSource {
1078 secret_key_ref: Some(SecretKeySelector {
1079 name: mz.backend_secret_name(),
1080 key: "external_login_password_mz_system".to_string(),
1081 optional: Some(false),
1082 }),
1083 ..Default::default()
1084 }),
1085 ..Default::default()
1086 })
1087 }
1088 } else {
1089 args.extend([
1090 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1091 format!(
1092 "--http-listen-addr=0.0.0.0:{}",
1093 config.environmentd_http_port
1094 ),
1095 format!(
1096 "--internal-sql-listen-addr=0.0.0.0:{}",
1097 config.environmentd_internal_sql_port
1098 ),
1099 format!(
1100 "--internal-http-listen-addr=0.0.0.0:{}",
1101 config.environmentd_internal_http_port
1102 ),
1103 ]);
1104 }
1105
1106 let container = Container {
1107 name: "environmentd".to_owned(),
1108 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1109 image_pull_policy: Some(config.image_pull_policy.to_string()),
1110 ports: Some(ports),
1111 args: Some(args),
1112 env: Some(env),
1113 volume_mounts: Some(volume_mounts),
1114 liveness_probe: Some(probe.clone()),
1115 readiness_probe: Some(probe),
1116 resources: mz
1117 .spec
1118 .environmentd_resource_requirements
1119 .clone()
1120 .or_else(|| config.environmentd_default_resources.clone()),
1121 security_context: security_context.clone(),
1122 ..Default::default()
1123 };
1124
1125 let mut pod_template_labels = mz.default_labels();
1126 pod_template_labels.insert(
1127 "materialize.cloud/name".to_owned(),
1128 mz.environmentd_statefulset_name(generation),
1129 );
1130 pod_template_labels.insert(
1131 "materialize.cloud/app".to_owned(),
1132 mz.environmentd_app_name(),
1133 );
1134 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1135 pod_template_labels.extend(
1136 mz.spec
1137 .pod_labels
1138 .as_ref()
1139 .map(|labels| labels.iter())
1140 .unwrap_or_default()
1141 .map(|(key, value)| (key.clone(), value.clone())),
1142 );
1143
1144 let mut pod_template_annotations = btreemap! {
1145 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1147
1148 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1150 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1151 "materialize.cloud/generation".to_owned() => generation.to_string(),
1152 };
1153 if config.enable_prometheus_scrape_annotations {
1154 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1155 pod_template_annotations.insert(
1156 "prometheus.io/port".to_owned(),
1157 config.environmentd_internal_http_port.to_string(),
1158 );
1159 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1160 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1161 pod_template_annotations.insert(
1162 "materialize.prometheus.io/mz_usage_path".to_owned(),
1163 "/metrics/mz_usage".to_string(),
1164 );
1165 pod_template_annotations.insert(
1166 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1167 "/metrics/mz_frontier".to_string(),
1168 );
1169 pod_template_annotations.insert(
1170 "materialize.prometheus.io/mz_compute_path".to_owned(),
1171 "/metrics/mz_compute".to_string(),
1172 );
1173 pod_template_annotations.insert(
1174 "materialize.prometheus.io/mz_storage_path".to_owned(),
1175 "/metrics/mz_storage".to_string(),
1176 );
1177 }
1178 pod_template_annotations.extend(
1179 mz.spec
1180 .pod_annotations
1181 .as_ref()
1182 .map(|annotations| annotations.iter())
1183 .unwrap_or_default()
1184 .map(|(key, value)| (key.clone(), value.clone())),
1185 );
1186
1187 let mut tolerations = vec![
1188 Toleration {
1192 effect: Some("NoExecute".into()),
1193 key: Some("node.kubernetes.io/not-ready".into()),
1194 operator: Some("Exists".into()),
1195 toleration_seconds: Some(30),
1196 value: None,
1197 },
1198 Toleration {
1199 effect: Some("NoExecute".into()),
1200 key: Some("node.kubernetes.io/unreachable".into()),
1201 operator: Some("Exists".into()),
1202 toleration_seconds: Some(30),
1203 value: None,
1204 },
1205 ];
1206 if let Some(user_tolerations) = &config.environmentd_tolerations {
1207 tolerations.extend(user_tolerations.iter().cloned());
1208 }
1209 let tolerations = Some(tolerations);
1210
1211 let pod_template_spec = PodTemplateSpec {
1212 metadata: Some(ObjectMeta {
1215 labels: Some(pod_template_labels),
1216 annotations: Some(pod_template_annotations), ..Default::default()
1218 }),
1219 spec: Some(PodSpec {
1220 containers: vec![container],
1221 node_selector: Some(
1222 config
1223 .environmentd_node_selector
1224 .iter()
1225 .map(|selector| (selector.key.clone(), selector.value.clone()))
1226 .collect(),
1227 ),
1228 affinity: config.environmentd_affinity.clone(),
1229 scheduler_name: config.scheduler_name.clone(),
1230 service_account_name: Some(mz.service_account_name()),
1231 volumes: Some(volumes),
1232 security_context: Some(PodSecurityContext {
1233 fs_group: Some(999),
1234 run_as_user: Some(999),
1235 run_as_group: Some(999),
1236 ..Default::default()
1237 }),
1238 tolerations,
1239 termination_grace_period_seconds: Some(0),
1260 ..Default::default()
1261 }),
1262 };
1263
1264 let mut match_labels = BTreeMap::new();
1265 match_labels.insert(
1266 "materialize.cloud/name".to_owned(),
1267 mz.environmentd_statefulset_name(generation),
1268 );
1269
1270 let statefulset_spec = StatefulSetSpec {
1271 replicas: Some(1),
1272 template: pod_template_spec,
1273 service_name: Some(mz.environmentd_service_name()),
1274 selector: LabelSelector {
1275 match_expressions: None,
1276 match_labels: Some(match_labels),
1277 },
1278 ..Default::default()
1279 };
1280
1281 StatefulSet {
1282 metadata: ObjectMeta {
1283 annotations: Some(btreemap! {
1284 "materialize.cloud/generation".to_owned() => generation.to_string(),
1285 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1286 }),
1287 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1288 },
1289 spec: Some(statefulset_spec),
1290 status: None,
1291 }
1292}
1293
1294fn create_connection_info(
1295 config: &super::Config,
1296 mz: &Materialize,
1297 generation: u64,
1298) -> ConnectionInfo {
1299 let external_enable_tls = issuer_ref_defined(
1300 &config.default_certificate_specs.internal,
1301 &mz.spec.internal_certificate_spec,
1302 );
1303 let authenticator_kind = mz.spec.authenticator_kind;
1304
1305 let mut listeners_config = ListenersConfig {
1306 sql: btreemap! {
1307 "external".to_owned() => SqlListenerConfig{
1308 addr: SocketAddr::new(
1309 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1310 config.environmentd_sql_port,
1311 ),
1312 authenticator_kind,
1313 allowed_roles: AllowedRoles::Normal,
1314 enable_tls: external_enable_tls,
1315 },
1316 "internal".to_owned() => SqlListenerConfig{
1317 addr: SocketAddr::new(
1318 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1319 config.environmentd_internal_sql_port,
1320 ),
1321 authenticator_kind: AuthenticatorKind::None,
1322 allowed_roles: AllowedRoles::NormalAndInternal,
1324 enable_tls: false,
1325 },
1326 },
1327 http: btreemap! {
1328 "external".to_owned() => HttpListenerConfig{
1329 base: BaseListenerConfig {
1330 addr: SocketAddr::new(
1331 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1332 config.environmentd_http_port,
1333 ),
1334 authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1338 AuthenticatorKind::Password
1339 } else {
1340 authenticator_kind
1341 },
1342 allowed_roles: AllowedRoles::Normal,
1343 enable_tls: external_enable_tls,
1344 },
1345 routes: HttpRoutesEnabled{
1346 base: true,
1347 webhook: true,
1348 internal: false,
1349 metrics: false,
1350 profiling: false,
1351 mcp_agent: true,
1352 mcp_developer: true,
1353 console_config: true,
1354 }
1355 },
1356 "internal".to_owned() => HttpListenerConfig{
1357 base: BaseListenerConfig {
1358 addr: SocketAddr::new(
1359 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1360 config.environmentd_internal_http_port,
1361 ),
1362 authenticator_kind: AuthenticatorKind::None,
1363 allowed_roles: AllowedRoles::NormalAndInternal,
1365 enable_tls: false,
1366 },
1367 routes: HttpRoutesEnabled{
1368 base: true,
1369 webhook: true,
1370 internal: true,
1371 metrics: true,
1372 profiling: true,
1373 mcp_agent: true,
1374 mcp_developer: true,
1375 console_config: false,
1376 }
1377 },
1378 },
1379 };
1380
1381 if matches!(
1382 authenticator_kind,
1383 AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc
1384 ) {
1385 listeners_config.sql.remove("internal");
1386 listeners_config.http.remove("internal");
1387
1388 listeners_config.sql.get_mut("external").map(|listener| {
1389 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1390 listener
1391 });
1392 listeners_config.http.get_mut("external").map(|listener| {
1393 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1394 listener.routes.internal = true;
1395 listener.routes.profiling = true;
1396 listener
1397 });
1398
1399 listeners_config.http.insert(
1400 "metrics".to_owned(),
1401 HttpListenerConfig {
1402 base: BaseListenerConfig {
1403 addr: SocketAddr::new(
1404 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1405 config.environmentd_internal_http_port,
1406 ),
1407 authenticator_kind: AuthenticatorKind::None,
1408 allowed_roles: AllowedRoles::NormalAndInternal,
1409 enable_tls: false,
1410 },
1411 routes: HttpRoutesEnabled {
1412 base: false,
1413 webhook: false,
1414 internal: false,
1415 metrics: true,
1416 profiling: false,
1417 mcp_agent: false,
1418 mcp_developer: false,
1419 console_config: false,
1420 },
1421 },
1422 );
1423 }
1424
1425 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1426 let listeners_configmap = ConfigMap {
1427 binary_data: None,
1428 data: Some(btreemap! {
1429 "listeners.json".to_owned() => listeners_json,
1430 }),
1431 immutable: None,
1432 metadata: ObjectMeta {
1433 annotations: Some(btreemap! {
1434 "materialize.cloud/generation".to_owned() => generation.to_string(),
1435 }),
1436 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1437 },
1438 };
1439
1440 let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1441 AuthenticatorKind::Password | AuthenticatorKind::Sasl | AuthenticatorKind::Oidc => {
1442 let scheme = if external_enable_tls { "https" } else { "http" };
1443 (
1444 scheme,
1445 config.environmentd_http_port,
1446 Some(mz.spec.backend_secret_name.clone()),
1447 )
1448 }
1449 _ => ("http", config.environmentd_internal_http_port, None),
1450 };
1451 let environmentd_url = format!(
1452 "{}://{}.{}.svc.cluster.local:{}",
1453 scheme,
1454 mz.environmentd_generation_service_name(generation),
1455 mz.namespace(),
1456 leader_api_port,
1457 );
1458 ConnectionInfo {
1459 environmentd_url,
1460 listeners_configmap,
1461 mz_system_secret_name,
1462 }
1463}
1464
/// Deserializable wrapper around a [`BecomeLeaderResult`].
///
/// NOTE(review): presumably the response body of environmentd's
/// leader-promotion request; the call site is outside this chunk — confirm.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct BecomeLeaderResponse {
    /// Outcome carried by the response.
    result: BecomeLeaderResult,
}
1470
/// Outcome stored in [`BecomeLeaderResponse::result`].
#[derive(Debug, Deserialize, PartialEq, Eq)]
enum BecomeLeaderResult {
    /// The request succeeded; no further detail is carried.
    Success,
    /// The request failed; `message` holds the accompanying text.
    Failure { message: String },
}
1476
/// Deserializable error payload consisting of a single message.
///
/// NOTE(review): the name suggests this is the error body of a "skip catchup"
/// request; the call site is outside this chunk — confirm.
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct SkipCatchupError {
    /// Text carried by the error payload.
    message: String,
}