1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
28pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
29 "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32 use super::*;
33
34 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
35 pub enum MaterializeRolloutStrategy {
36 #[default]
40 WaitUntilReady,
41
42 ManuallyPromote,
58
59 ImmediatelyPromoteCausingDowntime,
70 }
71
72 #[derive(
73 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
74 )]
75 #[serde(rename_all = "camelCase")]
76 #[kube(
77 namespaced,
78 group = "materialize.cloud",
79 version = "v1alpha1",
80 kind = "Materialize",
81 singular = "materialize",
82 plural = "materializes",
83 shortname = "mzs",
84 status = "MaterializeStatus",
85 printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
86 printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
87 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
88 )]
89 pub struct MaterializeSpec {
90 pub environmentd_image_ref: String,
92 pub environmentd_extra_args: Option<Vec<String>>,
94 pub environmentd_extra_env: Option<Vec<EnvVar>>,
96 #[kube(deprecated)]
105 pub environmentd_iam_role_arn: Option<String>,
106 pub environmentd_connection_role_arn: Option<String>,
109 pub environmentd_resource_requirements: Option<ResourceRequirements>,
111 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
113 pub balancerd_resource_requirements: Option<ResourceRequirements>,
115 pub console_resource_requirements: Option<ResourceRequirements>,
117 pub balancerd_replicas: Option<i32>,
119 pub console_replicas: Option<i32>,
121
122 pub service_account_name: Option<String>,
125 pub service_account_annotations: Option<BTreeMap<String, String>>,
132 pub service_account_labels: Option<BTreeMap<String, String>>,
134 pub pod_annotations: Option<BTreeMap<String, String>>,
136 pub pod_labels: Option<BTreeMap<String, String>>,
138
139 #[serde(default)]
151 pub request_rollout: Uuid,
152 #[serde(default)]
157 pub force_promote: Uuid,
158 #[serde(default)]
165 pub force_rollout: Uuid,
166 #[kube(deprecated)]
170 #[serde(default)]
171 pub in_place_rollout: bool,
172 #[serde(default)]
174 pub rollout_strategy: MaterializeRolloutStrategy,
175 pub backend_secret_name: String,
179 #[serde(default)]
181 pub authenticator_kind: AuthenticatorKind,
182 #[serde(default)]
184 pub enable_rbac: bool,
185
186 #[serde(default)]
193 pub environment_id: Uuid,
194
195 pub system_parameter_configmap_name: Option<String>,
210
211 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
215 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
220 pub internal_certificate_spec: Option<MaterializeCertSpec>,
225 }
226
227 impl Materialize {
228 pub fn backend_secret_name(&self) -> String {
229 self.spec.backend_secret_name.clone()
230 }
231
232 pub fn namespace(&self) -> String {
233 self.meta().namespace.clone().unwrap()
234 }
235
236 pub fn create_service_account(&self) -> bool {
237 self.spec.service_account_name.is_none()
238 }
239
240 pub fn service_account_name(&self) -> String {
241 self.spec
242 .service_account_name
243 .clone()
244 .unwrap_or_else(|| self.name_unchecked())
245 }
246
247 pub fn role_name(&self) -> String {
248 self.name_unchecked()
249 }
250
251 pub fn role_binding_name(&self) -> String {
252 self.name_unchecked()
253 }
254
255 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
256 self.name_prefixed(&format!("environmentd-{generation}"))
257 }
258
259 pub fn environmentd_app_name(&self) -> String {
260 "environmentd".to_owned()
261 }
262
263 pub fn environmentd_service_name(&self) -> String {
264 self.name_prefixed("environmentd")
265 }
266
267 pub fn environmentd_service_internal_fqdn(&self) -> String {
268 format!(
269 "{}.{}.svc.cluster.local",
270 self.environmentd_service_name(),
271 self.meta().namespace.as_ref().unwrap()
272 )
273 }
274
275 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
276 self.name_prefixed(&format!("environmentd-{generation}"))
277 }
278
279 pub fn balancerd_app_name(&self) -> String {
280 "balancerd".to_owned()
281 }
282
283 pub fn environmentd_certificate_name(&self) -> String {
284 self.name_prefixed("environmentd-external")
285 }
286
287 pub fn environmentd_certificate_secret_name(&self) -> String {
288 self.name_prefixed("environmentd-tls")
289 }
290
291 pub fn balancerd_deployment_name(&self) -> String {
292 self.name_prefixed("balancerd")
293 }
294
295 pub fn balancerd_service_name(&self) -> String {
296 self.name_prefixed("balancerd")
297 }
298
299 pub fn console_app_name(&self) -> String {
300 "console".to_owned()
301 }
302
303 pub fn balancerd_external_certificate_name(&self) -> String {
304 self.name_prefixed("balancerd-external")
305 }
306
307 pub fn balancerd_external_certificate_secret_name(&self) -> String {
308 self.name_prefixed("balancerd-external-tls")
309 }
310
311 pub fn balancerd_replicas(&self) -> i32 {
312 self.spec.balancerd_replicas.unwrap_or(2)
313 }
314
315 pub fn console_replicas(&self) -> i32 {
316 self.spec.console_replicas.unwrap_or(2)
317 }
318
319 pub fn console_configmap_name(&self) -> String {
320 self.name_prefixed("console")
321 }
322
323 pub fn console_deployment_name(&self) -> String {
324 self.name_prefixed("console")
325 }
326
327 pub fn console_service_name(&self) -> String {
328 self.name_prefixed("console")
329 }
330
331 pub fn console_external_certificate_name(&self) -> String {
332 self.name_prefixed("console-external")
333 }
334
335 pub fn console_external_certificate_secret_name(&self) -> String {
336 self.name_prefixed("console-external-tls")
337 }
338
339 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
340 self.name_prefixed(&format!("persist-pubsub-{generation}"))
341 }
342
343 pub fn listeners_configmap_name(&self, generation: u64) -> String {
344 self.name_prefixed(&format!("listeners-{generation}"))
345 }
346
347 pub fn name_prefixed(&self, suffix: &str) -> String {
348 format!("mz{}-{}", self.resource_id(), suffix)
349 }
350
351 pub fn resource_id(&self) -> &str {
352 &self.status.as_ref().unwrap().resource_id
353 }
354
355 pub fn system_parameter_configmap_name(&self) -> Option<String> {
356 self.spec.system_parameter_configmap_name.clone()
357 }
358
359 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
360 self.spec
361 .environmentd_scratch_volume_storage_requirement
362 .clone()
363 .unwrap_or_else(|| {
364 self.spec
365 .environmentd_resource_requirements
366 .as_ref()
367 .and_then(|requirements| {
368 requirements
369 .requests
370 .as_ref()
371 .or(requirements.limits.as_ref())
372 })
373 .and_then(|requirements| requirements.get("memory").cloned())
378 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
380 })
381 }
382
383 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
384 format!(
385 "{}-{}-{}-0",
386 cloud_provider, region, self.spec.environment_id,
387 )
388 }
389
390 pub fn requested_reconciliation_id(&self) -> Uuid {
391 self.spec.request_rollout
392 }
393
394 pub fn rollout_requested(&self) -> bool {
395 self.requested_reconciliation_id()
396 != self
397 .status
398 .as_ref()
399 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
400 }
401
402 pub fn set_force_promote(&mut self) {
403 self.spec.force_promote = self.spec.request_rollout;
404 }
405
406 pub fn should_force_promote(&self) -> bool {
407 self.spec.force_promote == self.spec.request_rollout
408 || self.spec.rollout_strategy
409 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
410 }
411
412 pub fn conditions_need_update(&self) -> bool {
413 let Some(status) = self.status.as_ref() else {
414 return true;
415 };
416 if status.conditions.is_empty() {
417 return true;
418 }
419 for condition in &status.conditions {
420 if condition.observed_generation != self.meta().generation {
421 return true;
422 }
423 }
424 false
425 }
426
427 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
428 let Some(status) = self.status.as_ref() else {
429 return false;
430 };
431 if status.conditions.is_empty() {
432 return false;
433 }
434 status
435 .conditions
436 .iter()
437 .any(|condition| condition.reason == "ReadyToPromote")
438 && &status.resources_hash == resources_hash
439 }
440
441 pub fn is_promoting(&self) -> bool {
442 let Some(status) = self.status.as_ref() else {
443 return false;
444 };
445 if status.conditions.is_empty() {
446 return false;
447 }
448 status
449 .conditions
450 .iter()
451 .any(|condition| condition.reason == "Promoting")
452 }
453
454 pub fn update_in_progress(&self) -> bool {
455 let Some(status) = self.status.as_ref() else {
456 return false;
457 };
458 if status.conditions.is_empty() {
459 return false;
460 }
461 for condition in &status.conditions {
462 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
463 return true;
464 }
465 }
466 false
467 }
468
469 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
473 let version = parse_image_ref(&self.spec.environmentd_image_ref);
474 match version {
475 Some(version) => &version >= minimum,
476 None => {
482 tracing::warn!(
483 image_ref = %self.spec.environmentd_image_ref,
484 "failed to parse image ref",
485 );
486 true
487 }
488 }
489 }
490
491 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
495 if next_version < active_version {
499 return false;
500 }
501
502 if active_version.major == 0 {
503 if next_version.major != active_version.major {
504 if next_version.major == 26 {
505 return (active_version.minor == 147 && active_version.patch >= 20)
509 || active_version.minor >= 164;
510 } else {
511 return false;
512 }
513 }
514 if next_version.minor == 147 && active_version.minor == 130 {
516 return true;
517 }
518 return next_version.minor <= active_version.minor + 1;
520 } else if active_version.major >= 26 {
521 return next_version.major <= active_version.major + 1;
523 }
524
525 true
526 }
527
528 pub fn within_upgrade_window(&self) -> bool {
531 let active_environmentd_version = self
532 .status
533 .as_ref()
534 .and_then(|status| {
535 status
536 .last_completed_rollout_environmentd_image_ref
537 .as_ref()
538 })
539 .and_then(|image_ref| parse_image_ref(image_ref));
540
541 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
542 parse_image_ref(&self.spec.environmentd_image_ref),
543 active_environmentd_version,
544 ) {
545 Self::is_valid_upgrade_version(
546 &active_environmentd_version,
547 &next_environmentd_version,
548 )
549 } else {
550 true
553 }
554 }
555
556 pub fn status(&self) -> MaterializeStatus {
557 self.status.clone().unwrap_or_else(|| {
558 let mut status = MaterializeStatus::default();
559
560 status.resource_id = new_resource_id();
561
562 if let Some(last_active_generation) = self
567 .annotations()
568 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
569 {
570 status.active_generation = last_active_generation
571 .parse()
572 .expect("valid int generation");
573 }
574
575 status.last_completed_rollout_environmentd_image_ref =
578 Some(self.spec.environmentd_image_ref.clone());
579
580 status
581 })
582 }
583 }
584
585 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
586 #[serde(rename_all = "camelCase")]
587 pub struct MaterializeStatus {
588 pub resource_id: String,
590 pub active_generation: u64,
592 pub last_completed_rollout_request: Uuid,
594 pub last_completed_rollout_environmentd_image_ref: Option<String>,
598 pub resources_hash: String,
603 pub conditions: Vec<Condition>,
604 }
605
606 impl MaterializeStatus {
607 pub fn needs_update(&self, other: &Self) -> bool {
608 let now = chrono::offset::Utc::now();
609 let mut a = self.clone();
610 for condition in &mut a.conditions {
611 condition.last_transition_time = Time(now);
612 }
613 let mut b = other.clone();
614 for condition in &mut b.conditions {
615 condition.last_transition_time = Time(now);
616 }
617 a != b
618 }
619 }
620
621 impl ManagedResource for Materialize {
622 fn default_labels(&self) -> BTreeMap<String, String> {
623 BTreeMap::from_iter([
624 (
625 "materialize.cloud/organization-name".to_owned(),
626 self.name_unchecked(),
627 ),
628 (
629 "materialize.cloud/organization-namespace".to_owned(),
630 self.namespace(),
631 ),
632 (
633 "materialize.cloud/mz-resource-id".to_owned(),
634 self.resource_id().to_owned(),
635 ),
636 ])
637 }
638 }
639}
640
641fn parse_image_ref(image_ref: &str) -> Option<Version> {
642 image_ref
643 .rsplit_once(':')
644 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
645 .and_then(|tag| {
646 let tag = tag.replace("--", "+");
651 Version::parse(&tag).ok()
652 })
653}
654
655#[cfg(test)]
656mod tests {
657 use kube::core::ObjectMeta;
658 use semver::Version;
659
660 use super::v1alpha1::{Materialize, MaterializeSpec};
661
662 #[mz_ore::test]
663 fn meets_minimum_version() {
664 let mut mz = Materialize {
665 spec: MaterializeSpec {
666 environmentd_image_ref:
667 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
668 .to_owned(),
669 ..Default::default()
670 },
671 metadata: ObjectMeta {
672 ..Default::default()
673 },
674 status: None,
675 };
676
677 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
679 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
680 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
681 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
682 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
683 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
684 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
685 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
686 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
687 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
688 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
689 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
690 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
691 mz.spec.environmentd_image_ref =
692 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
693 .to_owned();
694 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
695
696 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
698 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
699 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
700 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
701 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
702 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
703 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
704 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
705 }
706
707 #[mz_ore::test]
708 fn within_upgrade_window() {
709 use super::v1alpha1::MaterializeStatus;
710
711 let mut mz = Materialize {
712 spec: MaterializeSpec {
713 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
714 ..Default::default()
715 },
716 metadata: ObjectMeta {
717 ..Default::default()
718 },
719 status: Some(MaterializeStatus {
720 last_completed_rollout_environmentd_image_ref: Some(
721 "materialize/environmentd:v26.0.0".to_owned(),
722 ),
723 ..Default::default()
724 }),
725 };
726
727 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
729 assert!(mz.within_upgrade_window());
730
731 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
733 assert!(mz.within_upgrade_window());
734
735 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
737 assert!(!mz.within_upgrade_window());
738
739 mz.spec.environmentd_image_ref =
741 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
742 assert!(mz.within_upgrade_window());
743
744 mz.status
746 .as_mut()
747 .unwrap()
748 .last_completed_rollout_environmentd_image_ref =
749 Some("materialize/environmentd:v0.147.20".to_owned());
750 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
751 assert!(mz.within_upgrade_window());
752 }
753
754 #[mz_ore::test]
755 fn is_valid_upgrade_version() {
756 let success_tests = [
757 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
758 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
759 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
760 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
761 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
762 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
763 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
764 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
766 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
767 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
768 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
769 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
770 ];
771 for (active_version, next_version) in success_tests {
772 assert!(
773 Materialize::is_valid_upgrade_version(&active_version, &next_version),
774 "v{active_version} can upgrade to v{next_version}"
775 );
776 }
777
778 let failure_tests = [
779 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
780 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
781 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
782 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
783 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
784 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
785 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
787 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
789 ];
790 for (active_version, next_version) in failure_tests {
791 assert!(
792 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
793 "v{active_version} can't upgrade to v{next_version}"
794 );
795 }
796 }
797}