1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18 jiff::Timestamp,
19};
20use kube::{CustomResource, Resource, ResourceExt};
21use schemars::JsonSchema;
22use semver::Version;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
27use mz_server_core::listeners::AuthenticatorKind;
28
/// Annotation key read by `v1alpha1::Materialize::status` to seed
/// `active_generation` when the resource has no status yet; the annotation
/// value is parsed as an integer there.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
31
32pub mod v1alpha1 {
33 use super::*;
34
// Rollout strategy stored in `MaterializeSpec::rollout_strategy`.
// In this file only `ImmediatelyPromoteCausingDowntime` is inspected directly
// (by `Materialize::should_force_promote`).
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
    // Default variant (selected by `#[default]`).
    #[default]
    WaitUntilReady,

    // NOTE(review): presumably promotion waits for an explicit operator action
    // (e.g. `force_promote`) — confirm against the controller code.
    ManuallyPromote,

    // When selected, `Materialize::should_force_promote` returns `true`
    // regardless of the `force_promote` field.
    ImmediatelyPromoteCausingDowntime,
}
78
// Spec of the namespaced `Materialize` custom resource (group
// `materialize.cloud`, version `v1alpha1`). The `CustomResource` derive
// generates the `Materialize` type whose status is `MaterializeStatus`.
// Field names are serialized in camelCase.
#[derive(
    CustomResource,
    Clone,
    Debug,
    Default,
    PartialEq,
    Deserialize,
    Serialize,
    JsonSchema
)]
#[serde(rename_all = "camelCase")]
#[kube(
    namespaced,
    group = "materialize.cloud",
    version = "v1alpha1",
    kind = "Materialize",
    singular = "materialize",
    plural = "materializes",
    shortname = "mzs",
    status = "MaterializeStatus",
    printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
)]
pub struct MaterializeSpec {
    // Image reference to deploy on the next rollout (see the
    // `ImageRefToDeploy` print column). Its tag is parsed by
    // `parse_image_ref` for the version checks in this file.
    pub environmentd_image_ref: String,
    // NOTE(review): presumably extra CLI arguments for environmentd; not
    // consumed in this file.
    pub environmentd_extra_args: Option<Vec<String>>,
    // NOTE(review): presumably extra environment variables for environmentd;
    // not consumed in this file.
    pub environmentd_extra_env: Option<Vec<EnvVar>>,
    // Marked deprecated via the kube attribute below.
    #[kube(deprecated)]
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    // The "memory" entry of `requests` (or of `limits` when `requests` is
    // absent) is the fallback for the scratch volume size; see
    // `Materialize::environmentd_scratch_volume_storage_requirement`.
    pub environmentd_resource_requirements: Option<ResourceRequirements>,
    // Explicit scratch volume size; when unset the accessor of the same name
    // derives a value from the memory requirement or falls back to "4096Mi".
    pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
    pub balancerd_resource_requirements: Option<ResourceRequirements>,
    pub console_resource_requirements: Option<ResourceRequirements>,
    // `Materialize::balancerd_replicas` substitutes 2 when unset.
    pub balancerd_replicas: Option<i32>,
    // `Materialize::console_replicas` substitutes 2 when unset.
    pub console_replicas: Option<i32>,

    // When unset, `Materialize::create_service_account` returns `true` and
    // `Materialize::service_account_name` falls back to the resource name.
    pub service_account_name: Option<String>,
    pub service_account_annotations: Option<BTreeMap<String, String>>,
    pub service_account_labels: Option<BTreeMap<String, String>>,
    pub pod_annotations: Option<BTreeMap<String, String>>,
    pub pod_labels: Option<BTreeMap<String, String>>,

    // Compared against `status.last_completed_rollout_request` by
    // `Materialize::rollout_requested`; defaults to the nil UUID when omitted.
    #[serde(default)]
    pub request_rollout: Uuid,
    // When equal to `request_rollout`, `Materialize::should_force_promote`
    // returns `true`.
    #[serde(default)]
    pub force_promote: Uuid,
    // Not read anywhere in this file.
    #[serde(default)]
    pub force_rollout: Uuid,
    // Marked deprecated via the kube attribute below.
    #[kube(deprecated)]
    #[serde(default)]
    pub in_place_rollout: bool,
    // Defaults to `MaterializeRolloutStrategy::WaitUntilReady` when omitted.
    #[serde(default)]
    pub rollout_strategy: MaterializeRolloutStrategy,
    // Required field; returned by `Materialize::backend_secret_name`.
    pub backend_secret_name: String,
    #[serde(default)]
    pub authenticator_kind: AuthenticatorKind,
    #[serde(default)]
    pub enable_rbac: bool,

    // Embedded in the string built by `Materialize::environment_id`;
    // defaults to the nil UUID when omitted.
    #[serde(default)]
    pub environment_id: Uuid,

    // Returned by `Materialize::system_parameter_configmap_name`.
    pub system_parameter_configmap_name: Option<String>,

    pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
    pub console_external_certificate_spec: Option<MaterializeCertSpec>,
    pub internal_certificate_spec: Option<MaterializeCertSpec>,
}
240
241 impl Materialize {
242 pub fn backend_secret_name(&self) -> String {
243 self.spec.backend_secret_name.clone()
244 }
245
246 pub fn namespace(&self) -> String {
247 self.meta().namespace.clone().unwrap()
248 }
249
250 pub fn create_service_account(&self) -> bool {
251 self.spec.service_account_name.is_none()
252 }
253
254 pub fn service_account_name(&self) -> String {
255 self.spec
256 .service_account_name
257 .clone()
258 .unwrap_or_else(|| self.name_unchecked())
259 }
260
261 pub fn role_name(&self) -> String {
262 self.name_unchecked()
263 }
264
265 pub fn role_binding_name(&self) -> String {
266 self.name_unchecked()
267 }
268
269 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
270 self.name_prefixed(&format!("environmentd-{generation}"))
271 }
272
273 pub fn environmentd_app_name(&self) -> String {
274 "environmentd".to_owned()
275 }
276
277 pub fn environmentd_service_name(&self) -> String {
278 self.name_prefixed("environmentd")
279 }
280
281 pub fn environmentd_service_internal_fqdn(&self) -> String {
282 format!(
283 "{}.{}.svc.cluster.local",
284 self.environmentd_service_name(),
285 self.meta().namespace.as_ref().unwrap()
286 )
287 }
288
289 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
290 self.name_prefixed(&format!("environmentd-{generation}"))
291 }
292
293 pub fn balancerd_app_name(&self) -> String {
294 "balancerd".to_owned()
295 }
296
297 pub fn environmentd_certificate_name(&self) -> String {
298 self.name_prefixed("environmentd-external")
299 }
300
301 pub fn environmentd_certificate_secret_name(&self) -> String {
302 self.name_prefixed("environmentd-tls")
303 }
304
305 pub fn balancerd_deployment_name(&self) -> String {
306 self.name_prefixed("balancerd")
307 }
308
309 pub fn balancerd_service_name(&self) -> String {
310 self.name_prefixed("balancerd")
311 }
312
313 pub fn console_app_name(&self) -> String {
314 "console".to_owned()
315 }
316
317 pub fn balancerd_external_certificate_name(&self) -> String {
318 self.name_prefixed("balancerd-external")
319 }
320
321 pub fn balancerd_external_certificate_secret_name(&self) -> String {
322 self.name_prefixed("balancerd-external-tls")
323 }
324
325 pub fn balancerd_replicas(&self) -> i32 {
326 self.spec.balancerd_replicas.unwrap_or(2)
327 }
328
329 pub fn console_replicas(&self) -> i32 {
330 self.spec.console_replicas.unwrap_or(2)
331 }
332
333 pub fn console_configmap_name(&self) -> String {
334 self.name_prefixed("console")
335 }
336
337 pub fn console_deployment_name(&self) -> String {
338 self.name_prefixed("console")
339 }
340
341 pub fn console_service_name(&self) -> String {
342 self.name_prefixed("console")
343 }
344
345 pub fn console_external_certificate_name(&self) -> String {
346 self.name_prefixed("console-external")
347 }
348
349 pub fn console_external_certificate_secret_name(&self) -> String {
350 self.name_prefixed("console-external-tls")
351 }
352
353 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
354 self.name_prefixed(&format!("persist-pubsub-{generation}"))
355 }
356
357 pub fn listeners_configmap_name(&self, generation: u64) -> String {
358 self.name_prefixed(&format!("listeners-{generation}"))
359 }
360
361 pub fn name_prefixed(&self, suffix: &str) -> String {
362 format!("mz{}-{}", self.resource_id(), suffix)
363 }
364
365 pub fn resource_id(&self) -> &str {
366 &self.status.as_ref().unwrap().resource_id
367 }
368
369 pub fn system_parameter_configmap_name(&self) -> Option<String> {
370 self.spec.system_parameter_configmap_name.clone()
371 }
372
373 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
374 self.spec
375 .environmentd_scratch_volume_storage_requirement
376 .clone()
377 .unwrap_or_else(|| {
378 self.spec
379 .environmentd_resource_requirements
380 .as_ref()
381 .and_then(|requirements| {
382 requirements
383 .requests
384 .as_ref()
385 .or(requirements.limits.as_ref())
386 })
387 .and_then(|requirements| requirements.get("memory").cloned())
392 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
394 })
395 }
396
397 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
398 format!(
399 "{}-{}-{}-0",
400 cloud_provider, region, self.spec.environment_id,
401 )
402 }
403
404 pub fn requested_reconciliation_id(&self) -> Uuid {
405 self.spec.request_rollout
406 }
407
408 pub fn rollout_requested(&self) -> bool {
409 self.requested_reconciliation_id()
410 != self
411 .status
412 .as_ref()
413 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
414 }
415
416 pub fn set_force_promote(&mut self) {
417 self.spec.force_promote = self.spec.request_rollout;
418 }
419
420 pub fn should_force_promote(&self) -> bool {
421 self.spec.force_promote == self.spec.request_rollout
422 || self.spec.rollout_strategy
423 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
424 }
425
426 pub fn conditions_need_update(&self) -> bool {
427 let Some(status) = self.status.as_ref() else {
428 return true;
429 };
430 if status.conditions.is_empty() {
431 return true;
432 }
433 for condition in &status.conditions {
434 if condition.observed_generation != self.meta().generation {
435 return true;
436 }
437 }
438 false
439 }
440
441 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
442 let Some(status) = self.status.as_ref() else {
443 return false;
444 };
445 if status.conditions.is_empty() {
446 return false;
447 }
448 status
449 .conditions
450 .iter()
451 .any(|condition| condition.reason == "ReadyToPromote")
452 && &status.resources_hash == resources_hash
453 }
454
455 pub fn is_promoting(&self) -> bool {
456 let Some(status) = self.status.as_ref() else {
457 return false;
458 };
459 if status.conditions.is_empty() {
460 return false;
461 }
462 status
463 .conditions
464 .iter()
465 .any(|condition| condition.reason == "Promoting")
466 }
467
468 pub fn update_in_progress(&self) -> bool {
469 let Some(status) = self.status.as_ref() else {
470 return false;
471 };
472 if status.conditions.is_empty() {
473 return false;
474 }
475 for condition in &status.conditions {
476 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
477 return true;
478 }
479 }
480 false
481 }
482
483 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
487 let version = parse_image_ref(&self.spec.environmentd_image_ref);
488 match version {
489 Some(version) => version.cmp_precedence(minimum).is_ge(),
491 None => {
497 tracing::warn!(
498 image_ref = %self.spec.environmentd_image_ref,
499 "failed to parse image ref",
500 );
501 true
502 }
503 }
504 }
505
506 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
510 if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
515 return false;
516 }
517
518 if active_version.major == 0 {
519 if next_version.major != active_version.major {
520 if next_version.major == 26 {
521 return (active_version.minor == 147 && active_version.patch >= 20)
525 || active_version.minor >= 164;
526 } else {
527 return false;
528 }
529 }
530 if next_version.minor == 147 && active_version.minor == 130 {
532 return true;
533 }
534 return next_version.minor <= active_version.minor + 1;
536 } else if active_version.major >= 26 {
537 return next_version.major <= active_version.major + 1;
539 }
540
541 true
542 }
543
544 pub fn within_upgrade_window(&self) -> bool {
547 let active_environmentd_version = self
548 .status
549 .as_ref()
550 .and_then(|status| {
551 status
552 .last_completed_rollout_environmentd_image_ref
553 .as_ref()
554 })
555 .and_then(|image_ref| parse_image_ref(image_ref));
556
557 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
558 parse_image_ref(&self.spec.environmentd_image_ref),
559 active_environmentd_version,
560 ) {
561 Self::is_valid_upgrade_version(
562 &active_environmentd_version,
563 &next_environmentd_version,
564 )
565 } else {
566 true
569 }
570 }
571
572 pub fn status(&self) -> MaterializeStatus {
573 self.status.clone().unwrap_or_else(|| {
574 let mut status = MaterializeStatus::default();
575
576 status.resource_id = new_resource_id();
577
578 if let Some(last_active_generation) = self
583 .annotations()
584 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
585 {
586 status.active_generation = last_active_generation
587 .parse()
588 .expect("valid int generation");
589 }
590
591 status.last_completed_rollout_environmentd_image_ref =
594 Some(self.spec.environmentd_image_ref.clone());
595
596 status
597 })
598 }
599 }
600
// Status sub-resource of `Materialize`. Field names are serialized in
// camelCase.
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeStatus {
    // Embedded in generated names (`mz{resource_id}-...`) and in the
    // `materialize.cloud/mz-resource-id` label.
    pub resource_id: String,
    // Seeded from `LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION` when
    // `Materialize::status` synthesizes an initial status.
    pub active_generation: u64,
    // Compared against `spec.request_rollout` by
    // `Materialize::rollout_requested`.
    pub last_completed_rollout_request: Uuid,
    // Image ref currently in use (see the `ImageRefRunning` print column);
    // the "active" side of `Materialize::within_upgrade_window`.
    pub last_completed_rollout_environmentd_image_ref: Option<String>,
    // Compared against the caller-supplied hash in
    // `Materialize::is_ready_to_promote`.
    pub resources_hash: String,
    // Conditions inspected by the `Materialize` predicates
    // (`conditions_need_update`, `is_promoting`, `update_in_progress`, ...).
    pub conditions: Vec<Condition>,
}
621
622 impl MaterializeStatus {
623 pub fn needs_update(&self, other: &Self) -> bool {
624 let now = Timestamp::now();
625 let mut a = self.clone();
626 for condition in &mut a.conditions {
627 condition.last_transition_time = Time(now);
628 }
629 let mut b = other.clone();
630 for condition in &mut b.conditions {
631 condition.last_transition_time = Time(now);
632 }
633 a != b
634 }
635 }
636
637 impl ManagedResource for Materialize {
638 fn default_labels(&self) -> BTreeMap<String, String> {
639 BTreeMap::from_iter([
640 (
641 "materialize.cloud/organization-name".to_owned(),
642 self.name_unchecked(),
643 ),
644 (
645 "materialize.cloud/organization-namespace".to_owned(),
646 self.namespace(),
647 ),
648 (
649 "materialize.cloud/mz-resource-id".to_owned(),
650 self.resource_id().to_owned(),
651 ),
652 ])
653 }
654 }
655}
656
657fn parse_image_ref(image_ref: &str) -> Option<Version> {
658 image_ref
659 .rsplit_once(':')
660 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
661 .and_then(|tag| {
662 let tag = tag.replace("--", "+");
667 Version::parse(&tag).ok()
668 })
669}
670
671#[cfg(test)]
672mod tests {
673 use kube::core::ObjectMeta;
674 use semver::Version;
675
676 use super::v1alpha1::{Materialize, MaterializeSpec};
677
678 #[mz_ore::test]
679 fn meets_minimum_version() {
680 let mut mz = Materialize {
681 spec: MaterializeSpec {
682 environmentd_image_ref:
683 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
684 .to_owned(),
685 ..Default::default()
686 },
687 metadata: ObjectMeta {
688 ..Default::default()
689 },
690 status: None,
691 };
692
693 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
695 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
696 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
697 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
698 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
699 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
700 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
701 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
702 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
703 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
704 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
705 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
706 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
707 mz.spec.environmentd_image_ref =
708 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
709 .to_owned();
710 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
711
712 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
714 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
715 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
716 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
717 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
718 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
719 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
720 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
721 }
722
723 #[mz_ore::test]
724 fn within_upgrade_window() {
725 use super::v1alpha1::MaterializeStatus;
726
727 let mut mz = Materialize {
728 spec: MaterializeSpec {
729 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
730 ..Default::default()
731 },
732 metadata: ObjectMeta {
733 ..Default::default()
734 },
735 status: Some(MaterializeStatus {
736 last_completed_rollout_environmentd_image_ref: Some(
737 "materialize/environmentd:v26.0.0".to_owned(),
738 ),
739 ..Default::default()
740 }),
741 };
742
743 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
745 assert!(mz.within_upgrade_window());
746
747 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
749 assert!(mz.within_upgrade_window());
750
751 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
753 assert!(!mz.within_upgrade_window());
754
755 mz.spec.environmentd_image_ref =
757 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
758 assert!(mz.within_upgrade_window());
759
760 mz.status
762 .as_mut()
763 .unwrap()
764 .last_completed_rollout_environmentd_image_ref =
765 Some("materialize/environmentd:v0.147.20".to_owned());
766 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
767 assert!(mz.within_upgrade_window());
768
769 mz.status
771 .as_mut()
772 .unwrap()
773 .last_completed_rollout_environmentd_image_ref =
774 Some("materialize/environmentd:v26.11.0-dev.0+b".to_owned());
775 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.11.0-dev.0+a".to_owned();
776 assert!(mz.within_upgrade_window());
777 }
778
779 #[mz_ore::test]
780 fn is_valid_upgrade_version() {
781 let success_tests = [
782 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
783 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
784 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
785 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
786 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
787 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
788 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
789 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
791 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
792 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
793 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
794 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
795 ];
796 for (active_version, next_version) in success_tests {
797 assert!(
798 Materialize::is_valid_upgrade_version(&active_version, &next_version),
799 "v{active_version} can upgrade to v{next_version}"
800 );
801 }
802
803 let failure_tests = [
804 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
805 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
806 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
807 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
808 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
809 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
810 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
812 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
814 ];
815 for (active_version, next_version) in failure_tests {
816 assert!(
817 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
818 "v{active_version} can't upgrade to v{next_version}"
819 );
820 }
821 }
822}