1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18 jiff::Timestamp,
19};
20use kube::{CustomResource, Resource, ResourceExt};
21use schemars::JsonSchema;
22use semver::Version;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
27use mz_server_core::listeners::AuthenticatorKind;
28
29pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
30 "materialize.cloud/last-known-active-generation";
31
32pub mod v1alpha1 {
33 use super::*;
34
35 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
36 pub enum MaterializeRolloutStrategy {
37 #[default]
41 WaitUntilReady,
42
43 ManuallyPromote,
65
66 ImmediatelyPromoteCausingDowntime,
77 }
78
79 #[derive(
80 CustomResource,
81 Clone,
82 Debug,
83 Default,
84 PartialEq,
85 Deserialize,
86 Serialize,
87 JsonSchema
88 )]
89 #[serde(rename_all = "camelCase")]
90 #[kube(
91 namespaced,
92 group = "materialize.cloud",
93 version = "v1alpha1",
94 kind = "Materialize",
95 singular = "materialize",
96 plural = "materializes",
97 shortname = "mzs",
98 status = "MaterializeStatus",
99 printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
100 printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
101 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
102 )]
103 pub struct MaterializeSpec {
104 pub environmentd_image_ref: String,
106 pub environmentd_extra_args: Option<Vec<String>>,
108 pub environmentd_extra_env: Option<Vec<EnvVar>>,
110 #[kube(deprecated)]
119 pub environmentd_iam_role_arn: Option<String>,
120 pub environmentd_connection_role_arn: Option<String>,
123 pub environmentd_resource_requirements: Option<ResourceRequirements>,
125 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
127 pub balancerd_resource_requirements: Option<ResourceRequirements>,
129 pub console_resource_requirements: Option<ResourceRequirements>,
131 pub balancerd_replicas: Option<i32>,
133 pub console_replicas: Option<i32>,
135
136 pub service_account_name: Option<String>,
139 pub service_account_annotations: Option<BTreeMap<String, String>>,
146 pub service_account_labels: Option<BTreeMap<String, String>>,
148 pub pod_annotations: Option<BTreeMap<String, String>>,
150 pub pod_labels: Option<BTreeMap<String, String>>,
152
153 #[serde(default)]
165 pub request_rollout: Uuid,
166 #[serde(default)]
171 pub force_promote: Uuid,
172 #[serde(default)]
179 pub force_rollout: Uuid,
180 #[kube(deprecated)]
184 #[serde(default)]
185 pub in_place_rollout: bool,
186 #[serde(default)]
188 pub rollout_strategy: MaterializeRolloutStrategy,
189 pub backend_secret_name: String,
194 #[serde(default)]
196 pub authenticator_kind: AuthenticatorKind,
197 #[serde(default)]
199 pub enable_rbac: bool,
200
201 #[serde(default)]
208 pub environment_id: Uuid,
209
210 pub system_parameter_configmap_name: Option<String>,
225
226 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
230 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
235 pub internal_certificate_spec: Option<MaterializeCertSpec>,
240 }
241
242 impl Materialize {
243 pub fn backend_secret_name(&self) -> String {
244 self.spec.backend_secret_name.clone()
245 }
246
247 pub fn namespace(&self) -> String {
248 self.meta().namespace.clone().unwrap()
249 }
250
251 pub fn create_service_account(&self) -> bool {
252 self.spec.service_account_name.is_none()
253 }
254
255 pub fn service_account_name(&self) -> String {
256 self.spec
257 .service_account_name
258 .clone()
259 .unwrap_or_else(|| self.name_unchecked())
260 }
261
262 pub fn role_name(&self) -> String {
263 self.name_unchecked()
264 }
265
266 pub fn role_binding_name(&self) -> String {
267 self.name_unchecked()
268 }
269
270 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
271 self.name_prefixed(&format!("environmentd-{generation}"))
272 }
273
274 pub fn environmentd_app_name(&self) -> String {
275 "environmentd".to_owned()
276 }
277
278 pub fn environmentd_service_name(&self) -> String {
279 self.name_prefixed("environmentd")
280 }
281
282 pub fn environmentd_service_internal_fqdn(&self) -> String {
283 format!(
284 "{}.{}.svc.cluster.local",
285 self.environmentd_service_name(),
286 self.meta().namespace.as_ref().unwrap()
287 )
288 }
289
290 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
291 self.name_prefixed(&format!("environmentd-{generation}"))
292 }
293
294 pub fn balancerd_app_name(&self) -> String {
295 "balancerd".to_owned()
296 }
297
298 pub fn environmentd_certificate_name(&self) -> String {
299 self.name_prefixed("environmentd-external")
300 }
301
302 pub fn environmentd_certificate_secret_name(&self) -> String {
303 self.name_prefixed("environmentd-tls")
304 }
305
306 pub fn balancerd_deployment_name(&self) -> String {
307 self.name_prefixed("balancerd")
308 }
309
310 pub fn balancerd_service_name(&self) -> String {
311 self.name_prefixed("balancerd")
312 }
313
314 pub fn console_app_name(&self) -> String {
315 "console".to_owned()
316 }
317
318 pub fn balancerd_external_certificate_name(&self) -> String {
319 self.name_prefixed("balancerd-external")
320 }
321
322 pub fn balancerd_external_certificate_secret_name(&self) -> String {
323 self.name_prefixed("balancerd-external-tls")
324 }
325
326 pub fn balancerd_replicas(&self) -> i32 {
327 self.spec.balancerd_replicas.unwrap_or(2)
328 }
329
330 pub fn console_replicas(&self) -> i32 {
331 self.spec.console_replicas.unwrap_or(2)
332 }
333
334 pub fn console_configmap_name(&self) -> String {
335 self.name_prefixed("console")
336 }
337
338 pub fn console_deployment_name(&self) -> String {
339 self.name_prefixed("console")
340 }
341
342 pub fn console_service_name(&self) -> String {
343 self.name_prefixed("console")
344 }
345
346 pub fn console_external_certificate_name(&self) -> String {
347 self.name_prefixed("console-external")
348 }
349
350 pub fn console_external_certificate_secret_name(&self) -> String {
351 self.name_prefixed("console-external-tls")
352 }
353
354 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
355 self.name_prefixed(&format!("persist-pubsub-{generation}"))
356 }
357
358 pub fn listeners_configmap_name(&self, generation: u64) -> String {
359 self.name_prefixed(&format!("listeners-{generation}"))
360 }
361
362 pub fn name_prefixed(&self, suffix: &str) -> String {
363 format!("mz{}-{}", self.resource_id(), suffix)
364 }
365
366 pub fn resource_id(&self) -> &str {
367 &self.status.as_ref().unwrap().resource_id
368 }
369
370 pub fn system_parameter_configmap_name(&self) -> Option<String> {
371 self.spec.system_parameter_configmap_name.clone()
372 }
373
374 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
375 self.spec
376 .environmentd_scratch_volume_storage_requirement
377 .clone()
378 .unwrap_or_else(|| {
379 self.spec
380 .environmentd_resource_requirements
381 .as_ref()
382 .and_then(|requirements| {
383 requirements
384 .requests
385 .as_ref()
386 .or(requirements.limits.as_ref())
387 })
388 .and_then(|requirements| requirements.get("memory").cloned())
393 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
395 })
396 }
397
398 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
399 format!(
400 "{}-{}-{}-0",
401 cloud_provider, region, self.spec.environment_id,
402 )
403 }
404
405 pub fn requested_reconciliation_id(&self) -> Uuid {
406 self.spec.request_rollout
407 }
408
409 pub fn rollout_requested(&self) -> bool {
410 self.requested_reconciliation_id()
411 != self
412 .status
413 .as_ref()
414 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
415 }
416
417 pub fn active_environmentd_image_ref(&self) -> &str {
425 self.status
426 .as_ref()
427 .and_then(|s| s.last_completed_rollout_environmentd_image_ref.as_deref())
428 .unwrap_or(&self.spec.environmentd_image_ref)
429 }
430
431 pub fn set_force_promote(&mut self) {
432 self.spec.force_promote = self.spec.request_rollout;
433 }
434
435 pub fn should_force_promote(&self) -> bool {
436 self.spec.force_promote == self.spec.request_rollout
437 || self.spec.rollout_strategy
438 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
439 }
440
441 pub fn conditions_need_update(&self) -> bool {
442 let Some(status) = self.status.as_ref() else {
443 return true;
444 };
445 if status.conditions.is_empty() {
446 return true;
447 }
448 for condition in &status.conditions {
449 if condition.observed_generation != self.meta().generation {
450 return true;
451 }
452 }
453 false
454 }
455
456 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
457 let Some(status) = self.status.as_ref() else {
458 return false;
459 };
460 if status.conditions.is_empty() {
461 return false;
462 }
463 status
464 .conditions
465 .iter()
466 .any(|condition| condition.reason == "ReadyToPromote")
467 && &status.resources_hash == resources_hash
468 }
469
470 pub fn is_promoting(&self) -> bool {
471 let Some(status) = self.status.as_ref() else {
472 return false;
473 };
474 if status.conditions.is_empty() {
475 return false;
476 }
477 status
478 .conditions
479 .iter()
480 .any(|condition| condition.reason == "Promoting")
481 }
482
483 pub fn update_in_progress(&self) -> bool {
484 let Some(status) = self.status.as_ref() else {
485 return false;
486 };
487 if status.conditions.is_empty() {
488 return false;
489 }
490 for condition in &status.conditions {
491 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
492 return true;
493 }
494 }
495 false
496 }
497
498 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
502 let version = parse_image_ref(&self.spec.environmentd_image_ref);
503 match version {
504 Some(version) => version.cmp_precedence(minimum).is_ge(),
506 None => {
512 tracing::warn!(
513 image_ref = %self.spec.environmentd_image_ref,
514 "failed to parse image ref",
515 );
516 true
517 }
518 }
519 }
520
521 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
525 if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
530 return false;
531 }
532
533 if active_version.major == 0 {
534 if next_version.major != active_version.major {
535 if next_version.major == 26 {
536 return (active_version.minor == 147 && active_version.patch >= 20)
540 || active_version.minor >= 164;
541 } else {
542 return false;
543 }
544 }
545 if next_version.minor == 147 && active_version.minor == 130 {
547 return true;
548 }
549 return next_version.minor <= active_version.minor + 1;
551 } else if active_version.major >= 26 {
552 return next_version.major <= active_version.major + 1;
554 }
555
556 true
557 }
558
559 pub fn within_upgrade_window(&self) -> bool {
562 let active_environmentd_version = self
563 .status
564 .as_ref()
565 .and_then(|status| {
566 status
567 .last_completed_rollout_environmentd_image_ref
568 .as_ref()
569 })
570 .and_then(|image_ref| parse_image_ref(image_ref));
571
572 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
573 parse_image_ref(&self.spec.environmentd_image_ref),
574 active_environmentd_version,
575 ) {
576 Self::is_valid_upgrade_version(
577 &active_environmentd_version,
578 &next_environmentd_version,
579 )
580 } else {
581 true
584 }
585 }
586
587 pub fn status(&self) -> MaterializeStatus {
588 self.status.clone().unwrap_or_else(|| {
589 let mut status = MaterializeStatus::default();
590
591 status.resource_id = new_resource_id();
592
593 if let Some(last_active_generation) = self
598 .annotations()
599 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
600 {
601 status.active_generation = last_active_generation
602 .parse()
603 .expect("valid int generation");
604 }
605
606 status.last_completed_rollout_environmentd_image_ref =
609 Some(self.spec.environmentd_image_ref.clone());
610
611 status
612 })
613 }
614 }
615
616 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
617 #[serde(rename_all = "camelCase")]
618 pub struct MaterializeStatus {
619 pub resource_id: String,
621 pub active_generation: u64,
623 pub last_completed_rollout_request: Uuid,
625 pub last_completed_rollout_environmentd_image_ref: Option<String>,
629 pub resources_hash: String,
634 pub conditions: Vec<Condition>,
635 }
636
637 impl MaterializeStatus {
638 pub fn needs_update(&self, other: &Self) -> bool {
639 let now = Timestamp::now();
640 let mut a = self.clone();
641 for condition in &mut a.conditions {
642 condition.last_transition_time = Time(now);
643 }
644 let mut b = other.clone();
645 for condition in &mut b.conditions {
646 condition.last_transition_time = Time(now);
647 }
648 a != b
649 }
650 }
651
652 impl ManagedResource for Materialize {
653 fn default_labels(&self) -> BTreeMap<String, String> {
654 BTreeMap::from_iter([
655 (
656 "materialize.cloud/organization-name".to_owned(),
657 self.name_unchecked(),
658 ),
659 (
660 "materialize.cloud/organization-namespace".to_owned(),
661 self.namespace(),
662 ),
663 (
664 "materialize.cloud/mz-resource-id".to_owned(),
665 self.resource_id().to_owned(),
666 ),
667 ])
668 }
669
670 fn app_name(&self) -> Option<&str> {
671 Some("environmentd")
672 }
673 }
674}
675
676fn parse_image_ref(image_ref: &str) -> Option<Version> {
677 image_ref
678 .rsplit_once(':')
679 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
680 .and_then(|tag| {
681 let tag = tag.replace("--", "+");
686 Version::parse(&tag).ok()
687 })
688}
689
690#[cfg(test)]
691mod tests {
692 use kube::core::ObjectMeta;
693 use semver::Version;
694
695 use super::v1alpha1::{Materialize, MaterializeSpec};
696
697 #[mz_ore::test]
698 fn meets_minimum_version() {
699 let mut mz = Materialize {
700 spec: MaterializeSpec {
701 environmentd_image_ref:
702 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
703 .to_owned(),
704 ..Default::default()
705 },
706 metadata: ObjectMeta {
707 ..Default::default()
708 },
709 status: None,
710 };
711
712 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
714 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
715 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
716 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
717 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
718 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
719 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
720 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
721 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
722 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
723 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
724 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
725 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
726 mz.spec.environmentd_image_ref =
727 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
728 .to_owned();
729 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
730
731 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
733 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
734 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
735 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
736 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
737 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
738 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
739 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
740 }
741
742 #[mz_ore::test]
743 fn within_upgrade_window() {
744 use super::v1alpha1::MaterializeStatus;
745
746 let mut mz = Materialize {
747 spec: MaterializeSpec {
748 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
749 ..Default::default()
750 },
751 metadata: ObjectMeta {
752 ..Default::default()
753 },
754 status: Some(MaterializeStatus {
755 last_completed_rollout_environmentd_image_ref: Some(
756 "materialize/environmentd:v26.0.0".to_owned(),
757 ),
758 ..Default::default()
759 }),
760 };
761
762 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
764 assert!(mz.within_upgrade_window());
765
766 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
768 assert!(mz.within_upgrade_window());
769
770 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
772 assert!(!mz.within_upgrade_window());
773
774 mz.spec.environmentd_image_ref =
776 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
777 assert!(mz.within_upgrade_window());
778
779 mz.status
781 .as_mut()
782 .unwrap()
783 .last_completed_rollout_environmentd_image_ref =
784 Some("materialize/environmentd:v0.147.20".to_owned());
785 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
786 assert!(mz.within_upgrade_window());
787
788 mz.status
790 .as_mut()
791 .unwrap()
792 .last_completed_rollout_environmentd_image_ref =
793 Some("materialize/environmentd:v26.11.0-dev.0+b".to_owned());
794 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.11.0-dev.0+a".to_owned();
795 assert!(mz.within_upgrade_window());
796 }
797
798 #[mz_ore::test]
799 fn is_valid_upgrade_version() {
800 let success_tests = [
801 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
802 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
803 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
804 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
805 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
806 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
807 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
808 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
810 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
811 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
812 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
813 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
814 ];
815 for (active_version, next_version) in success_tests {
816 assert!(
817 Materialize::is_valid_upgrade_version(&active_version, &next_version),
818 "v{active_version} can upgrade to v{next_version}"
819 );
820 }
821
822 let failure_tests = [
823 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
824 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
825 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
826 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
827 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
828 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
829 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
831 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
833 ];
834 for (active_version, next_version) in failure_tests {
835 assert!(
836 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
837 "v{active_version} can't upgrade to v{next_version}"
838 );
839 }
840 }
841
842 #[mz_ore::test]
843 fn active_environmentd_image_ref() {
844 use super::v1alpha1::MaterializeStatus;
845
846 const OLD: &str = "materialize/environmentd:v26.0.0";
847 const NEW: &str = "materialize/environmentd:v27.0.0";
848
849 let mz_with = |spec_image: &str, status: Option<MaterializeStatus>| Materialize {
850 spec: MaterializeSpec {
851 environmentd_image_ref: spec_image.to_owned(),
852 ..Default::default()
853 },
854 metadata: ObjectMeta::default(),
855 status,
856 };
857
858 let mz = mz_with(NEW, None);
860 assert_eq!(mz.active_environmentd_image_ref(), NEW);
861
862 let mz = mz_with(
866 NEW,
867 Some(MaterializeStatus {
868 last_completed_rollout_environmentd_image_ref: None,
869 ..Default::default()
870 }),
871 );
872 assert_eq!(mz.active_environmentd_image_ref(), NEW);
873
874 let mz = mz_with(
877 NEW,
878 Some(MaterializeStatus {
879 last_completed_rollout_environmentd_image_ref: Some(NEW.to_owned()),
880 ..Default::default()
881 }),
882 );
883 assert_eq!(mz.active_environmentd_image_ref(), NEW);
884
885 let mz = mz_with(
894 NEW,
895 Some(MaterializeStatus {
896 last_completed_rollout_environmentd_image_ref: Some(OLD.to_owned()),
897 ..Default::default()
898 }),
899 );
900 assert_eq!(mz.active_environmentd_image_ref(), OLD);
901 }
902}