1use std::{
11 collections::BTreeMap,
12 net::{IpAddr, Ipv4Addr, SocketAddr},
13 sync::LazyLock,
14 time::Duration,
15};
16
17use k8s_openapi::{
18 api::{
19 apps::v1::{StatefulSet, StatefulSetSpec},
20 core::v1::{
21 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
22 EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
23 SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext,
24 Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount,
25 },
26 },
27 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
28};
29use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
30use maplit::btreemap;
31use mz_server_core::listeners::{
32 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
33 ListenersConfig, SqlListenerConfig,
34};
35use reqwest::{Client as HttpClient, StatusCode};
36use semver::{BuildMetadata, Prerelease, Version};
37use serde::{Deserialize, Serialize};
38use sha2::{Digest, Sha256};
39use tracing::{error, trace};
40
41use super::Error;
42use super::matching_image_from_environmentd_image_ref;
43use crate::k8s::{apply_resource, delete_resource, get_resource};
44use crate::tls::issuer_ref_defined;
45use mz_cloud_provider::CloudProvider;
46use mz_cloud_resources::crd::ManagedResource;
47use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
48use mz_ore::instrument;
49
50static V140_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
51 major: 0,
52 minor: 140,
53 patch: 0,
54 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
55 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
56});
57const V143: Version = Version::new(0, 143, 0);
58const V144: Version = Version::new(0, 144, 0);
59static V147_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
60 major: 0,
61 minor: 147,
62 patch: 0,
63 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
64 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
65});
66const V153: Version = Version::new(0, 153, 0);
67static V154_DEV0: LazyLock<Version> = LazyLock::new(|| Version {
68 major: 0,
69 minor: 154,
70 patch: 0,
71 pre: Prerelease::new("").expect("dev.0 is valid prerelease"),
72 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
73});
74pub const V161: Version = Version::new(0, 161, 0);
75
76static V26_1_0: LazyLock<Version> = LazyLock::new(|| Version {
77 major: 26,
78 minor: 1,
79 patch: 0,
80 pre: Prerelease::new("dev.0").expect("dev.0 is valid prerelease"),
81 build: BuildMetadata::new("").expect("empty string is valid buildmetadata"),
82});
83
/// Leader status reported by environmentd's `/api/leader/status` endpoint
/// (decoded as the `status` field of [`GetLeaderStatusResponse`]).
///
/// Variant names are the serde wire representation, and declaration order
/// defines the derived `PartialOrd`/`Ord`; do not rename or reorder casually.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum DeploymentStatus {
    /// Still initializing; `Resources::apply` requeues while in this state.
    Initializing,
    /// Ready to be promoted; `Resources::apply` treats this as ready.
    ReadyToPromote,
    /// Promotion in progress; `Resources::apply` treats this as ready, while
    /// `Resources::promote_services` keeps requeueing until `IsLeader`.
    Promoting,
    /// This environmentd is the leader.
    IsLeader,
}
100
/// JSON body returned by environmentd's `GET /api/leader/status`.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct GetLeaderStatusResponse {
    /// Current leader status of the queried environmentd.
    status: DeploymentStatus,
}
105
/// JSON request body posted to environmentd's `/api/login` endpoint.
///
/// This type does not derive `Debug`; keep it that way so the password cannot
/// end up in `{:?}`-formatted log or tracing output.
#[derive(Deserialize, Serialize)]
pub struct LoginCredentials {
    /// Login role name (the operator logs in as `mz_system`).
    username: String,
    /// Plain-text password read from the backend Secret.
    password: String,
}
112
/// How the operator reaches one generation's environmentd over HTTP.
#[derive(Debug, Serialize)]
pub struct ConnectionInfo {
    /// Base URL of environmentd; API paths such as `/api/leader/status` are
    /// appended to it verbatim.
    pub environmentd_url: String,
    /// Name of the Secret whose `external_login_password_mz_system` key holds
    /// the `mz_system` password. When `None`, requests are sent without
    /// logging in.
    pub mz_system_secret_name: Option<String>,
    /// Listeners ConfigMap, applied by `Resources::apply` before the
    /// environmentd StatefulSet.
    pub listeners_configmap: ConfigMap,
}
119
/// Every Kubernetes object (plus HTTP connection details) for a single
/// environmentd generation, as built by `Resources::new`.
///
/// NOTE: `generate_hash` hashes the JSON serialization of this struct, which
/// follows field declaration order; reordering or adding fields changes the
/// hash.
#[derive(Debug, Serialize)]
pub struct Resources {
    /// Deploy generation these resources were built for.
    pub generation: u64,
    /// Service whose name does not include the generation but whose selector
    /// targets this generation's pods; only applied by `promote_services`.
    pub public_service: Box<Service>,
    /// Service named after this generation, applied by `apply`.
    pub generation_service: Box<Service>,
    /// Per-generation persist pubsub Service.
    pub persist_pubsub_service: Box<Service>,
    /// The environmentd StatefulSet for this generation.
    pub environmentd_statefulset: Box<StatefulSet>,
    /// URL, credentials Secret name, and listeners ConfigMap for environmentd.
    pub connection_info: Box<ConnectionInfo>,
}
129
130impl Resources {
131 pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self {
132 let public_service = Box::new(create_public_service_object(config, mz, generation));
133 let generation_service = Box::new(create_generation_service_object(config, mz, generation));
134 let persist_pubsub_service =
135 Box::new(create_persist_pubsub_service(config, mz, generation));
136 let environmentd_statefulset = Box::new(create_environmentd_statefulset_object(
137 config, mz, generation,
138 ));
139 let connection_info = Box::new(create_connection_info(config, mz, generation));
140
141 Self {
142 generation,
143 public_service,
144 generation_service,
145 persist_pubsub_service,
146 environmentd_statefulset,
147 connection_info,
148 }
149 }
150
151 #[instrument]
152 pub async fn apply(
153 &self,
154 client: &Client,
155 force_promote: bool,
156 namespace: &str,
157 ) -> Result<Option<Action>, Error> {
158 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
159 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
160 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
161
162 trace!("applying environmentd per-generation service");
163 apply_resource(&service_api, &*self.generation_service).await?;
164
165 trace!("creating persist pubsub service");
166 apply_resource(&service_api, &*self.persist_pubsub_service).await?;
167
168 trace!("applying listeners configmap");
169 apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?;
170
171 trace!("creating new environmentd statefulset");
172 apply_resource(&statefulset_api, &*self.environmentd_statefulset).await?;
173
174 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
175
176 let statefulset = get_resource(
177 &statefulset_api,
178 &self.environmentd_statefulset.name_unchecked(),
179 )
180 .await?;
181 if statefulset
182 .and_then(|statefulset| statefulset.status)
183 .and_then(|status| status.ready_replicas)
184 .unwrap_or(0)
185 == 0
186 {
187 trace!("environmentd statefulset is not ready yet...");
188 return Ok(Some(retry_action));
189 }
190
191 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
192 return Ok(Some(retry_action));
193 };
194 let status_url = reqwest::Url::parse(&format!(
195 "{}/api/leader/status",
196 self.connection_info.environmentd_url,
197 ))
198 .unwrap();
199
200 match http_client.get(status_url.clone()).send().await {
201 Ok(response) => {
202 let response: GetLeaderStatusResponse = match response.error_for_status() {
203 Ok(response) => response.json().await?,
204 Err(e) => {
205 trace!("failed to get status of environmentd, retrying... ({e})");
206 return Ok(Some(retry_action));
207 }
208 };
209 if force_promote {
210 trace!("skipping cluster catchup");
211 let skip_catchup_url = reqwest::Url::parse(&format!(
212 "{}/api/leader/skip-catchup",
213 self.connection_info.environmentd_url,
214 ))
215 .unwrap();
216 let response = http_client.post(skip_catchup_url).send().await?;
217 if response.status() == StatusCode::BAD_REQUEST {
218 let err: SkipCatchupError = response.json().await?;
219 return Err(
220 anyhow::anyhow!("failed to skip catchup: {}", err.message).into()
221 );
222 }
223 } else {
224 match response.status {
225 DeploymentStatus::Initializing => {
226 trace!("environmentd is still initializing, retrying...");
227 return Ok(Some(retry_action));
228 }
229 DeploymentStatus::ReadyToPromote
230 | DeploymentStatus::Promoting
231 | DeploymentStatus::IsLeader => trace!("environmentd is ready"),
232 }
233 }
234 }
235 Err(e) => {
236 trace!("failed to connect to environmentd, retrying... ({e})");
237 return Ok(Some(retry_action));
238 }
239 }
240
241 Ok(None)
242 }
243
244 #[instrument]
245 async fn get_http_client(&self, client: Client, namespace: &str) -> Option<HttpClient> {
246 let secret_api: Api<Secret> = Api::namespaced(client.clone(), namespace);
247 Some(match &self.connection_info.mz_system_secret_name {
248 Some(mz_system_secret_name) => {
249 let http_client = reqwest::Client::builder()
250 .timeout(std::time::Duration::from_secs(10))
251 .cookie_store(true)
252 .danger_accept_invalid_certs(true)
254 .build()
255 .unwrap();
256
257 let secret = secret_api
258 .get(mz_system_secret_name)
259 .await
260 .map_err(|e| {
261 error!("Failed to get backend secret: {:?}", e);
262 e
263 })
264 .ok()?;
265 if let Some(data) = secret.data {
266 if let Some(password) = data.get("external_login_password_mz_system").cloned() {
267 let password = String::from_utf8_lossy(&password.0).to_string();
268 let login_url = reqwest::Url::parse(&format!(
269 "{}/api/login",
270 self.connection_info.environmentd_url,
271 ))
272 .unwrap();
273 match http_client
274 .post(login_url)
275 .body(
276 serde_json::to_string(&LoginCredentials {
277 username: "mz_system".to_owned(),
278 password,
279 })
280 .expect(
281 "Serializing a simple struct with utf8 strings doesn't fail.",
282 ),
283 )
284 .header("Content-Type", "application/json")
285 .send()
286 .await
287 {
288 Ok(response) => {
289 if let Err(e) = response.error_for_status() {
290 trace!("failed to login to environmentd, retrying... ({e})");
291 return None;
292 }
293 }
294 Err(e) => {
295 trace!("failed to connect to environmentd, retrying... ({e})");
296 return None;
297 }
298 };
299 }
300 };
301 http_client
302 }
303 None => reqwest::Client::builder()
304 .timeout(std::time::Duration::from_secs(10))
305 .build()
306 .unwrap(),
307 })
308 }
309
310 #[instrument]
311 pub async fn promote_services(
312 &self,
313 client: &Client,
314 namespace: &str,
315 ) -> Result<Option<Action>, Error> {
316 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
317 let retry_action = Action::requeue(Duration::from_secs(rand::random_range(5..10)));
318
319 let promote_url = reqwest::Url::parse(&format!(
320 "{}/api/leader/promote",
321 self.connection_info.environmentd_url,
322 ))
323 .unwrap();
324
325 let Some(http_client) = self.get_http_client(client.clone(), namespace).await else {
326 return Ok(Some(retry_action));
327 };
328
329 trace!("promoting new environmentd to leader");
330 let response = http_client.post(promote_url).send().await?;
331 let response: BecomeLeaderResponse = response.error_for_status()?.json().await?;
332 if let BecomeLeaderResult::Failure { message } = response.result {
333 return Err(Error::Anyhow(anyhow::anyhow!(
334 "failed to promote new environmentd: {message}"
335 )));
336 }
337
338 let status_url = reqwest::Url::parse(&format!(
350 "{}/api/leader/status",
351 self.connection_info.environmentd_url,
352 ))
353 .unwrap();
354 match http_client.get(status_url.clone()).send().await {
355 Ok(response) => {
356 let response: GetLeaderStatusResponse = response.json().await?;
357 if response.status != DeploymentStatus::IsLeader {
358 trace!(
359 "environmentd is still promoting (status: {:?}), retrying...",
360 response.status
361 );
362 return Ok(Some(retry_action));
363 } else {
364 trace!("environmentd is ready");
365 }
366 }
367 Err(e) => {
368 trace!("failed to connect to environmentd, retrying... ({e})");
369 return Ok(Some(retry_action));
370 }
371 }
372
373 trace!("applying environmentd public service");
374 apply_resource(&service_api, &*self.public_service).await?;
375
376 Ok(None)
377 }
378
379 #[instrument]
380 pub async fn teardown_generation(
381 &self,
382 client: &Client,
383 mz: &Materialize,
384 generation: u64,
385 ) -> Result<(), anyhow::Error> {
386 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &mz.namespace());
387 let service_api: Api<Service> = Api::namespaced(client.clone(), &mz.namespace());
388 let statefulset_api: Api<StatefulSet> = Api::namespaced(client.clone(), &mz.namespace());
389
390 trace!("deleting environmentd statefulset for generation {generation}");
391 delete_resource(
392 &statefulset_api,
393 &mz.environmentd_statefulset_name(generation),
394 )
395 .await?;
396
397 trace!("deleting persist pubsub service for generation {generation}");
398 delete_resource(&service_api, &mz.persist_pubsub_service_name(generation)).await?;
399
400 trace!("deleting environmentd per-generation service for generation {generation}");
401 delete_resource(
402 &service_api,
403 &mz.environmentd_generation_service_name(generation),
404 )
405 .await?;
406
407 trace!("deleting listeners configmap for generation {generation}");
408 delete_resource(&configmap_api, &mz.listeners_configmap_name(generation)).await?;
409
410 Ok(())
411 }
412
413 pub fn generate_hash(&self) -> String {
416 let mut hasher = Sha256::new();
417 hasher.update(&serde_json::to_string(self).unwrap());
418 format!("{:x}", hasher.finalize())
419 }
420}
421
422fn create_public_service_object(
423 config: &super::Config,
424 mz: &Materialize,
425 generation: u64,
426) -> Service {
427 create_base_service_object(config, mz, generation, &mz.environmentd_service_name())
428}
429
430fn create_generation_service_object(
431 config: &super::Config,
432 mz: &Materialize,
433 generation: u64,
434) -> Service {
435 create_base_service_object(
436 config,
437 mz,
438 generation,
439 &mz.environmentd_generation_service_name(generation),
440 )
441}
442
443fn create_base_service_object(
444 config: &super::Config,
445 mz: &Materialize,
446 generation: u64,
447 service_name: &str,
448) -> Service {
449 let ports = vec![
450 ServicePort {
451 port: config.environmentd_sql_port.into(),
452 protocol: Some("TCP".to_string()),
453 name: Some("sql".to_string()),
454 ..Default::default()
455 },
456 ServicePort {
457 port: config.environmentd_http_port.into(),
458 protocol: Some("TCP".to_string()),
459 name: Some("https".to_string()),
460 ..Default::default()
461 },
462 ServicePort {
463 port: config.environmentd_internal_sql_port.into(),
464 protocol: Some("TCP".to_string()),
465 name: Some("internal-sql".to_string()),
466 ..Default::default()
467 },
468 ServicePort {
469 port: config.environmentd_internal_http_port.into(),
470 protocol: Some("TCP".to_string()),
471 name: Some("internal-http".to_string()),
472 ..Default::default()
473 },
474 ];
475
476 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation)};
477
478 let spec = ServiceSpec {
479 type_: Some("ClusterIP".to_string()),
480 cluster_ip: Some("None".to_string()),
481 selector: Some(selector),
482 ports: Some(ports),
483 ..Default::default()
484 };
485
486 Service {
487 metadata: mz.managed_resource_meta(service_name.to_string()),
488 spec: Some(spec),
489 status: None,
490 }
491}
492
493fn create_persist_pubsub_service(
494 config: &super::Config,
495 mz: &Materialize,
496 generation: u64,
497) -> Service {
498 Service {
499 metadata: mz.managed_resource_meta(mz.persist_pubsub_service_name(generation)),
500 spec: Some(ServiceSpec {
501 type_: Some("ClusterIP".to_string()),
502 cluster_ip: Some("None".to_string()),
503 selector: Some(btreemap! {
504 "materialize.cloud/name".to_string() => mz.environmentd_statefulset_name(generation),
505 }),
506 ports: Some(vec![ServicePort {
507 name: Some("grpc".to_string()),
508 protocol: Some("TCP".to_string()),
509 port: config.environmentd_internal_persist_pubsub_port.into(),
510 ..Default::default()
511 }]),
512 ..Default::default()
513 }),
514 status: None,
515 }
516}
517
518fn create_environmentd_statefulset_object(
519 config: &super::Config,
520 mz: &Materialize,
521 generation: u64,
522) -> StatefulSet {
523 let mut env = vec![
533 EnvVar {
534 name: "MZ_METADATA_BACKEND_URL".to_string(),
535 value_from: Some(EnvVarSource {
536 secret_key_ref: Some(SecretKeySelector {
537 name: mz.backend_secret_name(),
538 key: "metadata_backend_url".to_string(),
539 optional: Some(false),
540 }),
541 ..Default::default()
542 }),
543 ..Default::default()
544 },
545 EnvVar {
546 name: "MZ_PERSIST_BLOB_URL".to_string(),
547 value_from: Some(EnvVarSource {
548 secret_key_ref: Some(SecretKeySelector {
549 name: mz.backend_secret_name(),
550 key: "persist_backend_url".to_string(),
551 optional: Some(false),
552 }),
553 ..Default::default()
554 }),
555 ..Default::default()
556 },
557 ];
558
559 env.push(EnvVar {
560 name: "AWS_REGION".to_string(),
561 value: Some(config.region.clone()),
562 ..Default::default()
563 });
564
565 env.extend(mz.spec.environmentd_extra_env.iter().flatten().cloned());
566
567 let mut args = vec![];
568
569 if let Some(helm_chart_version) = &config.helm_chart_version {
570 args.push(format!("--helm-chart-version={helm_chart_version}"));
571 }
572
573 args.push(format!(
575 "--environment-id={}",
576 mz.environment_id(&config.cloud_provider.to_string(), &config.region)
577 ));
578
579 args.push(format!(
581 "--clusterd-image={}",
582 matching_image_from_environmentd_image_ref(
583 &mz.spec.environmentd_image_ref,
584 "clusterd",
585 None
586 )
587 ));
588
589 args.extend(
591 [
592 config
593 .environmentd_cluster_replica_sizes
594 .as_ref()
595 .map(|sizes| format!("--cluster-replica-sizes={sizes}")),
596 config
597 .bootstrap_default_cluster_replica_size
598 .as_ref()
599 .map(|size| format!("--bootstrap-default-cluster-replica-size={size}")),
600 config
601 .bootstrap_builtin_system_cluster_replica_size
602 .as_ref()
603 .map(|size| format!("--bootstrap-builtin-system-cluster-replica-size={size}")),
604 config
605 .bootstrap_builtin_probe_cluster_replica_size
606 .as_ref()
607 .map(|size| format!("--bootstrap-builtin-probe-cluster-replica-size={size}")),
608 config
609 .bootstrap_builtin_support_cluster_replica_size
610 .as_ref()
611 .map(|size| format!("--bootstrap-builtin-support-cluster-replica-size={size}")),
612 config
613 .bootstrap_builtin_catalog_server_cluster_replica_size
614 .as_ref()
615 .map(|size| format!("--bootstrap-builtin-catalog-server-cluster-replica-size={size}")),
616 config
617 .bootstrap_builtin_analytics_cluster_replica_size
618 .as_ref()
619 .map(|size| format!("--bootstrap-builtin-analytics-cluster-replica-size={size}")),
620 config
621 .bootstrap_builtin_system_cluster_replication_factor
622 .as_ref()
623 .map(|replication_factor| {
624 format!("--bootstrap-builtin-system-cluster-replication-factor={replication_factor}")
625 }),
626 config
627 .bootstrap_builtin_probe_cluster_replication_factor
628 .as_ref()
629 .map(|replication_factor| format!("--bootstrap-builtin-probe-cluster-replication-factor={replication_factor}")),
630 config
631 .bootstrap_builtin_support_cluster_replication_factor
632 .as_ref()
633 .map(|replication_factor| format!("--bootstrap-builtin-support-cluster-replication-factor={replication_factor}")),
634 config
635 .bootstrap_builtin_analytics_cluster_replication_factor
636 .as_ref()
637 .map(|replication_factor| format!("--bootstrap-builtin-analytics-cluster-replication-factor={replication_factor}")),
638 ]
639 .into_iter()
640 .flatten(),
641 );
642
643 args.extend(
644 config
645 .environmentd_allowed_origins
646 .iter()
647 .map(|origin| format!("--cors-allowed-origin={}", origin.to_str().unwrap())),
648 );
649
650 args.push(format!(
651 "--secrets-controller={}",
652 config.secrets_controller
653 ));
654
655 if let Some(cluster_replica_sizes) = &config.environmentd_cluster_replica_sizes {
656 if let Ok(cluster_replica_sizes) =
657 serde_json::from_str::<BTreeMap<String, serde_json::Value>>(cluster_replica_sizes)
658 {
659 let cluster_replica_sizes: Vec<_> =
660 cluster_replica_sizes.keys().map(|s| s.as_str()).collect();
661 args.push(format!(
662 "--system-parameter-default=allowed_cluster_replica_sizes='{}'",
663 cluster_replica_sizes.join("', '")
664 ));
665 }
666 }
667 if !config.cloud_provider.is_cloud() {
668 args.push("--system-parameter-default=cluster_enable_topology_spread=false".into());
669 }
670
671 if config.enable_internal_statement_logging {
672 args.push("--system-parameter-default=enable_internal_statement_logging=true".into());
673 }
674
675 if config.disable_statement_logging {
676 args.push("--system-parameter-default=statement_logging_max_sample_rate=0".into());
677 }
678
679 if !mz.spec.enable_rbac {
680 args.push("--system-parameter-default=enable_rbac_checks=false".into());
681 }
682
683 args.push("--persist-isolated-runtime-threads=-1".to_string());
687
688 if config.cloud_provider == CloudProvider::Aws {
690 if let Some(azs) = config.environmentd_availability_zones.as_ref() {
691 for az in azs {
692 args.push(format!("--availability-zone={az}"));
693 }
694 }
695
696 if let Some(environmentd_connection_role_arn) = mz
697 .spec
698 .environmentd_connection_role_arn
699 .as_deref()
700 .or(config.environmentd_connection_role_arn.as_deref())
701 {
702 args.push(format!(
703 "--aws-connection-role-arn={}",
704 environmentd_connection_role_arn
705 ));
706 }
707 if let Some(account_id) = &config.aws_account_id {
708 args.push(format!("--aws-account-id={account_id}"));
709 }
710
711 args.extend([format!(
712 "--aws-secrets-controller-tags=Environment={}",
713 mz.name_unchecked()
714 )]);
715 args.extend_from_slice(&config.aws_secrets_controller_tags);
716 }
717
718 args.extend([
720 "--orchestrator=kubernetes".into(),
721 format!(
722 "--orchestrator-kubernetes-service-account={}",
723 &mz.service_account_name()
724 ),
725 format!(
726 "--orchestrator-kubernetes-image-pull-policy={}",
727 config.image_pull_policy.as_kebab_case_str(),
728 ),
729 ]);
730 for selector in &config.clusterd_node_selector {
731 args.push(format!(
732 "--orchestrator-kubernetes-service-node-selector={}={}",
733 selector.key, selector.value,
734 ));
735 }
736 if mz.meets_minimum_version(&V144) {
737 if let Some(affinity) = &config.clusterd_affinity {
738 let affinity = serde_json::to_string(affinity).unwrap();
739 args.push(format!(
740 "--orchestrator-kubernetes-service-affinity={affinity}"
741 ))
742 }
743 if let Some(tolerations) = &config.clusterd_tolerations {
744 let tolerations = serde_json::to_string(tolerations).unwrap();
745 args.push(format!(
746 "--orchestrator-kubernetes-service-tolerations={tolerations}"
747 ))
748 }
749 }
750 if let Some(scheduler_name) = &config.scheduler_name {
751 args.push(format!(
752 "--orchestrator-kubernetes-scheduler-name={}",
753 scheduler_name
754 ));
755 }
756 if mz.meets_minimum_version(&V154_DEV0) {
757 args.extend(
758 mz.spec
759 .pod_annotations
760 .as_ref()
761 .map(|annotations| annotations.iter())
762 .unwrap_or_default()
763 .map(|(key, val)| {
764 format!("--orchestrator-kubernetes-service-annotation={key}={val}")
765 }),
766 );
767 }
768 args.extend(
769 mz.default_labels()
770 .iter()
771 .chain(
772 mz.spec
773 .pod_labels
774 .as_ref()
775 .map(|labels| labels.iter())
776 .unwrap_or_default(),
777 )
778 .map(|(key, val)| format!("--orchestrator-kubernetes-service-label={key}={val}")),
779 );
780 if let Some(status) = &mz.status {
781 args.push(format!(
782 "--orchestrator-kubernetes-name-prefix=mz{}-",
783 status.resource_id
784 ));
785 }
786
787 args.extend(["--log-format=json".into()]);
789 if let Some(endpoint) = &config.tracing.opentelemetry_endpoint {
790 args.push(format!("--opentelemetry-endpoint={}", endpoint));
791 }
792 args.extend([
794 format!(
795 "--opentelemetry-resource=organization_id={}",
796 mz.spec.environment_id
797 ),
798 format!(
799 "--opentelemetry-resource=environment_name={}",
800 mz.name_unchecked()
801 ),
802 ]);
803
804 if let Some(segment_api_key) = &config.segment_api_key {
805 args.push(format!("--segment-api-key={}", segment_api_key));
806 if config.segment_client_side {
807 args.push("--segment-client-side".into());
808 }
809 }
810
811 let mut volumes = Vec::new();
812 let mut volume_mounts = Vec::new();
813 if issuer_ref_defined(
814 &config.default_certificate_specs.internal,
815 &mz.spec.internal_certificate_spec,
816 ) {
817 volumes.push(Volume {
818 name: "certificate".to_owned(),
819 secret: Some(SecretVolumeSource {
820 default_mode: Some(0o400),
821 secret_name: Some(mz.environmentd_certificate_secret_name()),
822 items: None,
823 optional: Some(false),
824 }),
825 ..Default::default()
826 });
827 volume_mounts.push(VolumeMount {
828 name: "certificate".to_owned(),
829 mount_path: "/etc/materialized".to_owned(),
830 read_only: Some(true),
831 ..Default::default()
832 });
833 args.extend([
834 "--tls-mode=require".into(),
835 "--tls-cert=/etc/materialized/tls.crt".into(),
836 "--tls-key=/etc/materialized/tls.key".into(),
837 ]);
838 } else {
839 args.push("--tls-mode=disable".to_string());
840 }
841 if let Some(ephemeral_volume_class) = &config.ephemeral_volume_class {
842 args.push(format!(
843 "--orchestrator-kubernetes-ephemeral-volume-class={}",
844 ephemeral_volume_class
845 ));
846 }
847 args.push("--orchestrator-kubernetes-service-fs-group=999".to_string());
849
850 if mz.meets_minimum_version(&V26_1_0) {
854 if let Some(ref name) = mz.spec.system_parameter_configmap_name {
855 volumes.push(Volume {
856 name: "system-params".to_string(),
857 config_map: Some(ConfigMapVolumeSource {
858 default_mode: Some(0o400),
859 name: name.to_owned(),
860 items: None,
861 optional: Some(true),
862 }),
863 ..Default::default()
864 });
865 volume_mounts.push(VolumeMount {
866 name: "system-params".to_string(),
867 mount_path: "/system-params".to_owned(),
869 read_only: Some(true),
870 ..Default::default()
871 });
872 args.push("--config-sync-file-path=/system-params/system-params.json".to_string());
873 args.push("--config-sync-loop-interval=1s".to_string());
874 }
875 }
876
877 if let Some(sentry_dsn) = &config.tracing.sentry_dsn {
879 args.push(format!("--sentry-dsn={}", sentry_dsn));
880 if let Some(sentry_environment) = &config.tracing.sentry_environment {
881 args.push(format!("--sentry-environment={}", sentry_environment));
882 }
883 args.push(format!("--sentry-tag=region={}", config.region));
884 }
885
886 args.push(format!(
888 "--persist-pubsub-url=http://{}:{}",
889 mz.persist_pubsub_service_name(generation),
890 config.environmentd_internal_persist_pubsub_port,
891 ));
892 args.push(format!(
893 "--internal-persist-pubsub-listen-addr=0.0.0.0:{}",
894 config.environmentd_internal_persist_pubsub_port
895 ));
896
897 args.push(format!("--deploy-generation={}", generation));
898
899 args.push(format!(
901 "--internal-console-redirect-url={}",
902 &config.internal_console_proxy_url,
903 ));
904
905 if !config.collect_pod_metrics {
906 args.push("--orchestrator-kubernetes-disable-pod-metrics-collection".into());
907 }
908 if config.enable_prometheus_scrape_annotations {
909 args.push("--orchestrator-kubernetes-enable-prometheus-scrape-annotations".into());
910 }
911
912 if config.disable_license_key_checks {
915 if mz.meets_minimum_version(&V143) && !mz.meets_minimum_version(&V153) {
916 args.push("--disable-license-key-checks".into());
917 }
918 }
919
920 if (mz.meets_minimum_version(&V140_DEV0) && !config.disable_license_key_checks)
923 || mz.meets_minimum_version(&V153)
924 {
925 volume_mounts.push(VolumeMount {
926 name: "license-key".to_string(),
927 mount_path: "/license_key".to_string(),
928 ..Default::default()
929 });
930 volumes.push(Volume {
931 name: "license-key".to_string(),
932 secret: Some(SecretVolumeSource {
933 default_mode: Some(256),
934 optional: Some(false),
935 secret_name: Some(mz.backend_secret_name()),
936 items: Some(vec![KeyToPath {
937 key: "license_key".to_string(),
938 path: "license_key".to_string(),
939 ..Default::default()
940 }]),
941 ..Default::default()
942 }),
943 ..Default::default()
944 });
945 env.push(EnvVar {
946 name: "MZ_LICENSE_KEY".to_string(),
947 value: Some("/license_key/license_key".to_string()),
948 ..Default::default()
949 });
950 }
951
952 if let Some(extra_args) = &mz.spec.environmentd_extra_args {
954 args.extend(extra_args.iter().cloned());
955 }
956
957 let probe = Probe {
958 initial_delay_seconds: Some(1),
959 failure_threshold: Some(12),
960 tcp_socket: Some(TCPSocketAction {
961 host: None,
962 port: IntOrString::Int(config.environmentd_sql_port.into()),
963 }),
964 ..Default::default()
965 };
966
967 let security_context = if config.enable_security_context {
968 Some(SecurityContext {
972 run_as_non_root: Some(true),
973 capabilities: Some(Capabilities {
974 drop: Some(vec!["ALL".to_string()]),
975 ..Default::default()
976 }),
977 seccomp_profile: Some(SeccompProfile {
978 type_: "RuntimeDefault".to_string(),
979 ..Default::default()
980 }),
981 allow_privilege_escalation: Some(false),
982 ..Default::default()
983 })
984 } else {
985 None
986 };
987
988 let ports = vec![
989 ContainerPort {
990 container_port: config.environmentd_sql_port.into(),
991 name: Some("sql".to_owned()),
992 ..Default::default()
993 },
994 ContainerPort {
995 container_port: config.environmentd_internal_sql_port.into(),
996 name: Some("internal-sql".to_owned()),
997 ..Default::default()
998 },
999 ContainerPort {
1000 container_port: config.environmentd_http_port.into(),
1001 name: Some("http".to_owned()),
1002 ..Default::default()
1003 },
1004 ContainerPort {
1005 container_port: config.environmentd_internal_http_port.into(),
1006 name: Some("internal-http".to_owned()),
1007 ..Default::default()
1008 },
1009 ContainerPort {
1010 container_port: config.environmentd_internal_persist_pubsub_port.into(),
1011 name: Some("persist-pubsub".to_owned()),
1012 ..Default::default()
1013 },
1014 ];
1015
1016 if mz.meets_minimum_version(&V147_DEV0) {
1018 volume_mounts.push(VolumeMount {
1019 name: "listeners-configmap".to_string(),
1020 mount_path: "/listeners".to_string(),
1021 ..Default::default()
1022 });
1023 volumes.push(Volume {
1024 name: "listeners-configmap".to_string(),
1025 config_map: Some(ConfigMapVolumeSource {
1026 name: mz.listeners_configmap_name(generation),
1027 default_mode: Some(256),
1028 optional: Some(false),
1029 items: Some(vec![KeyToPath {
1030 key: "listeners.json".to_string(),
1031 path: "listeners.json".to_string(),
1032 ..Default::default()
1033 }]),
1034 }),
1035 ..Default::default()
1036 });
1037 args.push("--listeners-config-path=/listeners/listeners.json".to_owned());
1038 if matches!(
1039 mz.spec.authenticator_kind,
1040 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1041 ) {
1042 args.push("--system-parameter-default=enable_password_auth=true".into());
1043 env.push(EnvVar {
1044 name: "MZ_EXTERNAL_LOGIN_PASSWORD_MZ_SYSTEM".to_string(),
1045 value_from: Some(EnvVarSource {
1046 secret_key_ref: Some(SecretKeySelector {
1047 name: mz.backend_secret_name(),
1048 key: "external_login_password_mz_system".to_string(),
1049 optional: Some(false),
1050 }),
1051 ..Default::default()
1052 }),
1053 ..Default::default()
1054 })
1055 }
1056 } else {
1057 args.extend([
1058 format!("--sql-listen-addr=0.0.0.0:{}", config.environmentd_sql_port),
1059 format!(
1060 "--http-listen-addr=0.0.0.0:{}",
1061 config.environmentd_http_port
1062 ),
1063 format!(
1064 "--internal-sql-listen-addr=0.0.0.0:{}",
1065 config.environmentd_internal_sql_port
1066 ),
1067 format!(
1068 "--internal-http-listen-addr=0.0.0.0:{}",
1069 config.environmentd_internal_http_port
1070 ),
1071 ]);
1072 }
1073
1074 let container = Container {
1075 name: "environmentd".to_owned(),
1076 image: Some(mz.spec.environmentd_image_ref.to_owned()),
1077 image_pull_policy: Some(config.image_pull_policy.to_string()),
1078 ports: Some(ports),
1079 args: Some(args),
1080 env: Some(env),
1081 volume_mounts: Some(volume_mounts),
1082 liveness_probe: Some(probe.clone()),
1083 readiness_probe: Some(probe),
1084 resources: mz
1085 .spec
1086 .environmentd_resource_requirements
1087 .clone()
1088 .or_else(|| config.environmentd_default_resources.clone()),
1089 security_context: security_context.clone(),
1090 ..Default::default()
1091 };
1092
1093 let mut pod_template_labels = mz.default_labels();
1094 pod_template_labels.insert(
1095 "materialize.cloud/name".to_owned(),
1096 mz.environmentd_statefulset_name(generation),
1097 );
1098 pod_template_labels.insert(
1099 "materialize.cloud/app".to_owned(),
1100 mz.environmentd_app_name(),
1101 );
1102 pod_template_labels.insert("app".to_owned(), "environmentd".to_string());
1103 pod_template_labels.extend(
1104 mz.spec
1105 .pod_labels
1106 .as_ref()
1107 .map(|labels| labels.iter())
1108 .unwrap_or_default()
1109 .map(|(key, value)| (key.clone(), value.clone())),
1110 );
1111
1112 let mut pod_template_annotations = btreemap! {
1113 "cluster-autoscaler.kubernetes.io/safe-to-evict".to_owned() => "false".to_string(),
1115
1116 "karpenter.sh/do-not-evict".to_owned() => "true".to_string(),
1118 "karpenter.sh/do-not-disrupt".to_owned() => "true".to_string(),
1119 "materialize.cloud/generation".to_owned() => generation.to_string(),
1120 };
1121 if config.enable_prometheus_scrape_annotations {
1122 pod_template_annotations.insert("prometheus.io/scrape".to_owned(), "true".to_string());
1123 pod_template_annotations.insert(
1124 "prometheus.io/port".to_owned(),
1125 config.environmentd_internal_http_port.to_string(),
1126 );
1127 pod_template_annotations.insert("prometheus.io/path".to_owned(), "/metrics".to_string());
1128 pod_template_annotations.insert("prometheus.io/scheme".to_owned(), "http".to_string());
1129 pod_template_annotations.insert(
1130 "materialize.prometheus.io/mz_usage_path".to_owned(),
1131 "/metrics/mz_usage".to_string(),
1132 );
1133 pod_template_annotations.insert(
1134 "materialize.prometheus.io/mz_frontier_path".to_owned(),
1135 "/metrics/mz_frontier".to_string(),
1136 );
1137 pod_template_annotations.insert(
1138 "materialize.prometheus.io/mz_compute_path".to_owned(),
1139 "/metrics/mz_compute".to_string(),
1140 );
1141 pod_template_annotations.insert(
1142 "materialize.prometheus.io/mz_storage_path".to_owned(),
1143 "/metrics/mz_storage".to_string(),
1144 );
1145 }
1146 pod_template_annotations.extend(
1147 mz.spec
1148 .pod_annotations
1149 .as_ref()
1150 .map(|annotations| annotations.iter())
1151 .unwrap_or_default()
1152 .map(|(key, value)| (key.clone(), value.clone())),
1153 );
1154
1155 let mut tolerations = vec![
1156 Toleration {
1160 effect: Some("NoExecute".into()),
1161 key: Some("node.kubernetes.io/not-ready".into()),
1162 operator: Some("Exists".into()),
1163 toleration_seconds: Some(30),
1164 value: None,
1165 },
1166 Toleration {
1167 effect: Some("NoExecute".into()),
1168 key: Some("node.kubernetes.io/unreachable".into()),
1169 operator: Some("Exists".into()),
1170 toleration_seconds: Some(30),
1171 value: None,
1172 },
1173 ];
1174 if let Some(user_tolerations) = &config.environmentd_tolerations {
1175 tolerations.extend(user_tolerations.iter().cloned());
1176 }
1177 let tolerations = Some(tolerations);
1178
1179 let pod_template_spec = PodTemplateSpec {
1180 metadata: Some(ObjectMeta {
1183 labels: Some(pod_template_labels),
1184 annotations: Some(pod_template_annotations), ..Default::default()
1186 }),
1187 spec: Some(PodSpec {
1188 containers: vec![container],
1189 node_selector: Some(
1190 config
1191 .environmentd_node_selector
1192 .iter()
1193 .map(|selector| (selector.key.clone(), selector.value.clone()))
1194 .collect(),
1195 ),
1196 affinity: config.environmentd_affinity.clone(),
1197 scheduler_name: config.scheduler_name.clone(),
1198 service_account_name: Some(mz.service_account_name()),
1199 volumes: Some(volumes),
1200 security_context: Some(PodSecurityContext {
1201 fs_group: Some(999),
1202 run_as_user: Some(999),
1203 run_as_group: Some(999),
1204 ..Default::default()
1205 }),
1206 tolerations,
1207 termination_grace_period_seconds: Some(0),
1228 ..Default::default()
1229 }),
1230 };
1231
1232 let mut match_labels = BTreeMap::new();
1233 match_labels.insert(
1234 "materialize.cloud/name".to_owned(),
1235 mz.environmentd_statefulset_name(generation),
1236 );
1237
1238 let statefulset_spec = StatefulSetSpec {
1239 replicas: Some(1),
1240 template: pod_template_spec,
1241 service_name: Some(mz.environmentd_service_name()),
1242 selector: LabelSelector {
1243 match_expressions: None,
1244 match_labels: Some(match_labels),
1245 },
1246 ..Default::default()
1247 };
1248
1249 StatefulSet {
1250 metadata: ObjectMeta {
1251 annotations: Some(btreemap! {
1252 "materialize.cloud/generation".to_owned() => generation.to_string(),
1253 "materialize.cloud/force".to_owned() => mz.spec.force_rollout.to_string(),
1254 }),
1255 ..mz.managed_resource_meta(mz.environmentd_statefulset_name(generation))
1256 },
1257 spec: Some(statefulset_spec),
1258 status: None,
1259 }
1260}
1261
1262fn create_connection_info(
1263 config: &super::Config,
1264 mz: &Materialize,
1265 generation: u64,
1266) -> ConnectionInfo {
1267 let external_enable_tls = issuer_ref_defined(
1268 &config.default_certificate_specs.internal,
1269 &mz.spec.internal_certificate_spec,
1270 );
1271 let authenticator_kind = mz.spec.authenticator_kind;
1272
1273 let mut listeners_config = ListenersConfig {
1274 sql: btreemap! {
1275 "external".to_owned() => SqlListenerConfig{
1276 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_sql_port),
1277 authenticator_kind,
1278 allowed_roles: AllowedRoles::Normal,
1279 enable_tls: external_enable_tls,
1280 },
1281 "internal".to_owned() => SqlListenerConfig{
1282 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_sql_port),
1283 authenticator_kind: AuthenticatorKind::None,
1284 allowed_roles: AllowedRoles::NormalAndInternal,
1286 enable_tls: false,
1287 },
1288 },
1289 http: btreemap! {
1290 "external".to_owned() => HttpListenerConfig{
1291 base: BaseListenerConfig {
1292 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_http_port),
1293 authenticator_kind: if authenticator_kind == AuthenticatorKind::Sasl {
1297 AuthenticatorKind::Password
1298 } else {
1299 authenticator_kind
1300 },
1301 allowed_roles: AllowedRoles::Normal,
1302 enable_tls: external_enable_tls,
1303 },
1304 routes: HttpRoutesEnabled{
1305 base: true,
1306 webhook: true,
1307 internal: false,
1308 metrics: false,
1309 profiling: false,
1310 }
1311 },
1312 "internal".to_owned() => HttpListenerConfig{
1313 base: BaseListenerConfig {
1314 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), config.environmentd_internal_http_port),
1315 authenticator_kind: AuthenticatorKind::None,
1316 allowed_roles: AllowedRoles::NormalAndInternal,
1318 enable_tls: false,
1319 },
1320 routes: HttpRoutesEnabled{
1321 base: true,
1322 webhook: true,
1323 internal: true,
1324 metrics: true,
1325 profiling: true,
1326 }
1327 },
1328 },
1329 };
1330
1331 if matches!(
1332 authenticator_kind,
1333 AuthenticatorKind::Password | AuthenticatorKind::Sasl
1334 ) {
1335 listeners_config.sql.remove("internal");
1336 listeners_config.http.remove("internal");
1337
1338 listeners_config.sql.get_mut("external").map(|listener| {
1339 listener.allowed_roles = AllowedRoles::NormalAndInternal;
1340 listener
1341 });
1342 listeners_config.http.get_mut("external").map(|listener| {
1343 listener.base.allowed_roles = AllowedRoles::NormalAndInternal;
1344 listener.routes.internal = true;
1345 listener.routes.profiling = true;
1346 listener
1347 });
1348
1349 listeners_config.http.insert(
1350 "metrics".to_owned(),
1351 HttpListenerConfig {
1352 base: BaseListenerConfig {
1353 addr: SocketAddr::new(
1354 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1355 config.environmentd_internal_http_port,
1356 ),
1357 authenticator_kind: AuthenticatorKind::None,
1358 allowed_roles: AllowedRoles::NormalAndInternal,
1359 enable_tls: false,
1360 },
1361 routes: HttpRoutesEnabled {
1362 base: false,
1363 webhook: false,
1364 internal: false,
1365 metrics: true,
1366 profiling: false,
1367 },
1368 },
1369 );
1370 }
1371
1372 let listeners_json = serde_json::to_string(&listeners_config).expect("known valid");
1373 let listeners_configmap = ConfigMap {
1374 binary_data: None,
1375 data: Some(btreemap! {
1376 "listeners.json".to_owned() => listeners_json,
1377 }),
1378 immutable: None,
1379 metadata: ObjectMeta {
1380 annotations: Some(btreemap! {
1381 "materialize.cloud/generation".to_owned() => generation.to_string(),
1382 }),
1383 ..mz.managed_resource_meta(mz.listeners_configmap_name(generation))
1384 },
1385 };
1386
1387 let (scheme, leader_api_port, mz_system_secret_name) = match authenticator_kind {
1388 AuthenticatorKind::Password | AuthenticatorKind::Sasl => {
1389 let scheme = if external_enable_tls { "https" } else { "http" };
1390 (
1391 scheme,
1392 config.environmentd_http_port,
1393 Some(mz.spec.backend_secret_name.clone()),
1394 )
1395 }
1396 _ => ("http", config.environmentd_internal_http_port, None),
1397 };
1398 let environmentd_url = format!(
1399 "{}://{}.{}.svc.cluster.local:{}",
1400 scheme,
1401 mz.environmentd_generation_service_name(generation),
1402 mz.namespace(),
1403 leader_api_port,
1404 );
1405 ConnectionInfo {
1406 environmentd_url,
1407 listeners_configmap,
1408 mz_system_secret_name,
1409 }
1410}
1411
1412#[derive(Debug, Deserialize, PartialEq, Eq)]
1414struct BecomeLeaderResponse {
1415 result: BecomeLeaderResult,
1416}
1417
1418#[derive(Debug, Deserialize, PartialEq, Eq)]
1419enum BecomeLeaderResult {
1420 Success,
1421 Failure { message: String },
1422}
1423
1424#[derive(Debug, Deserialize, PartialEq, Eq)]
1425struct SkipCatchupError {
1426 message: String,
1427}