1use std::collections::BTreeMap;
11use std::time::Duration;
12
13use k8s_openapi::{
14 api::core::v1::{EnvVar, ResourceRequirements},
15 apimachinery::pkg::{
16 api::resource::Quantity,
17 apis::meta::v1::{Condition, Time},
18 },
19 jiff::Timestamp,
20};
21use kube::{CustomResource, Resource, ResourceExt};
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use sha2::{Digest, Sha256};
26use uuid::Uuid;
27
28use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
29use mz_server_core::listeners::AuthenticatorKind;
30
31pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
32 "materialize.cloud/last-known-active-generation";
33pub const FORCE_ROLLOUT_ANNOTATION: &str = "materialize.cloud/force-rollout";
34
35#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
36pub enum MaterializeRolloutStrategy {
37 #[default]
41 WaitUntilReady,
42
43 ManuallyPromote,
69
70 ImmediatelyPromoteCausingDowntime,
81}
82
83pub const DEFAULT_ROLLOUT_REQUEST_TIMEOUT: &str = "24h";
88
89#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
100#[serde(transparent)]
101pub struct RolloutRequestTimeout(pub String);
102
103impl Default for RolloutRequestTimeout {
104 fn default() -> Self {
105 RolloutRequestTimeout(DEFAULT_ROLLOUT_REQUEST_TIMEOUT.to_owned())
106 }
107}
108
109pub mod v1alpha1 {
110 use super::*;
111
112 #[derive(
113 CustomResource,
114 Clone,
115 Debug,
116 Default,
117 PartialEq,
118 Deserialize,
119 Serialize,
120 JsonSchema
121 )]
122 #[serde(rename_all = "camelCase")]
123 #[kube(
124 namespaced,
125 group = "materialize.cloud",
126 version = "v1alpha1",
127 kind = "Materialize",
128 singular = "materialize",
129 plural = "materializes",
130 shortname = "mzs",
131 status = "MaterializeStatus",
132 printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
133 printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
134 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
135 )]
136 pub struct MaterializeSpec {
137 pub environmentd_image_ref: String,
139 pub environmentd_extra_args: Option<Vec<String>>,
141 pub environmentd_extra_env: Option<Vec<EnvVar>>,
143 #[kube(deprecated)]
152 pub environmentd_iam_role_arn: Option<String>,
153 pub environmentd_connection_role_arn: Option<String>,
156 pub environmentd_resource_requirements: Option<ResourceRequirements>,
158 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
160 pub balancerd_resource_requirements: Option<ResourceRequirements>,
162 pub console_resource_requirements: Option<ResourceRequirements>,
164 pub balancerd_replicas: Option<i32>,
166 pub console_replicas: Option<i32>,
168
169 pub service_account_name: Option<String>,
172 pub service_account_annotations: Option<BTreeMap<String, String>>,
179 pub service_account_labels: Option<BTreeMap<String, String>>,
181 pub pod_annotations: Option<BTreeMap<String, String>>,
183 pub pod_labels: Option<BTreeMap<String, String>>,
185
186 #[serde(default)]
198 pub request_rollout: Uuid,
199 #[serde(default)]
204 pub force_promote: String,
205 #[serde(default)]
212 pub force_rollout: Uuid,
213 #[kube(deprecated)]
217 #[serde(default)]
218 pub in_place_rollout: bool,
219 #[serde(default)]
221 pub rollout_strategy: MaterializeRolloutStrategy,
222 #[serde(default)]
243 pub rollout_request_timeout: RolloutRequestTimeout,
244 pub backend_secret_name: String,
249 #[serde(default)]
251 pub authenticator_kind: AuthenticatorKind,
252 #[serde(default)]
254 pub enable_rbac: bool,
255
256 #[serde(default)]
263 pub environment_id: Uuid,
264
265 pub system_parameter_configmap_name: Option<String>,
280
281 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
285 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
290 pub internal_certificate_spec: Option<MaterializeCertSpec>,
295 }
296
297 impl Materialize {
298 pub fn backend_secret_name(&self) -> String {
299 self.spec.backend_secret_name.clone()
300 }
301
302 pub fn namespace(&self) -> String {
303 self.meta().namespace.clone().unwrap()
304 }
305
306 pub fn create_service_account(&self) -> bool {
307 self.spec.service_account_name.is_none()
308 }
309
310 pub fn service_account_name(&self) -> String {
311 self.spec
312 .service_account_name
313 .clone()
314 .unwrap_or_else(|| self.name_unchecked())
315 }
316
317 pub fn role_name(&self) -> String {
318 self.name_unchecked()
319 }
320
321 pub fn role_binding_name(&self) -> String {
322 self.name_unchecked()
323 }
324
325 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
326 self.name_prefixed(&format!("environmentd-{generation}"))
327 }
328
329 pub fn environmentd_app_name(&self) -> String {
330 "environmentd".to_owned()
331 }
332
333 pub fn environmentd_service_name(&self) -> String {
334 self.name_prefixed("environmentd")
335 }
336
337 pub fn environmentd_service_internal_fqdn(&self) -> String {
338 format!(
339 "{}.{}.svc.cluster.local",
340 self.environmentd_service_name(),
341 self.meta().namespace.as_ref().unwrap()
342 )
343 }
344
345 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
346 self.name_prefixed(&format!("environmentd-{generation}"))
347 }
348
349 pub fn balancerd_app_name(&self) -> String {
350 "balancerd".to_owned()
351 }
352
353 pub fn environmentd_certificate_name(&self) -> String {
354 self.name_prefixed("environmentd-external")
355 }
356
357 pub fn environmentd_certificate_secret_name(&self) -> String {
358 self.name_prefixed("environmentd-tls")
359 }
360
361 pub fn balancerd_deployment_name(&self) -> String {
362 self.name_prefixed("balancerd")
363 }
364
365 pub fn balancerd_service_name(&self) -> String {
366 self.name_prefixed("balancerd")
367 }
368
369 pub fn console_app_name(&self) -> String {
370 "console".to_owned()
371 }
372
373 pub fn balancerd_external_certificate_name(&self) -> String {
374 self.name_prefixed("balancerd-external")
375 }
376
377 pub fn balancerd_external_certificate_secret_name(&self) -> String {
378 self.name_prefixed("balancerd-external-tls")
379 }
380
381 pub fn balancerd_replicas(&self) -> i32 {
382 self.spec.balancerd_replicas.unwrap_or(2)
383 }
384
385 pub fn console_replicas(&self) -> i32 {
386 self.spec.console_replicas.unwrap_or(2)
387 }
388
389 pub fn console_configmap_name(&self) -> String {
390 self.name_prefixed("console")
391 }
392
393 pub fn console_deployment_name(&self) -> String {
394 self.name_prefixed("console")
395 }
396
397 pub fn console_service_name(&self) -> String {
398 self.name_prefixed("console")
399 }
400
401 pub fn console_external_certificate_name(&self) -> String {
402 self.name_prefixed("console-external")
403 }
404
405 pub fn console_external_certificate_secret_name(&self) -> String {
406 self.name_prefixed("console-external-tls")
407 }
408
409 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
410 self.name_prefixed(&format!("persist-pubsub-{generation}"))
411 }
412
413 pub fn listeners_configmap_name(&self, generation: u64) -> String {
414 self.name_prefixed(&format!("listeners-{generation}"))
415 }
416
417 pub fn name_prefixed(&self, suffix: &str) -> String {
418 format!("mz{}-{}", self.resource_id(), suffix)
419 }
420
421 pub fn resource_id(&self) -> &str {
422 &self.status.as_ref().unwrap().resource_id
423 }
424
425 pub fn system_parameter_configmap_name(&self) -> Option<String> {
426 self.spec.system_parameter_configmap_name.clone()
427 }
428
429 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
430 self.spec
431 .environmentd_scratch_volume_storage_requirement
432 .clone()
433 .unwrap_or_else(|| {
434 self.spec
435 .environmentd_resource_requirements
436 .as_ref()
437 .and_then(|requirements| {
438 requirements
439 .requests
440 .as_ref()
441 .or(requirements.limits.as_ref())
442 })
443 .and_then(|requirements| requirements.get("memory").cloned())
448 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
450 })
451 }
452
453 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
454 format!(
455 "{}-{}-{}-0",
456 cloud_provider, region, self.spec.environment_id,
457 )
458 }
459
460 pub fn requested_reconciliation_id(&self) -> Uuid {
461 self.spec.request_rollout
462 }
463
464 pub fn rollout_requested(&self) -> bool {
465 self.requested_reconciliation_id()
466 != self
467 .status
468 .as_ref()
469 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
470 }
471
472 pub fn rollout_request_timeout(&self) -> Duration {
477 let timeout = &self.spec.rollout_request_timeout.0;
478 humantime::parse_duration(timeout)
479 .or_else(|e| {
480 tracing::warn!(
481 rollout_request_timeout = %timeout,
482 "failed to parse rolloutRequestTimeout, using default: {e}",
483 );
484 humantime::parse_duration(DEFAULT_ROLLOUT_REQUEST_TIMEOUT)
485 })
486 .expect("DEFAULT_ROLLOUT_REQUEST_TIMEOUT must be a valid duration")
487 }
488
489 pub fn rollout_in_progress_since(&self) -> Option<Timestamp> {
506 self.status
507 .as_ref()?
508 .conditions
509 .iter()
510 .find_map(|condition| {
511 if condition.type_ == "UpToDate"
512 && condition.status == "Unknown"
513 && condition.reason != "Promoting"
514 {
515 Some(condition.last_transition_time.0)
516 } else {
517 None
518 }
519 })
520 }
521
522 pub fn up_to_date_transition_time(&self, new_status: &str, now: Timestamp) -> Timestamp {
534 self.status
535 .as_ref()
536 .and_then(|status| {
537 status
538 .conditions
539 .iter()
540 .find(|condition| condition.type_ == "UpToDate")
541 })
542 .filter(|condition| condition.status == new_status)
543 .map_or(now, |condition| condition.last_transition_time.0)
544 }
545
546 pub fn active_environmentd_image_ref(&self) -> &str {
554 self.status
555 .as_ref()
556 .and_then(|s| s.last_completed_rollout_environmentd_image_ref.as_deref())
557 .unwrap_or(&self.spec.environmentd_image_ref)
558 }
559
560 pub fn set_force_promote(&mut self) {
561 self.spec.force_promote = self.spec.request_rollout.hyphenated().to_string();
562 }
563
564 pub fn should_force_promote(&self) -> bool {
565 self.spec.force_promote == self.spec.request_rollout.hyphenated().to_string()
566 || self.spec.force_promote
567 == super::v1::Materialize::from(self.clone()).generate_rollout_hash()
568 || self.spec.rollout_strategy
569 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
570 }
571
572 pub fn conditions_need_update(&self) -> bool {
573 let Some(status) = self.status.as_ref() else {
574 return true;
575 };
576 if status.conditions.is_empty() {
577 return true;
578 }
579 for condition in &status.conditions {
580 if condition.observed_generation != self.meta().generation {
581 return true;
582 }
583 }
584 false
585 }
586
587 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
588 let Some(status) = self.status.as_ref() else {
589 return false;
590 };
591 if status.conditions.is_empty() {
592 return false;
593 }
594 status
595 .conditions
596 .iter()
597 .any(|condition| condition.reason == "ReadyToPromote")
598 && &status.resources_hash == resources_hash
599 }
600
601 pub fn is_promoting(&self) -> bool {
602 let Some(status) = self.status.as_ref() else {
603 return false;
604 };
605 if status.conditions.is_empty() {
606 return false;
607 }
608 status
609 .conditions
610 .iter()
611 .any(|condition| condition.reason == "Promoting")
612 }
613
614 pub fn update_in_progress(&self) -> bool {
615 let Some(status) = self.status.as_ref() else {
616 return false;
617 };
618 if status.conditions.is_empty() {
619 return false;
620 }
621 for condition in &status.conditions {
622 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
623 return true;
624 }
625 }
626 false
627 }
628
629 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
633 let version = parse_image_ref(&self.spec.environmentd_image_ref);
634 match version {
635 Some(version) => version.cmp_precedence(minimum).is_ge(),
637 None => {
643 tracing::warn!(
644 image_ref = %self.spec.environmentd_image_ref,
645 "failed to parse image ref",
646 );
647 true
648 }
649 }
650 }
651
652 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
656 if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
661 return false;
662 }
663
664 if active_version.major == 0 {
665 if next_version.major != active_version.major {
666 if next_version.major == 26 {
667 return (active_version.minor == 147 && active_version.patch >= 20)
670 || active_version.minor >= 164;
671 } else {
672 return false;
673 }
674 }
675 if next_version.minor == 147 && active_version.minor == 130 {
677 return true;
678 }
679 return next_version.minor <= active_version.minor + 1;
681 } else if active_version.major >= 26 {
682 return next_version.major <= active_version.major + 1;
684 }
685
686 true
687 }
688
689 pub fn within_upgrade_window(&self) -> bool {
692 let active_environmentd_version = self
693 .status
694 .as_ref()
695 .and_then(|status| {
696 status
697 .last_completed_rollout_environmentd_image_ref
698 .as_ref()
699 })
700 .and_then(|image_ref| parse_image_ref(image_ref));
701
702 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
703 parse_image_ref(&self.spec.environmentd_image_ref),
704 active_environmentd_version,
705 ) {
706 Self::is_valid_upgrade_version(
707 &active_environmentd_version,
708 &next_environmentd_version,
709 )
710 } else {
711 true
714 }
715 }
716
717 pub fn status(&self) -> MaterializeStatus {
718 self.status.clone().unwrap_or_else(|| {
719 let mut status = MaterializeStatus::default();
720
721 status.resource_id = new_resource_id();
722
723 if let Some(last_active_generation) = self
728 .annotations()
729 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
730 {
731 status.active_generation = last_active_generation
732 .parse()
733 .expect("valid int generation");
734 }
735
736 status.last_completed_rollout_environmentd_image_ref =
739 Some(self.spec.environmentd_image_ref.clone());
740
741 status
742 })
743 }
744 }
745
746 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
747 #[serde(rename_all = "camelCase")]
748 pub struct MaterializeStatus {
749 pub resource_id: String,
751 pub active_generation: u64,
753 pub last_completed_rollout_request: Uuid,
755 pub last_completed_rollout_environmentd_image_ref: Option<String>,
759 pub resources_hash: String,
764 pub last_completed_rollout_hash: Option<String>,
767 pub conditions: Vec<Condition>,
768 }
769
770 impl MaterializeStatus {
771 pub fn needs_update(&self, other: &Self) -> bool {
772 let now = Timestamp::now();
773 let mut a = self.clone();
774 for condition in &mut a.conditions {
775 condition.last_transition_time = Time(now);
776 }
777 let mut b = other.clone();
778 for condition in &mut b.conditions {
779 condition.last_transition_time = Time(now);
780 }
781 a != b
782 }
783 }
784
785 impl ManagedResource for Materialize {
786 fn default_labels(&self) -> BTreeMap<String, String> {
787 BTreeMap::from_iter([
788 (
789 "materialize.cloud/organization-name".to_owned(),
790 self.name_unchecked(),
791 ),
792 (
793 "materialize.cloud/organization-namespace".to_owned(),
794 self.namespace(),
795 ),
796 (
797 "materialize.cloud/mz-resource-id".to_owned(),
798 self.resource_id().to_owned(),
799 ),
800 ])
801 }
802 }
803
804 impl From<v1::Materialize> for Materialize {
805 fn from(value: v1::Materialize) -> Self {
806 let rollout_hash = value.generate_rollout_hash();
807 let request_rollout = Uuid::new_v5(&Uuid::NAMESPACE_OID, rollout_hash.as_bytes());
811 Materialize {
812 metadata: value.metadata,
813 spec: MaterializeSpec {
814 environmentd_image_ref: value.spec.environmentd_image_ref,
815 environmentd_extra_args: value.spec.environmentd_extra_args,
816 environmentd_extra_env: value.spec.environmentd_extra_env,
817 environmentd_iam_role_arn: None,
818 environmentd_connection_role_arn: value.spec.environmentd_connection_role_arn,
819 environmentd_resource_requirements: value
820 .spec
821 .environmentd_resource_requirements,
822 environmentd_scratch_volume_storage_requirement: value
823 .spec
824 .environmentd_scratch_volume_storage_requirement,
825 balancerd_resource_requirements: value.spec.balancerd_resource_requirements,
826 console_resource_requirements: value.spec.console_resource_requirements,
827 balancerd_replicas: value.spec.balancerd_replicas,
828 console_replicas: value.spec.console_replicas,
829 service_account_name: value.spec.service_account_name,
830 service_account_annotations: value.spec.service_account_annotations,
831 service_account_labels: value.spec.service_account_labels,
832 pod_annotations: value.spec.pod_annotations,
833 pod_labels: value.spec.pod_labels,
834 force_promote: value.spec.force_promote.unwrap_or_default(),
835 force_rollout: value.spec.force_rollout,
836 rollout_strategy: value.spec.rollout_strategy,
837 rollout_request_timeout: value.spec.rollout_request_timeout,
838 backend_secret_name: value.spec.backend_secret_name,
839 authenticator_kind: value.spec.authenticator_kind,
840 enable_rbac: value.spec.enable_rbac,
841 environment_id: value.spec.environment_id,
842 system_parameter_configmap_name: value.spec.system_parameter_configmap_name,
843 balancerd_external_certificate_spec: value
844 .spec
845 .balancerd_external_certificate_spec,
846 console_external_certificate_spec: value.spec.console_external_certificate_spec,
847 internal_certificate_spec: value.spec.internal_certificate_spec,
848 request_rollout,
849 in_place_rollout: false,
850 },
851 status: value.status.map(|status| MaterializeStatus {
852 resource_id: status.resource_id,
853 active_generation: status.active_generation,
854 last_completed_rollout_environmentd_image_ref: status
855 .last_completed_rollout_environmentd_image_ref,
856 conditions: status.conditions,
857 last_completed_rollout_request: status
861 .last_completed_rollout_hash
862 .as_ref()
863 .map(|hash| Uuid::new_v5(&Uuid::NAMESPACE_OID, hash.as_bytes()))
864 .unwrap_or(Uuid::nil()),
865 last_completed_rollout_hash: status.last_completed_rollout_hash,
866 resources_hash: "".to_owned(),
867 }),
868 }
869 }
870 }
871}
872
873pub mod v1 {
874 use super::*;
875
876 #[derive(
877 CustomResource,
878 Clone,
879 Debug,
880 Default,
881 PartialEq,
882 Deserialize,
883 Serialize,
884 JsonSchema
885 )]
886 #[serde(rename_all = "camelCase")]
887 #[kube(
888 namespaced,
889 group = "materialize.cloud",
890 version = "v1",
891 kind = "Materialize",
892 singular = "materialize",
893 plural = "materializes",
894 shortname = "mzs",
895 status = "MaterializeStatus",
896 printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
897 printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
898 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
899 )]
900 pub struct MaterializeSpec {
901 pub environmentd_image_ref: String,
903 pub environmentd_extra_args: Option<Vec<String>>,
905 pub environmentd_extra_env: Option<Vec<EnvVar>>,
907 pub environmentd_connection_role_arn: Option<String>,
910 pub environmentd_resource_requirements: Option<ResourceRequirements>,
912 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
914 pub balancerd_resource_requirements: Option<ResourceRequirements>,
918 pub console_resource_requirements: Option<ResourceRequirements>,
922 pub balancerd_replicas: Option<i32>,
926 pub console_replicas: Option<i32>,
930
931 pub service_account_name: Option<String>,
934 pub service_account_annotations: Option<BTreeMap<String, String>>,
941 pub service_account_labels: Option<BTreeMap<String, String>>,
943 pub pod_annotations: Option<BTreeMap<String, String>>,
945 pub pod_labels: Option<BTreeMap<String, String>>,
947
948 pub force_promote: Option<String>,
955 #[serde(default)]
959 pub force_rollout: Uuid,
960 #[serde(default)]
962 pub rollout_strategy: MaterializeRolloutStrategy,
963 #[serde(default)]
984 pub rollout_request_timeout: RolloutRequestTimeout,
985 pub backend_secret_name: String,
989 #[serde(default)]
991 pub authenticator_kind: AuthenticatorKind,
992 #[serde(default)]
994 pub enable_rbac: bool,
995
996 #[serde(default)]
1003 pub environment_id: Uuid,
1004
1005 pub system_parameter_configmap_name: Option<String>,
1020
1021 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
1027 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
1034 pub internal_certificate_spec: Option<MaterializeCertSpec>,
1039 }
1040
1041 impl Materialize {
1042 pub fn generate_rollout_hash(&self) -> String {
1043 let mut hasher = Sha256::new();
1044 let spec = MaterializeSpec {
1047 environmentd_image_ref: self.spec.environmentd_image_ref.clone(),
1048 environmentd_extra_args: self.spec.environmentd_extra_args.clone(),
1049 environmentd_extra_env: self.spec.environmentd_extra_env.clone(),
1050 environmentd_connection_role_arn: self
1051 .spec
1052 .environmentd_connection_role_arn
1053 .clone(),
1054 environmentd_resource_requirements: self
1055 .spec
1056 .environmentd_resource_requirements
1057 .clone(),
1058 environmentd_scratch_volume_storage_requirement: self
1059 .spec
1060 .environmentd_scratch_volume_storage_requirement
1061 .clone(),
1062 balancerd_resource_requirements: None,
1063 console_resource_requirements: None,
1064 balancerd_replicas: None,
1065 console_replicas: None,
1066 service_account_name: self.spec.service_account_name.clone(),
1067 service_account_annotations: self.spec.service_account_annotations.clone(),
1068 service_account_labels: self.spec.service_account_labels.clone(),
1069 pod_annotations: self.spec.pod_annotations.clone(),
1070 pod_labels: self.spec.pod_labels.clone(),
1071 force_promote: None,
1072 force_rollout: self.spec.force_rollout,
1073 rollout_strategy: self.spec.rollout_strategy.clone(),
1074 rollout_request_timeout: self.spec.rollout_request_timeout.clone(),
1075 backend_secret_name: self.spec.backend_secret_name.clone(),
1076 authenticator_kind: self.spec.authenticator_kind,
1077 enable_rbac: self.spec.enable_rbac,
1078 environment_id: self.spec.environment_id,
1079 system_parameter_configmap_name: self.spec.system_parameter_configmap_name.clone(),
1080 balancerd_external_certificate_spec: None,
1081 console_external_certificate_spec: None,
1082 internal_certificate_spec: self.spec.internal_certificate_spec.clone(),
1083 };
1084 hasher.update(&serde_json::to_vec(&spec).unwrap());
1085 if let Some(annotation) = self
1086 .metadata
1087 .annotations
1088 .as_ref()
1089 .and_then(|annotations| annotations.get(FORCE_ROLLOUT_ANNOTATION))
1090 {
1091 hasher.update(annotation);
1092 }
1093 format!("{:x}", hasher.finalize())
1094 }
1095
1096 pub fn backend_secret_name(&self) -> String {
1097 self.spec.backend_secret_name.clone()
1098 }
1099
1100 pub fn namespace(&self) -> String {
1101 self.meta().namespace.clone().unwrap()
1102 }
1103
1104 pub fn create_service_account(&self) -> bool {
1105 self.spec.service_account_name.is_none()
1106 }
1107
1108 pub fn service_account_name(&self) -> String {
1109 self.spec
1110 .service_account_name
1111 .clone()
1112 .unwrap_or_else(|| self.name_unchecked())
1113 }
1114
1115 pub fn role_name(&self) -> String {
1116 self.name_unchecked()
1117 }
1118
1119 pub fn role_binding_name(&self) -> String {
1120 self.name_unchecked()
1121 }
1122
1123 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
1124 self.name_prefixed(&format!("environmentd-{generation}"))
1125 }
1126
1127 pub fn environmentd_app_name(&self) -> String {
1128 "environmentd".to_owned()
1129 }
1130
1131 pub fn environmentd_service_name(&self) -> String {
1132 self.name_prefixed("environmentd")
1133 }
1134
1135 pub fn environmentd_service_internal_fqdn(&self) -> String {
1136 format!(
1137 "{}.{}.svc.cluster.local",
1138 self.environmentd_service_name(),
1139 self.meta().namespace.as_ref().unwrap()
1140 )
1141 }
1142
1143 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
1144 self.name_prefixed(&format!("environmentd-{generation}"))
1145 }
1146
1147 pub fn balancerd_app_name(&self) -> String {
1148 "balancerd".to_owned()
1149 }
1150
1151 pub fn environmentd_certificate_name(&self) -> String {
1152 self.name_prefixed("environmentd-external")
1153 }
1154
1155 pub fn environmentd_certificate_secret_name(&self) -> String {
1156 self.name_prefixed("environmentd-tls")
1157 }
1158
1159 pub fn balancerd_deployment_name(&self) -> String {
1160 self.name_prefixed("balancerd")
1161 }
1162
1163 pub fn balancerd_service_name(&self) -> String {
1164 self.name_prefixed("balancerd")
1165 }
1166
1167 pub fn console_app_name(&self) -> String {
1168 "console".to_owned()
1169 }
1170
1171 pub fn balancerd_external_certificate_name(&self) -> String {
1172 self.name_prefixed("balancerd-external")
1173 }
1174
1175 pub fn balancerd_external_certificate_secret_name(&self) -> String {
1176 self.name_prefixed("balancerd-external-tls")
1177 }
1178
1179 pub fn balancerd_replicas(&self) -> i32 {
1180 self.spec.balancerd_replicas.unwrap_or(2)
1181 }
1182
1183 pub fn console_replicas(&self) -> i32 {
1184 self.spec.console_replicas.unwrap_or(2)
1185 }
1186
1187 pub fn console_configmap_name(&self) -> String {
1188 self.name_prefixed("console")
1189 }
1190
1191 pub fn console_deployment_name(&self) -> String {
1192 self.name_prefixed("console")
1193 }
1194
1195 pub fn console_service_name(&self) -> String {
1196 self.name_prefixed("console")
1197 }
1198
1199 pub fn console_external_certificate_name(&self) -> String {
1200 self.name_prefixed("console-external")
1201 }
1202
1203 pub fn console_external_certificate_secret_name(&self) -> String {
1204 self.name_prefixed("console-external-tls")
1205 }
1206
1207 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
1208 self.name_prefixed(&format!("persist-pubsub-{generation}"))
1209 }
1210
1211 pub fn listeners_configmap_name(&self, generation: u64) -> String {
1212 self.name_prefixed(&format!("listeners-{generation}"))
1213 }
1214
1215 pub fn name_prefixed(&self, suffix: &str) -> String {
1216 format!("mz{}-{}", self.resource_id(), suffix)
1217 }
1218
1219 pub fn resource_id(&self) -> &str {
1220 &self.status.as_ref().unwrap().resource_id
1221 }
1222
1223 pub fn system_parameter_configmap_name(&self) -> Option<String> {
1224 self.spec.system_parameter_configmap_name.clone()
1225 }
1226
1227 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
1228 self.spec
1229 .environmentd_scratch_volume_storage_requirement
1230 .clone()
1231 .unwrap_or_else(|| {
1232 self.spec
1233 .environmentd_resource_requirements
1234 .as_ref()
1235 .and_then(|requirements| {
1236 requirements
1237 .requests
1238 .as_ref()
1239 .or(requirements.limits.as_ref())
1240 })
1241 .and_then(|requirements| requirements.get("memory").cloned())
1246 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
1248 })
1249 }
1250
1251 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
1252 format!(
1253 "{}-{}-{}-0",
1254 cloud_provider, region, self.spec.environment_id,
1255 )
1256 }
1257
1258 pub fn rollout_requested(&self) -> bool {
1259 self.status
1260 .as_ref()
1261 .map(|status| status.last_completed_rollout_hash != status.requested_rollout_hash)
1262 .unwrap_or(false)
1263 }
1264
1265 pub fn set_force_promote(&mut self) {
1266 self.spec.force_promote = Some(self.generate_rollout_hash());
1267 }
1268
1269 pub fn should_force_promote(&self) -> bool {
1270 self.spec.force_promote.as_ref()
1271 == self
1272 .status
1273 .as_ref()
1274 .and_then(|status| status.requested_rollout_hash.as_ref())
1275 || self.spec.rollout_strategy
1276 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
1277 }
1278
1279 pub fn conditions_need_update(&self) -> bool {
1280 let Some(status) = self.status.as_ref() else {
1281 return true;
1282 };
1283 if status.conditions.is_empty() {
1284 return true;
1285 }
1286 for condition in &status.conditions {
1287 if condition.observed_generation != self.meta().generation {
1288 return true;
1289 }
1290 }
1291 false
1292 }
1293
1294 pub fn is_ready_to_promote(&self, rollout_hash: &str) -> bool {
1295 let Some(status) = self.status.as_ref() else {
1296 return false;
1297 };
1298 if status.conditions.is_empty() {
1299 return false;
1300 }
1301 status
1302 .conditions
1303 .iter()
1304 .any(|condition| condition.reason == "ReadyToPromote")
1305 && status.requested_rollout_hash.as_deref() == Some(rollout_hash)
1306 }
1307
1308 pub fn is_promoting(&self) -> bool {
1309 let Some(status) = self.status.as_ref() else {
1310 return false;
1311 };
1312 if status.conditions.is_empty() {
1313 return false;
1314 }
1315 status
1316 .conditions
1317 .iter()
1318 .any(|condition| condition.reason == "Promoting")
1319 }
1320
1321 pub fn update_in_progress(&self) -> bool {
1322 let Some(status) = self.status.as_ref() else {
1323 return false;
1324 };
1325 if status.conditions.is_empty() {
1326 return false;
1327 }
1328 for condition in &status.conditions {
1329 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
1330 return true;
1331 }
1332 }
1333 false
1334 }
1335
1336 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
1340 let version = parse_image_ref(&self.spec.environmentd_image_ref);
1341 match version {
1342 Some(version) => version.cmp_precedence(minimum).is_ge(),
1344 None => {
1350 tracing::warn!(
1351 image_ref = %self.spec.environmentd_image_ref,
1352 "failed to parse image ref",
1353 );
1354 true
1355 }
1356 }
1357 }
1358
1359 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
1363 if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
1368 return false;
1369 }
1370
1371 if active_version.major == 0 {
1372 if next_version.major != active_version.major {
1373 if next_version.major == 26 {
1374 return (active_version.minor == 147 && active_version.patch >= 20)
1378 || active_version.minor >= 164;
1379 } else {
1380 return false;
1381 }
1382 }
1383 if next_version.minor == 147 && active_version.minor == 130 {
1385 return true;
1386 }
1387 return next_version.minor <= active_version.minor + 1;
1389 } else if active_version.major >= 26 {
1390 return next_version.major <= active_version.major + 1;
1392 }
1393
1394 true
1395 }
1396
1397 pub fn within_upgrade_window(&self) -> bool {
1400 let active_environmentd_version = self
1401 .status
1402 .as_ref()
1403 .and_then(|status| {
1404 status
1405 .last_completed_rollout_environmentd_image_ref
1406 .as_ref()
1407 })
1408 .and_then(|image_ref| parse_image_ref(image_ref));
1409
1410 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
1411 parse_image_ref(&self.spec.environmentd_image_ref),
1412 active_environmentd_version,
1413 ) {
1414 Self::is_valid_upgrade_version(
1415 &active_environmentd_version,
1416 &next_environmentd_version,
1417 )
1418 } else {
1419 true
1422 }
1423 }
1424
1425 pub fn status(&self) -> MaterializeStatus {
1426 self.status.clone().unwrap_or_else(|| {
1427 let mut status = MaterializeStatus::default();
1428
1429 status.resource_id = new_resource_id();
1430
1431 if let Some(last_active_generation) = self
1436 .annotations()
1437 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
1438 {
1439 status.active_generation = last_active_generation
1440 .parse()
1441 .expect("valid int generation");
1442 }
1443
1444 status.last_completed_rollout_environmentd_image_ref =
1447 Some(self.spec.environmentd_image_ref.clone());
1448
1449 status
1450 })
1451 }
1452 }
1453
1454 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
1455 #[serde(rename_all = "camelCase")]
1456 pub struct MaterializeStatus {
1457 pub resource_id: String,
1459 pub active_generation: u64,
1461 pub last_completed_rollout_environmentd_image_ref: Option<String>,
1465 pub last_completed_rollout_hash: Option<String>,
1467 pub requested_rollout_hash: Option<String>,
1470 pub conditions: Vec<Condition>,
1471 }
1472
1473 impl MaterializeStatus {
1474 pub fn needs_update(&self, other: &Self) -> bool {
1475 let now = Timestamp::now();
1476 let mut a = self.clone();
1477 for condition in &mut a.conditions {
1478 condition.last_transition_time = Time(now);
1479 }
1480 let mut b = other.clone();
1481 for condition in &mut b.conditions {
1482 condition.last_transition_time = Time(now);
1483 }
1484 a != b
1485 }
1486 }
1487
1488 impl ManagedResource for Materialize {
1489 fn default_labels(&self) -> BTreeMap<String, String> {
1490 BTreeMap::from_iter([
1491 (
1492 "materialize.cloud/organization-name".to_owned(),
1493 self.name_unchecked(),
1494 ),
1495 (
1496 "materialize.cloud/organization-namespace".to_owned(),
1497 self.namespace(),
1498 ),
1499 (
1500 "materialize.cloud/mz-resource-id".to_owned(),
1501 self.resource_id().to_owned(),
1502 ),
1503 ])
1504 }
1505
1506 fn app_name(&self) -> Option<&str> {
1507 Some("environmentd")
1508 }
1509 }
1510
1511 impl From<v1alpha1::Materialize> for Materialize {
1512 fn from(value: v1alpha1::Materialize) -> Self {
1513 let is_promoting = value.is_promoting();
1514 let service_account_annotations = if let Some(environmentd_iam_role_arn) =
1515 value.spec.environmentd_iam_role_arn
1516 {
1517 let mut annotations = value.spec.service_account_annotations.unwrap_or_default();
1518 annotations
1519 .entry("eks.amazonaws.com/role-arn".to_owned())
1520 .or_insert(environmentd_iam_role_arn);
1521 Some(annotations)
1522 } else {
1523 value.spec.service_account_annotations
1524 };
1525 let mut mz = Materialize {
1526 metadata: value.metadata,
1527 spec: MaterializeSpec {
1528 environmentd_image_ref: value.spec.environmentd_image_ref,
1529 environmentd_extra_args: value.spec.environmentd_extra_args,
1530 environmentd_extra_env: value.spec.environmentd_extra_env,
1531 environmentd_connection_role_arn: value.spec.environmentd_connection_role_arn,
1532 environmentd_resource_requirements: value
1533 .spec
1534 .environmentd_resource_requirements,
1535 environmentd_scratch_volume_storage_requirement: value
1536 .spec
1537 .environmentd_scratch_volume_storage_requirement,
1538 balancerd_resource_requirements: value.spec.balancerd_resource_requirements,
1539 console_resource_requirements: value.spec.console_resource_requirements,
1540 balancerd_replicas: value.spec.balancerd_replicas,
1541 console_replicas: value.spec.console_replicas,
1542 service_account_name: value.spec.service_account_name,
1543 service_account_annotations,
1544 service_account_labels: value.spec.service_account_labels,
1545 pod_annotations: value.spec.pod_annotations,
1546 pod_labels: value.spec.pod_labels,
1547 force_promote: if value.spec.force_promote.is_empty()
1548 || &value.spec.force_promote == "00000000-0000-0000-0000-000000000000"
1549 {
1550 None
1551 } else {
1552 Some(value.spec.force_promote.to_string())
1553 },
1554 force_rollout: value.spec.force_rollout,
1555 rollout_strategy: value.spec.rollout_strategy,
1556 rollout_request_timeout: value.spec.rollout_request_timeout,
1557 backend_secret_name: value.spec.backend_secret_name,
1558 authenticator_kind: value.spec.authenticator_kind,
1559 enable_rbac: value.spec.enable_rbac,
1560 environment_id: value.spec.environment_id,
1561 system_parameter_configmap_name: value.spec.system_parameter_configmap_name,
1562 balancerd_external_certificate_spec: value
1563 .spec
1564 .balancerd_external_certificate_spec,
1565 console_external_certificate_spec: value.spec.console_external_certificate_spec,
1566 internal_certificate_spec: value.spec.internal_certificate_spec,
1567 },
1568 status: None,
1569 };
1570 let calculated_rollout_hash = mz.generate_rollout_hash();
1571 let last_completed_rollout_hash = match value
1572 .status
1573 .as_ref()
1574 .and_then(|status| status.last_completed_rollout_hash.to_owned())
1575 {
1576 Some(last_completed_rollout_hash) => Some(last_completed_rollout_hash),
1577 None => {
1578 let currently_rolling_out = value
1579 .status
1580 .as_ref()
1581 .map(|status| {
1582 status.last_completed_rollout_request != value.spec.request_rollout
1583 || status.last_completed_rollout_request.is_nil()
1586 })
1587 .unwrap_or(true);
1588 if currently_rolling_out {
1589 None
1591 } else {
1592 Some(calculated_rollout_hash.clone())
1593 }
1594 }
1595 };
1596 let requested_rollout_hash = if is_promoting {
1597 None
1598 } else {
1599 Some(calculated_rollout_hash)
1600 };
1601 mz.status = value.status.map(|status| MaterializeStatus {
1602 resource_id: status.resource_id,
1603 active_generation: status.active_generation,
1604 last_completed_rollout_environmentd_image_ref: status
1605 .last_completed_rollout_environmentd_image_ref,
1606 last_completed_rollout_hash,
1607 requested_rollout_hash,
1608 conditions: status.conditions,
1609 });
1610 mz
1611 }
1612 }
1613}
1614
1615fn parse_image_ref(image_ref: &str) -> Option<Version> {
1616 image_ref
1617 .rsplit_once(':')
1618 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
1619 .and_then(|tag| {
1620 let tag = tag.replace("--", "+");
1625 Version::parse(&tag).ok()
1626 })
1627}
1628
1629#[cfg(test)]
1630mod tests {
1631 use std::time::Duration;
1632
1633 use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
1634 use k8s_openapi::jiff::Timestamp;
1635 use kube::core::ObjectMeta;
1636 use semver::Version;
1637
1638 use super::v1alpha1::{Materialize, MaterializeSpec, MaterializeStatus};
1639 use super::{DEFAULT_ROLLOUT_REQUEST_TIMEOUT, RolloutRequestTimeout};
1640
1641 #[mz_ore::test]
1642 fn meets_minimum_version() {
1643 let mut mz = Materialize {
1644 spec: MaterializeSpec {
1645 environmentd_image_ref:
1646 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
1647 .to_owned(),
1648 ..Default::default()
1649 },
1650 metadata: ObjectMeta {
1651 ..Default::default()
1652 },
1653 status: None,
1654 };
1655
1656 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1658 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
1659 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1660 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
1661 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1662 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
1663 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1664 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
1665 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1666 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
1667 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1668 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
1669 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1670 mz.spec.environmentd_image_ref =
1671 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
1672 .to_owned();
1673 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
1674
1675 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
1677 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1678 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
1679 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1680 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
1681 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
1682 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
1683 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1684 }
1685
1686 #[mz_ore::test]
1687 fn within_upgrade_window() {
1688 let mut mz = Materialize {
1689 spec: MaterializeSpec {
1690 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
1691 ..Default::default()
1692 },
1693 metadata: ObjectMeta {
1694 ..Default::default()
1695 },
1696 status: Some(MaterializeStatus {
1697 last_completed_rollout_environmentd_image_ref: Some(
1698 "materialize/environmentd:v26.0.0".to_owned(),
1699 ),
1700 ..Default::default()
1701 }),
1702 };
1703
1704 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
1706 assert!(mz.within_upgrade_window());
1707
1708 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
1710 assert!(mz.within_upgrade_window());
1711
1712 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
1714 assert!(!mz.within_upgrade_window());
1715
1716 mz.spec.environmentd_image_ref =
1718 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
1719 assert!(mz.within_upgrade_window());
1720
1721 mz.status
1723 .as_mut()
1724 .unwrap()
1725 .last_completed_rollout_environmentd_image_ref =
1726 Some("materialize/environmentd:v0.147.20".to_owned());
1727 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
1728 assert!(mz.within_upgrade_window());
1729
1730 mz.status
1732 .as_mut()
1733 .unwrap()
1734 .last_completed_rollout_environmentd_image_ref =
1735 Some("materialize/environmentd:v26.11.0-dev.0+b".to_owned());
1736 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.11.0-dev.0+a".to_owned();
1737 assert!(mz.within_upgrade_window());
1738 }
1739
1740 #[mz_ore::test]
1741 fn is_valid_upgrade_version() {
1742 let success_tests = [
1743 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
1744 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
1745 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
1746 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
1747 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
1748 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
1749 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
1750 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
1752 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
1753 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
1754 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
1755 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
1756 ];
1757 for (active_version, next_version) in success_tests {
1758 assert!(
1759 Materialize::is_valid_upgrade_version(&active_version, &next_version),
1760 "v{active_version} can upgrade to v{next_version}"
1761 );
1762 }
1763
1764 let failure_tests = [
1765 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
1766 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
1767 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
1768 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
1769 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
1770 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
1771 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
1773 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
1775 ];
1776 for (active_version, next_version) in failure_tests {
1777 assert!(
1778 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
1779 "v{active_version} can't upgrade to v{next_version}"
1780 );
1781 }
1782 }
1783
1784 #[mz_ore::test]
1785 fn rollout_request_timeout() {
1786 let mz_with = |timeout: &str| Materialize {
1787 spec: MaterializeSpec {
1788 rollout_request_timeout: RolloutRequestTimeout(timeout.to_owned()),
1789 ..Default::default()
1790 },
1791 metadata: ObjectMeta::default(),
1792 status: None,
1793 };
1794
1795 let default = humantime::parse_duration(DEFAULT_ROLLOUT_REQUEST_TIMEOUT).unwrap();
1797 assert_eq!(default, Duration::from_secs(24 * 60 * 60));
1798
1799 assert_eq!(
1802 RolloutRequestTimeout::default().0,
1803 DEFAULT_ROLLOUT_REQUEST_TIMEOUT
1804 );
1805 assert_eq!(
1806 Materialize {
1807 spec: MaterializeSpec::default(),
1808 metadata: ObjectMeta::default(),
1809 status: None,
1810 }
1811 .rollout_request_timeout(),
1812 default
1813 );
1814
1815 assert_eq!(
1817 mz_with("1h").rollout_request_timeout(),
1818 Duration::from_secs(60 * 60)
1819 );
1820 assert_eq!(
1821 mz_with("90m").rollout_request_timeout(),
1822 Duration::from_secs(90 * 60)
1823 );
1824 assert_eq!(
1825 mz_with("1h 30m").rollout_request_timeout(),
1826 Duration::from_secs(90 * 60)
1827 );
1828 assert_eq!(mz_with("not a duration").rollout_request_timeout(), default);
1830 }
1831
1832 #[mz_ore::test]
1833 fn rollout_request_timeout_schema_default() {
1834 let crd = serde_json::to_value(<Materialize as kube::CustomResourceExt>::crd())
1838 .expect("CRD serializes");
1839 let default = &crd["spec"]["versions"][0]["schema"]["openAPIV3Schema"]["properties"]["spec"]
1840 ["properties"]["rolloutRequestTimeout"]["default"];
1841 assert_eq!(
1842 default,
1843 &serde_json::json!(DEFAULT_ROLLOUT_REQUEST_TIMEOUT),
1844 "rolloutRequestTimeout schema default missing/wrong in generated CRD",
1845 );
1846 }
1847
1848 #[mz_ore::test]
1849 fn rollout_in_progress_since() {
1850 let now = Timestamp::now();
1851 let condition = |type_: &str, status: &str| Condition {
1852 type_: type_.to_owned(),
1853 status: status.to_owned(),
1854 last_transition_time: Time(now),
1855 message: String::new(),
1856 observed_generation: None,
1857 reason: "Test".to_owned(),
1858 };
1859 let mz_with = |conditions: Vec<Condition>| Materialize {
1860 spec: MaterializeSpec::default(),
1861 metadata: ObjectMeta::default(),
1862 status: Some(MaterializeStatus {
1863 conditions,
1864 ..Default::default()
1865 }),
1866 };
1867
1868 let mz = Materialize {
1870 spec: MaterializeSpec::default(),
1871 metadata: ObjectMeta::default(),
1872 status: None,
1873 };
1874 assert_eq!(mz.rollout_in_progress_since(), None);
1875
1876 assert_eq!(
1879 mz_with(vec![condition("UpToDate", "Unknown")]).rollout_in_progress_since(),
1880 Some(now)
1881 );
1882
1883 assert_eq!(
1887 mz_with(vec![Condition {
1888 reason: "Promoting".to_owned(),
1889 ..condition("UpToDate", "Unknown")
1890 }])
1891 .rollout_in_progress_since(),
1892 None
1893 );
1894
1895 assert_eq!(
1897 mz_with(vec![condition("UpToDate", "True")]).rollout_in_progress_since(),
1898 None
1899 );
1900 assert_eq!(
1901 mz_with(vec![condition("UpToDate", "False")]).rollout_in_progress_since(),
1902 None
1903 );
1904 }
1905
1906 #[mz_ore::test]
1907 fn up_to_date_transition_time() {
1908 let stored = Timestamp::from_second(1_000).unwrap();
1911 let now = Timestamp::from_second(2_000).unwrap();
1912
1913 let condition = |status: &str| Condition {
1914 type_: "UpToDate".to_owned(),
1915 status: status.to_owned(),
1916 last_transition_time: Time(stored),
1917 message: String::new(),
1918 observed_generation: None,
1919 reason: "Test".to_owned(),
1920 };
1921 let mz_with = |conditions: Vec<Condition>| Materialize {
1922 spec: MaterializeSpec::default(),
1923 metadata: ObjectMeta::default(),
1924 status: Some(MaterializeStatus {
1925 conditions,
1926 ..Default::default()
1927 }),
1928 };
1929
1930 let mz = Materialize {
1932 spec: MaterializeSpec::default(),
1933 metadata: ObjectMeta::default(),
1934 status: None,
1935 };
1936 assert_eq!(mz.up_to_date_transition_time("Unknown", now), now);
1937
1938 assert_eq!(
1942 mz_with(vec![condition("Unknown")]).up_to_date_transition_time("Unknown", now),
1943 stored
1944 );
1945
1946 assert_eq!(
1948 mz_with(vec![condition("Unknown")]).up_to_date_transition_time("True", now),
1949 now
1950 );
1951 }
1952
1953 #[mz_ore::test]
1954 fn active_environmentd_image_ref() {
1955 const OLD: &str = "materialize/environmentd:v26.0.0";
1956 const NEW: &str = "materialize/environmentd:v27.0.0";
1957
1958 let mz_with = |spec_image: &str, status: Option<MaterializeStatus>| Materialize {
1959 spec: MaterializeSpec {
1960 environmentd_image_ref: spec_image.to_owned(),
1961 ..Default::default()
1962 },
1963 metadata: ObjectMeta::default(),
1964 status,
1965 };
1966
1967 let mz = mz_with(NEW, None);
1969 assert_eq!(mz.active_environmentd_image_ref(), NEW);
1970
1971 let mz = mz_with(
1975 NEW,
1976 Some(MaterializeStatus {
1977 last_completed_rollout_environmentd_image_ref: None,
1978 ..Default::default()
1979 }),
1980 );
1981 assert_eq!(mz.active_environmentd_image_ref(), NEW);
1982
1983 let mz = mz_with(
1986 NEW,
1987 Some(MaterializeStatus {
1988 last_completed_rollout_environmentd_image_ref: Some(NEW.to_owned()),
1989 ..Default::default()
1990 }),
1991 );
1992 assert_eq!(mz.active_environmentd_image_ref(), NEW);
1993
1994 let mz = mz_with(
2003 NEW,
2004 Some(MaterializeStatus {
2005 last_completed_rollout_environmentd_image_ref: Some(OLD.to_owned()),
2006 ..Default::default()
2007 }),
2008 );
2009 assert_eq!(mz.active_environmentd_image_ref(), OLD);
2010 }
2011}