1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25 },
26 },
27 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33 ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51 major: 0,
52 minor: 140,
53 patch: 0,
54 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
/// Version `0.143.0`: lower bound of the range in which the
/// `--disable-license-key-checks` flag is passed to environmentd.
const V143: Version = Version::new(0, 143, 0);
/// Version `0.144.0`: minimum version that receives the clusterd affinity and
/// tolerations orchestrator arguments.
const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60 major: 0,
61 minor: 147,
62 patch: 0,
63 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
/// Version `0.153.0`: from this version on the license key is always mounted
/// and `--disable-license-key-checks` is no longer passed.
const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68 major: 0,
69 minor: 154,
70 patch: 0,
71 pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
/// Version `0.161.0`, exported so that version gates outside this module can
/// compare against it.
pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77 major: 26,
78 minor: 1,
79 patch: 0,
80 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
/// Leadership state of an environmentd deployment, as carried in the `status`
/// field of [`GetLeaderStatusResponse`].
///
/// The declaration order of the variants is significant: the derived
/// `PartialOrd`/`Ord` implementations compare by it.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum DeploymentStatus {
    /// Not ready yet; `Resources::apply` requeues while this is reported
    /// (unless promotion is being forced).
    Initializing,
    /// Accepted by `Resources::apply` as "ready".
    ReadyToPromote,
    /// Accepted by `Resources::apply` as "ready"; `Resources::promote_services`
    /// still requeues on it.
    Promoting,
    /// The only status `Resources::promote_services` accepts before it applies
    /// the public service.
    IsLeader,
}
110
/// JSON body deserialized from environmentd's `/api/leader/status` endpoint.
#[derive(
    Debug,
    Serialize,
    Deserialize,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub struct GetLeaderStatusResponse {
    /// Current leadership state reported by the deployment.
    status: DeploymentStatus,
}
125
/// JSON body posted to environmentd's `/api/login` endpoint.
///
/// `Debug` is not derived, so the password cannot end up in debug output.
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    /// Role to log in as.
    username: String,
    /// Plain-text password for `username`.
    password: String,
}
132
/// Everything needed to talk to a generation's environmentd over HTTP.
///
/// This struct is serialized as part of [`Resources`] when computing
/// `Resources::generate_hash`, so field changes alter the hash.
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    /// Base URL that the `/api/...` paths are appended to.
    pub environmentd_url: String,
    /// Name of the secret that holds `external_login_password_mz_system`.
    /// When `Some`, the HTTP client logs in as `mz_system` before use; when
    /// `None`, an unauthenticated client is used.
    pub mz_system_secret_name: Option<String>,
    /// Listeners configmap applied alongside the statefulset.
    pub listeners_configmap: ConfigMap,
}
139
/// The Kubernetes objects that make up one generation of an environmentd
/// deployment.
///
/// The whole struct is serialized to JSON by `generate_hash`, so adding,
/// removing or reordering fields changes the resulting hash.
#[derive(Debug, Serialize)]
pub struct Resources {
    /// Deploy generation these resources were built for.
    pub generation: u64,
    /// Service carrying the generation-independent name; applied only by
    /// `promote_services`, after the new generation reports `IsLeader`.
    pub public_service: Box<Service>,
    /// Service whose name is specific to `generation`; applied by `apply`.
    pub generation_service: Box<Service>,
    /// Service exposing the persist pubsub port of this generation.
    pub persist_pubsub_service: Box<Service>,
    /// StatefulSet running environmentd for this generation.
    pub environmentd_statefulset: Box<StatefulSet>,
    /// URL, credentials secret name and listeners configmap used to reach
    /// this generation's environmentd.
    pub connection_info: Box<ConnectionInfo>,
}
149
150impl Resources {
151 pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
152 let public_service = Box::new(create_public_service_object(config, mz, generation));
153 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
154 let persist_pubsub_service =
155 Box::new(create_persist_pubsub_service(config, mz, generation));
156 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
157 config, mz, generation,
158 ));
159 let connection_info = Box::new(create_connection_info(config, mz, generation));
160
161 Self {
162 generation,
163 public_service,
164 generation_service,
165 persist_pubsub_service,
166 environmentd_statefulset,
167 connection_info,
168 }
169 }
170
171 #[instrument]
172 pub async fn apply(
173 &self,
174 client: &Client,
175 force_promote: bool,
176 namespace: &str,
177 ) -> Result<Option<Action>, Error> {
178 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
179 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
180 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
181
182 trace!("applying environmentd per-generation service");
183 apply_resource(&service_api, &*self.generation_service).await?;
184
185 trace!("creating persist pubsub service");
186 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
187
188 trace!("applying listeners configmap");
189 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
190
191 trace!("creating new environmentd statefulset");
192 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
193
194 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
195
196 let statefulset = get_resource(
197 &statefulset_api,
198 &self.environmentd_statefulset.name_unchecked(),
199 )
200 .await?;
201 if statefulset
202 .and_then(|statefulset| statefulset.status)
203 .and_then(|status| status.ready_replicas)
204 .unwrap_or(0)
205 == 0
206 {
207 trace!("environmentd statefulset is not ready yet...");
208 return Ok(Some(retry_action));
209 }
210
211 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
212 return Ok(Some(retry_action));
213 };
214 let status_url = reqwest::Url::parse(&format!(
215 "{}/api/leader/status",
216 self.connection_info.environmentd_url,
217 ))
218 .unwrap();
219
220 match http_client.get(status_url.clone()).send().await {
221 Ok(response) => {
222 let response: GetLeaderStatusResponse = match response.error_for_status() {
223 Ok(response) => response.json().await?,
224 Err(e) => {
225 trace!("failed to get status of environmentd, retrying... ({e})");
226 return Ok(Some(retry_action));
227 }
228 };
229 if force_promote {
230 trace!("skipping cluster catchup");
231 let skip_catchup_url = reqwest::Url::parse(&format!(
232 "{}/api/leader/skip-catchup",
233 self.connection_info.environmentd_url,
234 ))
235 .unwrap();
236 let response = http_client.post(skip_catchup_url).send().await?;
237 if response.status() == StatusCode::BAD_REQUEST {
238 let err: SkipCatchupError = response.json().await?;
239 return Err(
240 anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
241 );
242 }
243 } else {
244 match response.status {
245 DeploymentStatus::Initializing => {
246 trace!("environmentd is still initializing, retrying...");
247 return Ok(Some(retry_action));
248 }
249 DeploymentStatus::ReadyToPromote
250 | DeploymentStatus::Promoting
251 | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
252 }
253 }
254 }
255 Err(e) => {
256 trace!("failed to connect to environmentd, retrying... ({e})");
257 return Ok(Some(retry_action));
258 }
259 }
260
261 Ok(None)
262 }
263
264 #[instrument]
265 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
266 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
267 Some(match &self.connection_info.mz_system_secret_name {
268 Some(mz_system_secret_name) => {
269 let http_client = reqwest::Client::builder()
270 .timeout(std::time::Duration::from_secs(10))
271 .cookie_store(true)
272 .danger_accept_invalid_certs(true)
274 .build()
275 .unwrap();
276
277 let secret = secret_api
278 .get(mz_system_secret_name)
279 .await
280 .map_err(|e| {
281 error!("Failed to get backend secret: {:?}", e);
282 e
283 })
284 .ok()?;
285 if let Some(data) = secret.data {
286 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
287 let password = String::from_utf8_lossy(&password.0).to_string();
288 let login_url = reqwest::Url::parse(&format!(
289 "{}/api/login",
290 self.connection_info.environmentd_url,
291 ))
292 .unwrap();
293 match http_client
294 .post(login_url)
295 .body(
296 serde_json::to_string(&LoginCredentials {
297 username: "mz_system".to_owned(),
298 password,
299 })
300 .expect(
301 "Serializing a simple struct with utf8 strings doesn't fail.",
302 ),
303 )
304 .header("Content-Type", "application/json")
305 .send()
306 .await
307 {
308 Ok(response) => {
309 if let Err(e) = response.error_for_status() {
310 trace!("failed to login to environmentd, retrying... ({e})");
311 return None;
312 }
313 }
314 Err(e) => {
315 trace!("failed to connect to environmentd, retrying... ({e})");
316 return None;
317 }
318 };
319 }
320 };
321 http_client
322 }
323 None => reqwest::Client::builder()
324 .timeout(std::time::Duration::from_secs(10))
325 .build()
326 .unwrap(),
327 })
328 }
329
330 #[instrument]
331 pub async fn promote_services(
332 &self,
333 client: &Client,
334 namespace: &str,
335 ) -> Result<Option<Action>, Error> {
336 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
337 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
338
339 let promote_url = reqwest::Url::parse(&format!(
340 "{}/api/leader/promote",
341 self.connection_info.environmentd_url,
342 ))
343 .unwrap();
344
345 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
346 return Ok(Some(retry_action));
347 };
348
349 trace!("promoting new environmentd to leader");
350 let response = http_client.post(promote_url).send().await?;
351 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
352 if let BecomeLeaderResult::Failure { message } = response.result {
353 return Err(Error::Anyhow(anyhow::anyhow!(
354 "failed to promote new environmentd: {message}"
355 )));
356 }
357
358 let status_url = reqwest::Url::parse(&format!(
370 "{}/api/leader/status",
371 self.connection_info.environmentd_url,
372 ))
373 .unwrap();
374 match http_client.get(status_url.clone()).send().await {
375 Ok(response) => {
376 let response: GetLeaderStatusResponse = response.json().await?;
377 if response.status != DeploymentStatus::IsLeader {
378 trace!(
379 "environmentd is still promoting (status: {:?}), retrying...",
380 response.status
381 );
382 return Ok(Some(retry_action));
383 } else {
384 trace!("environmentd is ready");
385 }
386 }
387 Err(e) => {
388 trace!("failed to connect to environmentd, retrying... ({e})");
389 return Ok(Some(retry_action));
390 }
391 }
392
393 trace!("applying environmentd public service");
394 apply_resource(&service_api, &*self.public_service).await?;
395
396 Ok(None)
397 }
398
399 #[instrument]
400 pub async fn teardown_generation(
401 &self,
402 client: &Client,
403 mz: &Materialize,
404 generation: u64,
405 ) -> Result<(), anyhow::Error> {
406 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
407 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
408 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
409
410 trace!("deleting environmentd statefulset for generation {generation}");
411 delete_resource(
412 &statefulset_api,
413 &mz.environmentd_statefulset_name(generation),
414 )
415 .await?;
416
417 trace!("deleting persist pubsub service for generation {generation}");
418 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
419
420 trace!("deleting environmentd per-generation service for generation {generation}");
421 delete_resource(
422 &service_api,
423 &mz.environmentd_generation_service_name(generation),
424 )
425 .await?;
426
427 trace!("deleting listeners configmap for generation {generation}");
428 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
429
430 Ok(())
431 }
432
433 pub fn generate_hash(&self) -> String {
436 let mut hasher = Sha256::new();
437 hasher.update(&serde_json::to_string(self).unwrap());
438 format!("{:x}", hasher.finalize())
439 }
440}
441
442fn create_public_service_object(
443 config: &super::Config,
444 mz: &Materialize,
445 generation: u64,
446) -> Service {
447 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
448}
449
450fn create_generation_service_object(
451 config: &super::Config,
452 mz: &Materialize,
453 generation: u64,
454) -> Service {
455 create_base_service_object(
456 config,
457 mz,
458 generation,
459 &mz.environmentd_generation_service_name(generation),
460 )
461}
462
463fn create_base_service_object(
464 config: &super::Config,
465 mz: &Materialize,
466 generation: u64,
467 service_name: &str,
468) -> Service {
469 let ports = vec![
470 ServicePort {
471 port: config.environmentd_sql_port.into(),
472 protocol: Some("TCP".to_string()),
473 name: Some("sql".to_string()),
474 ..Default::default()
475 },
476 ServicePort {
477 port: config.environmentd_http_port.into(),
478 protocol: Some("TCP".to_string()),
479 name: Some("https".to_string()),
480 ..Default::default()
481 },
482 ServicePort {
483 port: config.environmentd_internal_sql_port.into(),
484 protocol: Some("TCP".to_string()),
485 name: Some("internal-sql".to_string()),
486 ..Default::default()
487 },
488 ServicePort {
489 port: config.environmentd_internal_http_port.into(),
490 protocol: Some("TCP".to_string()),
491 name: Some("internal-http".to_string()),
492 ..Default::default()
493 },
494 ];
495
496 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
497
498 let spec = ServiceSpec {
499 type_: Some("ClusterIP".to_string()),
500 cluster_ip: Some("None".to_string()),
501 selector: Some(selector),
502 ports: Some(ports),
503 ..Default::default()
504 };
505
506 Service {
507 metadata: mz.managed_resource_meta(service_name.to_string()),
508 spec: Some(spec),
509 status: None,
510 }
511}
512
513fn create_persist_pubsub_service(
514 config: &super::Config,
515 mz: &Materialize,
516 generation: u64,
517) -> Service {
518 Service {
519 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
520 spec: Some(ServiceSpec {
521 type_: Some("ClusterIP".to_string()),
522 cluster_ip: Some("None".to_string()),
523 selector: Some(btreemap! {
524 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
525 }),
526 ports: Some(vec![ServicePort {
527 name: Some("grpc".to_string()),
528 protocol: Some("TCP".to_string()),
529 port: config.environmentd_internal_persist_pubsub_port.into(),
530 ..Default::default()
531 }]),
532 ..Default::default()
533 }),
534 status: None,
535 }
536}
537
538fn create_environmentd_statefulset_object(
539 config: &super::Config,
540 mz: &Materialize,
541 generation: u64,
542) -> StatefulSet {
543 let mut env = vec![
553 EnvVar {
554 name: "MZ_METADATA_BACKEND_URL".to_string(),
555 value_from: Some(EnvVarSource {
556 secret_key_ref: Some(SecretKeySelector {
557 name: mz.backend_secret_name(),
558 key: "metadata_backend_url".to_string(),
559 optional: Some(false),
560 }),
561 ..Default::default()
562 }),
563 ..Default::default()
564 },
565 EnvVar {
566 name: "MZ_PERSIST_BLOB_URL".to_string(),
567 value_from: Some(EnvVarSource {
568 secret_key_ref: Some(SecretKeySelector {
569 name: mz.backend_secret_name(),
570 key: "persist_backend_url".to_string(),
571 optional: Some(false),
572 }),
573 ..Default::default()
574 }),
575 ..Default::default()
576 },
577 ];
578
579 env.push(EnvVar {
580 name: "AWS_REGION".to_string(),
581 value: Some(config.region.clone()),
582 ..Default::default()
583 });
584
585 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
586
587 let mut args = vec![];
588
589 if let Some(helm_chart_version) = &config.helm_chart_version {
590 args.push(format!("--helm-chart-version={helm_chart_version}"));
591 }
592
593 args.push(format!(
595 "--environment-id={}",
596 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
597 ));
598
599 args.push(format!(
601 "--clusterd-image={}",
602 matching_image_from_environmentd_image_ref(
603 &mz.spec.environmentd_image_ref,
604 "clusterd",
605 None
606 )
607 ));
608
609 args.extend(
611 [
612 config
613 .environmentd_cluster_replica_sizes
614 .as_ref()
615 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
616 config
617 .bootstrap_default_cluster_replica_size
618 .as_ref()
619 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
620 config
621 .bootstrap_builtin_system_cluster_replica_size
622 .as_ref()
623 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
624 config
625 .bootstrap_builtin_probe_cluster_replica_size
626 .as_ref()
627 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
628 config
629 .bootstrap_builtin_support_cluster_replica_size
630 .as_ref()
631 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
632 config
633 .bootstrap_builtin_catalog_server_cluster_replica_size
634 .as_ref()
635 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
636 config
637 .bootstrap_builtin_analytics_cluster_replica_size
638 .as_ref()
639 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
640 config
641 .bootstrap_builtin_system_cluster_replication_factor
642 .as_ref()
643 .map(|replication_factor| {
644 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
645 }),
646 config
647 .bootstrap_builtin_probe_cluster_replication_factor
648 .as_ref()
649 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
650 config
651 .bootstrap_builtin_support_cluster_replication_factor
652 .as_ref()
653 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
654 config
655 .bootstrap_builtin_analytics_cluster_replication_factor
656 .as_ref()
657 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
658 ]
659 .into_iter()
660 .flatten(),
661 );
662
663 args.extend(
664 config
665 .environmentd_allowed_origins
666 .iter()
667 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
668 );
669
670 args.push(format!(
671 "--secrets-controller={}",
672 config.secrets_controller
673 ));
674
675 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
676 if let Ok(cluster_replica_sizes) =
677 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
678 {
679 let cluster_replica_sizes: Vec<_> =
680 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
681 args.push(format!(
682 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
683 cluster_replica_sizes.join("', '")
684 ));
685 }
686 }
687 if !config.cloud_provider.is_cloud() {
688 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
689 }
690
691 if config.enable_internal_statement_logging {
692 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
693 }
694
695 if config.disable_statement_logging {
696 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
697 }
698
699 if !mz.spec.enable_rbac {
700 args.push("--system-parameter-default=enable_rbac_checks=false".into());
701 }
702
703 args.push("--persist-isolated-runtime-threads=-1".to_string());
707
708 if config.cloud_provider == CloudProvider::Aws {
710 if let Some(azs) = config.environmentd_availability_zones.as_ref() {
711 for az in azs {
712 args.push(format!("--availability-zone={az}"));
713 }
714 }
715
716 if let Some(environmentd_connection_role_arn) = mz
717 .spec
718 .environmentd_connection_role_arn
719 .as_deref()
720 .or(config.environmentd_connection_role_arn.as_deref())
721 {
722 args.push(format!(
723 "--aws-connection-role-arn={}",
724 environmentd_connection_role_arn
725 ));
726 }
727 if let Some(account_id) = &config.aws_account_id {
728 args.push(format!("--aws-account-id={account_id}"));
729 }
730
731 args.extend([format!(
732 "--aws-secrets-controller-tags=Environment={}",
733 mz.name_unchecked()
734 )]);
735 args.extend_from_slice(&config.aws_secrets_controller_tags);
736 }
737
738 args.extend([
740 "--orchestrator=kubernetes".into(),
741 format!(
742 "--orchestrator-kubernetes-service-account={}",
743 &mz.service_account_name()
744 ),
745 format!(
746 "--orchestrator-kubernetes-image-pull-policy={}",
747 config.image_pull_policy.as_kebab_case_str(),
748 ),
749 ]);
750 for selector in &config.clusterd_node_selector {
751 args.push(format!(
752 "--orchestrator-kubernetes-service-node-selector={}={}",
753 selector.key, selector.value,
754 ));
755 }
756 if mz.meets_minimum_version(&V144) {
757 if let Some(affinity) = &config.clusterd_affinity {
758 let affinity = serde_json::to_string(affinity).unwrap();
759 args.push(format!(
760 "--orchestrator-kubernetes-service-affinity={affinity}"
761 ))
762 }
763 if let Some(tolerations) = &config.clusterd_tolerations {
764 let tolerations = serde_json::to_string(tolerations).unwrap();
765 args.push(format!(
766 "--orchestrator-kubernetes-service-tolerations={tolerations}"
767 ))
768 }
769 }
770 if let Some(scheduler_name) = &config.scheduler_name {
771 args.push(format!(
772 "--orchestrator-kubernetes-scheduler-name={}",
773 scheduler_name
774 ));
775 }
776 if mz.meets_minimum_version(&V154_DEV0) {
777 args.extend(
778 mz.spec
779 .pod_annotations
780 .as_ref()
781 .map(|annotations| annotations.iter())
782 .unwrap_or_default()
783 .map(|(key, val)| {
784 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
785 }),
786 );
787 }
788 args.extend(
789 mz.default_labels()
790 .iter()
791 .chain(
792 mz.spec
793 .pod_labels
794 .as_ref()
795 .map(|labels| labels.iter())
796 .unwrap_or_default(),
797 )
798 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
799 );
800 if let Some(status) = &mz.status {
801 args.push(format!(
802 "--orchestrator-kubernetes-name-prefix=mz{}-",
803 status.resource_id
804 ));
805 }
806
807 args.extend(["--log-format=json".into()]);
809 if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
810 args.push(format!("--opentelemetry-endpoint={}", endpoint));
811 }
812 args.extend([
814 format!(
815 "--opentelemetry-resource=organization_id={}",
816 mz.spec.environment_id
817 ),
818 format!(
819 "--opentelemetry-resource=environment_name={}",
820 mz.name_unchecked()
821 ),
822 ]);
823
824 if let Some(segment_api_key) = &config.segment_api_key {
825 args.push(format!("--segment-api-key={}", segment_api_key));
826 if config.segment_client_side {
827 args.push("--segment-client-side".into());
828 }
829 }
830
831 let mut volumes = Vec::new();
832 let mut volume_mounts = Vec::new();
833 if issuer_ref_defined(
834 &config.default_certificate_specs.internal,
835 &mz.spec.internal_certificate_spec,
836 ) {
837 volumes.push(Volume {
838 name: "certificate".to_owned(),
839 secret: Some(SecretVolumeSource {
840 default_mode: Some(0o400),
841 secret_name: Some(mz.environmentd_certificate_secret_name()),
842 items: None,
843 optional: Some(false),
844 }),
845 ..Default::default()
846 });
847 volume_mounts.push(VolumeMount {
848 name: "certificate".to_owned(),
849 mount_path: "/etc/materialized".to_owned(),
850 read_only: Some(true),
851 ..Default::default()
852 });
853 args.extend([
854 "--tls-mode=require".into(),
855 "--tls-cert=/etc/materialized/tls.crt".into(),
856 "--tls-key=/etc/materialized/tls.key".into(),
857 ]);
858 } else {
859 args.push("--tls-mode=disable".to_string());
860 }
861 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
862 args.push(format!(
863 "--orchestrator-kubernetes-ephemeral-volume-class={}",
864 ephemeral_volume_class
865 ));
866 }
867 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
869
870 if mz.meets_minimum_version(&V26_1_0) {
874 if let Some(ref name) = mz.spec.system_parameter_configmap_name {
875 volumes.push(Volume {
876 name: "system-params".to_string(),
877 config_map: Some(ConfigMapVolumeSource {
878 default_mode: Some(0o400),
879 name: name.to_owned(),
880 items: None,
881 optional: Some(true),
882 }),
883 ..Default::default()
884 });
885 volume_mounts.push(VolumeMount {
886 name: "system-params".to_string(),
887 mount_path: "/system-params".to_owned(),
889 read_only: Some(true),
890 ..Default::default()
891 });
892 args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
893 args.push("--config-sync-loop-interval=1s".to_string());
894 }
895 }
896
897 if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
899 args.push(format!("--sentry-dsn={}", sentry_dsn));
900 if let Some(sentry_environment) = &config.tracing.sentry_environment {
901 args.push(format!("--sentry-environment={}", sentry_environment));
902 }
903 args.push(format!("--sentry-tag=region={}", config.region));
904 }
905
906 args.push(format!(
908 "--persist-pubsub-url=http://{}:{}",
909 mz.persist_pubsub_service_name(generation),
910 config.environmentd_internal_persist_pubsub_port,
911 ));
912 args.push(format!(
913 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
914 config.environmentd_internal_persist_pubsub_port
915 ));
916
917 args.push(format!("--deploy-generation={}", generation));
918
919 args.push(format!(
921 "--internal-console-redirect-url={}",
922 &config.internal_console_proxy_url,
923 ));
924
925 if !config.collect_pod_metrics {
926 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
927 }
928 if config.enable_prometheus_scrape_annotations {
929 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
930 }
931
932 if config.disable_license_key_checks {
935 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
936 args.push("--disable-license-key-checks".into());
937 }
938 }
939
940 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
943 || mz.meets_minimum_version(&V153)
944 {
945 volume_mounts.push(VolumeMount {
946 name: "license-key".to_string(),
947 mount_path: "/license_key".to_string(),
948 ..Default::default()
949 });
950 volumes.push(Volume {
951 name: "license-key".to_string(),
952 secret: Some(SecretVolumeSource {
953 default_mode: Some(256),
954 optional: Some(false),
955 secret_name: Some(mz.backend_secret_name()),
956 items: Some(vec![KeyToPath {
957 key: "license_key".to_string(),
958 path: "license_key".to_string(),
959 ..Default::default()
960 }]),
961 ..Default::default()
962 }),
963 ..Default::default()
964 });
965 env.push(EnvVar {
966 name: "MZ_LICENSE_KEY".to_string(),
967 value: Some("/license_key/license_key".to_string()),
968 ..Default::default()
969 });
970 }
971
972 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
974 args.extend(extra_args.iter().cloned());
975 }
976
977 let probe = Probe {
978 initial_delay_seconds: Some(1),
979 failure_threshold: Some(12),
980 tcp_socket: Some(TCPSocketAction {
981 host: None,
982 port: IntOrString::Int(config.environmentd_sql_port.into()),
983 }),
984 ..Default::default()
985 };
986
987 let security_context = if config.enable_security_context {
988 Some(SecurityContext {
992 run_as_non_root: Some(true),
993 capabilities: Some(Capabilities {
994 drop: Some(vec!["ALL".to_string()]),
995 ..Default::default()
996 }),
997 seccomp_profile: Some(SeccompProfile {
998 type_: "RuntimeDefault".to_string(),
999 ..Default::default()
1000 }),
1001 allow_privilege_escalation: Some(false),
1002 ..Default::default()
1003 })
1004 } else {
1005 None
1006 };
1007
1008 let ports = vec![
1009 ContainerPort {
1010 container_port: config.environmentd_sql_port.into(),
1011 name: Some("sql".to_owned()),
1012 ..Default::default()
1013 },
1014 ContainerPort {
1015 container_port: config.environmentd_internal_sql_port.into(),
1016 name: Some("internal-sql".to_owned()),
1017 ..Default::default()
1018 },
1019 ContainerPort {
1020 container_port: config.environmentd_http_port.into(),
1021 name: Some("http".to_owned()),
1022 ..Default::default()
1023 },
1024 ContainerPort {
1025 container_port: config.environmentd_internal_http_port.into(),
1026 name: Some("internal-http".to_owned()),
1027 ..Default::default()
1028 },
1029 ContainerPort {
1030 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1031 name: Some("persist-pubsub".to_owned()),
1032 ..Default::default()
1033 },
1034 ];
1035
1036 if mz.meets_minimum_version(&V147_DEV0) {
1038 volume_mounts.push(VolumeMount {
1039 name: "listeners-configmap".to_string(),
1040 mount_path: "/listeners".to_string(),
1041 ..Default::default()
1042 });
1043 volumes.push(Volume {
1044 name: "listeners-configmap".to_string(),
1045 config_map: Some(ConfigMapVolumeSource {
1046 name: mz.listeners_configmap_name(generation),
1047 default_mode: Some(256),
1048 optional: Some(false),
1049 items: Some(vec![KeyToPath {
1050 key: "listeners.json".to_string(),
1051 path: "listeners.json".to_string(),
1052 ..Default::default()
1053 }]),
1054 }),
1055 ..Default::default()
1056 });
1057 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1058 if matches!(
1059 mz.spec.authenticator_kind,
1060 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1061 ) {
1062 args.push("--system-parameter-default=enable_password_auth=true".into());
1063 env.push(EnvVar {
1064 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1065 value_from: Some(EnvVarSource {
1066 secret_key_ref: Some(SecretKeySelector {
1067 name: mz.backend_secret_name(),
1068 key: "external_login_password_mz_system".to_string(),
1069 optional: Some(false),
1070 }),
1071 ..Default::default()
1072 }),
1073 ..Default::default()
1074 })
1075 }
1076 } else {
1077 args.extend([
1078 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1079 format!(
1080 "--http-listen-addr=0.0.0.0:{}",
1081 config.environmentd_http_port
1082 ),
1083 format!(
1084 "--internal-sql-listen-addr=0.0.0.0:{}",
1085 config.environmentd_internal_sql_port
1086 ),
1087 format!(
1088 "--internal-http-listen-addr=0.0.0.0:{}",
1089 config.environmentd_internal_http_port
1090 ),
1091 ]);
1092 }
1093
1094 let container = Container {
1095 name: "environmentd".to_owned(),
1096 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1097 image_pull_policy: Some(config.image_pull_policy.to_string()),
1098 ports: Some(ports),
1099 args: Some(args),
1100 env: Some(env),
1101 volume_mounts: Some(volume_mounts),
1102 liveness_probe: Some(probe.clone()),
1103 readiness_probe: Some(probe),
1104 resources: mz
1105 .spec
1106 .environmentd_resource_requirements
1107 .clone()
1108 .or_else(|| config.environmentd_default_resources.clone()),
1109 security_context: security_context.clone(),
1110 ..Default::default()
1111 };
1112
1113 let mut pod_template_labels = mz.default_labels();
1114 pod_template_labels.insert(
1115 "materialize.cloud/name".to_owned(),
1116 mz.environmentd_statefulset_name(generation),
1117 );
1118 pod_template_labels.insert(
1119 "materialize.cloud/app".to_owned(),
1120 mz.environmentd_app_name(),
1121 );
1122 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1123 pod_template_labels.extend(
1124 mz.spec
1125 .pod_labels
1126 .as_ref()
1127 .map(|labels| labels.iter())
1128 .unwrap_or_default()
1129 .map(|(key, value)| (key.clone(), value.clone())),
1130 );
1131
1132 let mut pod_template_annotations = btreemap! {
1133 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1135
1136 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1138 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1139 "materialize.cloud/generation".to_owned() => generation.to_string(),
1140 };
1141 if config.enable_prometheus_scrape_annotations {
1142 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1143 pod_template_annotations.insert(
1144 "prometheus.io/port".to_owned(),
1145 config.environmentd_internal_http_port.to_string(),
1146 );
1147 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1148 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1149 pod_template_annotations.insert(
1150 "materialize.prometheus.io/mz_usage_path".to_owned(),
1151 "/metrics/mz_usage".to_string(),
1152 );
1153 pod_template_annotations.insert(
1154 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1155 "/metrics/mz_frontier".to_string(),
1156 );
1157 pod_template_annotations.insert(
1158 "materialize.prometheus.io/mz_compute_path".to_owned(),
1159 "/metrics/mz_compute".to_string(),
1160 );
1161 pod_template_annotations.insert(
1162 "materialize.prometheus.io/mz_storage_path".to_owned(),
1163 "/metrics/mz_storage".to_string(),
1164 );
1165 }
1166 pod_template_annotations.extend(
1167 mz.spec
1168 .pod_annotations
1169 .as_ref()
1170 .map(|annotations| annotations.iter())
1171 .unwrap_or_default()
1172 .map(|(key, value)| (key.clone(), value.clone())),
1173 );
1174
1175 let mut tolerations = vec![
1176 Toleration {
1180 effect: Some("NoExecute".into()),
1181 key: Some("node.kubernetes.io/not-ready".into()),
1182 operator: Some("Exists".into()),
1183 toleration_seconds: Some(30),
1184 value: None,
1185 },
1186 Toleration {
1187 effect: Some("NoExecute".into()),
1188 key: Some("node.kubernetes.io/unreachable".into()),
1189 operator: Some("Exists".into()),
1190 toleration_seconds: Some(30),
1191 value: None,
1192 },
1193 ];
1194 if let Some(user_tolerations) = &config.environmentd_tolerations {
1195 tolerations.extend(user_tolerations.iter().cloned());
1196 }
1197 let tolerations = Some(tolerations);
1198
1199 let pod_template_spec = PodTemplateSpec {
1200 metadata: Some(ObjectMeta {
1203 labels: Some(pod_template_labels),
1204 annotations: Some(pod_template_annotations), ..Default::default()
1206 }),
1207 spec: Some(PodSpec {
1208 containers: vec![container],
1209 node_selector: Some(
1210 config
1211 .environmentd_node_selector
1212 .iter()
1213 .map(|selector| (selector.key.clone(), selector.value.clone()))
1214 .collect(),
1215 ),
1216 affinity: config.environmentd_affinity.clone(),
1217 scheduler_name: config.scheduler_name.clone(),
1218 service_account_name: Some(mz.service_account_name()),
1219 volumes: Some(volumes),
1220 security_context: Some(PodSecurityContext {
1221 fs_group: Some(999),
1222 run_as_user: Some(999),
1223 run_as_group: Some(999),
1224 ..Default::default()
1225 }),
1226 tolerations,
1227 termination_grace_period_seconds: Some(0),
1248 ..Default::default()
1249 }),
1250 };
1251
1252 let mut match_labels = BTreeMap::new();
1253 match_labels.insert(
1254 "materialize.cloud/name".to_owned(),
1255 mz.environmentd_statefulset_name(generation),
1256 );
1257
1258 let statefulset_spec = StatefulSetSpec {
1259 replicas: Some(1),
1260 template: pod_template_spec,
1261 service_name: Some(mz.environmentd_service_name()),
1262 selector: LabelSelector {
1263 match_expressions: None,
1264 match_labels: Some(match_labels),
1265 },
1266 ..Default::default()
1267 };
1268
1269 StatefulSet {
1270 metadata: ObjectMeta {
1271 annotations: Some(btreemap! {
1272 "materialize.cloud/generation".to_owned() => generation.to_string(),
1273 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1274 }),
1275 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1276 },
1277 spec: Some(statefulset_spec),
1278 status: None,
1279 }
1280}
1281
1282fn create_connection_info(
1283 config: &super::Config,
1284 mz: &Materialize,
1285 generation: u64,
1286) -> ConnectionInfo {
1287 let external_enable_tls = issuer_ref_defined(
1288 &config.default_certificate_specs.internal,
1289 &mz.spec.internal_certificate_spec,
1290 );
1291 let authenticator_kind = mz.spec.authenticator_kind;
1292
1293 let mut listeners_config = ListenersConfig {
1294 sql: btreemap! {
1295 "external".to_owned() => SqlListenerConfig{
1296 addr: SocketAddr::new(
1297 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1298 config.environmentd_sql_port,
1299 ),
1300 authenticator_kind,
1301 allowed_roles: AllowedRoles::Normal,
1302 enable_tls: external_enable_tls,
1303 },
1304 "internal".to_owned() => SqlListenerConfig{
1305 addr: SocketAddr::new(
1306 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1307 config.environmentd_internal_sql_port,
1308 ),
1309 authenticator_kind: AuthenticatorKind::None,
1310 allowed_roles: AllowedRoles::NormalAndInternal,
1312 enable_tls: false,
1313 },
1314 },
1315 http: btreemap! {
1316 "external".to_owned() => HttpListenerConfig{
1317 base: BaseListenerConfig {
1318 addr: SocketAddr::new(
1319 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1320 config.environmentd_http_port,
1321 ),
1322 authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1326 AuthenticatorKind::Password
1327 } else {
1328 authenticator_kind
1329 },
1330 allowed_roles: AllowedRoles::Normal,
1331 enable_tls: external_enable_tls,
1332 },
1333 routes: HttpRoutesEnabled{
1334 base: true,
1335 webhook: true,
1336 internal: false,
1337 metrics: false,
1338 profiling: false,
1339 }
1340 },
1341 "internal".to_owned() => HttpListenerConfig{
1342 base: BaseListenerConfig {
1343 addr: SocketAddr::new(
1344 IpAddr::V4(Ipv4Addr::new(0,0,0,0)),
1345 config.environmentd_internal_http_port,
1346 ),
1347 authenticator_kind: AuthenticatorKind::None,
1348 allowed_roles: AllowedRoles::NormalAndInternal,
1350 enable_tls: false,
1351 },
1352 routes: HttpRoutesEnabled{
1353 base: true,
1354 webhook: true,
1355 internal: true,
1356 metrics: true,
1357 profiling: true,
1358 }
1359 },
1360 },
1361 };
1362
1363 if matches!(
1364 authenticator_kind,
1365 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1366 ) {
1367 listeners_config.sql.remove("internal");
1368 listeners_config.http.remove("internal");
1369
1370 listeners_config.sql.get_mut("external").map(|listener| {
1371 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1372 listener
1373 });
1374 listeners_config.http.get_mut("external").map(|listener| {
1375 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1376 listener.routes.internal = true;
1377 listener.routes.profiling = true;
1378 listener
1379 });
1380
1381 listeners_config.http.insert(
1382 "metrics".to_owned(),
1383 HttpListenerConfig {
1384 base: BaseListenerConfig {
1385 addr: SocketAddr::new(
1386 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1387 config.environmentd_internal_http_port,
1388 ),
1389 authenticator_kind: AuthenticatorKind::None,
1390 allowed_roles: AllowedRoles::NormalAndInternal,
1391 enable_tls: false,
1392 },
1393 routes: HttpRoutesEnabled {
1394 base: false,
1395 webhook: false,
1396 internal: false,
1397 metrics: true,
1398 profiling: false,
1399 },
1400 },
1401 );
1402 }
1403
1404 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1405 let listeners_configmap = ConfigMap {
1406 binary_data: None,
1407 data: Some(btreemap! {
1408 "listeners.json".to_owned() => listeners_json,
1409 }),
1410 immutable: None,
1411 metadata: ObjectMeta {
1412 annotations: Some(btreemap! {
1413 "materialize.cloud/generation".to_owned() => generation.to_string(),
1414 }),
1415 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1416 },
1417 };
1418
1419 let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1420 AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1421 let scheme = if external_enable_tls { "https" } else { "http" };
1422 (
1423 scheme,
1424 config.environmentd_http_port,
1425 Some(mz.spec.backend_secret_name.clone()),
1426 )
1427 }
1428 _ => ("http", config.environmentd_internal_http_port, None),
1429 };
1430 let environmentd_url = format!(
1431 "{}://{}.{}.svc.cluster.local:{}",
1432 scheme,
1433 mz.environmentd_generation_service_name(generation),
1434 mz.namespace(),
1435 leader_api_port,
1436 );
1437 ConnectionInfo {
1438 environmentd_url,
1439 listeners_configmap,
1440 mz_system_secret_name,
1441 }
1442}
1443
1444#[derive(Debug, Deserialize, PartialEq, Eq)]
1446struct BecomeLeaderResponse {
1447 result: BecomeLeaderResult,
1448}
1449
1450#[derive(Debug, Deserialize, PartialEq, Eq)]
1451enum BecomeLeaderResult {
1452 Success,
1453 Failure { message: String },
1454}
1455
1456#[derive(Debug, Deserialize, PartialEq, Eq)]
1457struct SkipCatchupError {
1458 message: String,
1459}