1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18 jiff::Timestamp,
19};
20use kube::{CustomResource, Resource, ResourceExt};
21use schemars::JsonSchema;
22use semver::Version;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
27use mz_server_core::listeners::AuthenticatorKind;
28
29pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
30 "materialize.cloud/last-known-active-generation";
31
32pub mod v1alpha1 {
33 use super::*;
34
35 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
36 pub enum MaterializeRolloutStrategy {
37 #[default]
41 WaitUntilReady,
42
43 ManuallyPromote,
65
66 ImmediatelyPromoteCausingDowntime,
77 }
78
79 #[derive(
80 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
81 )]
82 #[serde(rename_all = "camelCase")]
83 #[kube(
84 namespaced,
85 group = "materialize.cloud",
86 version = "v1alpha1",
87 kind = "Materialize",
88 singular = "materialize",
89 plural = "materializes",
90 shortname = "mzs",
91 status = "MaterializeStatus",
92 printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
93 printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
94 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
95 )]
96 pub struct MaterializeSpec {
97 pub environmentd_image_ref: String,
99 pub environmentd_extra_args: Option<Vec<String>>,
101 pub environmentd_extra_env: Option<Vec<EnvVar>>,
103 #[kube(deprecated)]
112 pub environmentd_iam_role_arn: Option<String>,
113 pub environmentd_connection_role_arn: Option<String>,
116 pub environmentd_resource_requirements: Option<ResourceRequirements>,
118 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
120 pub balancerd_resource_requirements: Option<ResourceRequirements>,
122 pub console_resource_requirements: Option<ResourceRequirements>,
124 pub balancerd_replicas: Option<i32>,
126 pub console_replicas: Option<i32>,
128
129 pub service_account_name: Option<String>,
132 pub service_account_annotations: Option<BTreeMap<String, String>>,
139 pub service_account_labels: Option<BTreeMap<String, String>>,
141 pub pod_annotations: Option<BTreeMap<String, String>>,
143 pub pod_labels: Option<BTreeMap<String, String>>,
145
146 #[serde(default)]
158 pub request_rollout: Uuid,
159 #[serde(default)]
164 pub force_promote: Uuid,
165 #[serde(default)]
172 pub force_rollout: Uuid,
173 #[kube(deprecated)]
177 #[serde(default)]
178 pub in_place_rollout: bool,
179 #[serde(default)]
181 pub rollout_strategy: MaterializeRolloutStrategy,
182 pub backend_secret_name: String,
186 #[serde(default)]
188 pub authenticator_kind: AuthenticatorKind,
189 #[serde(default)]
191 pub enable_rbac: bool,
192
193 #[serde(default)]
200 pub environment_id: Uuid,
201
202 pub system_parameter_configmap_name: Option<String>,
217
218 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
222 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
227 pub internal_certificate_spec: Option<MaterializeCertSpec>,
232 }
233
234 impl Materialize {
235 pub fn backend_secret_name(&self) -> String {
236 self.spec.backend_secret_name.clone()
237 }
238
239 pub fn namespace(&self) -> String {
240 self.meta().namespace.clone().unwrap()
241 }
242
243 pub fn create_service_account(&self) -> bool {
244 self.spec.service_account_name.is_none()
245 }
246
247 pub fn service_account_name(&self) -> String {
248 self.spec
249 .service_account_name
250 .clone()
251 .unwrap_or_else(|| self.name_unchecked())
252 }
253
254 pub fn role_name(&self) -> String {
255 self.name_unchecked()
256 }
257
258 pub fn role_binding_name(&self) -> String {
259 self.name_unchecked()
260 }
261
262 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
263 self.name_prefixed(&format!("environmentd-{generation}"))
264 }
265
266 pub fn environmentd_app_name(&self) -> String {
267 "environmentd".to_owned()
268 }
269
270 pub fn environmentd_service_name(&self) -> String {
271 self.name_prefixed("environmentd")
272 }
273
274 pub fn environmentd_service_internal_fqdn(&self) -> String {
275 format!(
276 "{}.{}.svc.cluster.local",
277 self.environmentd_service_name(),
278 self.meta().namespace.as_ref().unwrap()
279 )
280 }
281
282 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
283 self.name_prefixed(&format!("environmentd-{generation}"))
284 }
285
286 pub fn balancerd_app_name(&self) -> String {
287 "balancerd".to_owned()
288 }
289
290 pub fn environmentd_certificate_name(&self) -> String {
291 self.name_prefixed("environmentd-external")
292 }
293
294 pub fn environmentd_certificate_secret_name(&self) -> String {
295 self.name_prefixed("environmentd-tls")
296 }
297
298 pub fn balancerd_deployment_name(&self) -> String {
299 self.name_prefixed("balancerd")
300 }
301
302 pub fn balancerd_service_name(&self) -> String {
303 self.name_prefixed("balancerd")
304 }
305
306 pub fn console_app_name(&self) -> String {
307 "console".to_owned()
308 }
309
310 pub fn balancerd_external_certificate_name(&self) -> String {
311 self.name_prefixed("balancerd-external")
312 }
313
314 pub fn balancerd_external_certificate_secret_name(&self) -> String {
315 self.name_prefixed("balancerd-external-tls")
316 }
317
318 pub fn balancerd_replicas(&self) -> i32 {
319 self.spec.balancerd_replicas.unwrap_or(2)
320 }
321
322 pub fn console_replicas(&self) -> i32 {
323 self.spec.console_replicas.unwrap_or(2)
324 }
325
326 pub fn console_configmap_name(&self) -> String {
327 self.name_prefixed("console")
328 }
329
330 pub fn console_deployment_name(&self) -> String {
331 self.name_prefixed("console")
332 }
333
334 pub fn console_service_name(&self) -> String {
335 self.name_prefixed("console")
336 }
337
338 pub fn console_external_certificate_name(&self) -> String {
339 self.name_prefixed("console-external")
340 }
341
342 pub fn console_external_certificate_secret_name(&self) -> String {
343 self.name_prefixed("console-external-tls")
344 }
345
346 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
347 self.name_prefixed(&format!("persist-pubsub-{generation}"))
348 }
349
350 pub fn listeners_configmap_name(&self, generation: u64) -> String {
351 self.name_prefixed(&format!("listeners-{generation}"))
352 }
353
354 pub fn name_prefixed(&self, suffix: &str) -> String {
355 format!("mz{}-{}", self.resource_id(), suffix)
356 }
357
358 pub fn resource_id(&self) -> &str {
359 &self.status.as_ref().unwrap().resource_id
360 }
361
362 pub fn system_parameter_configmap_name(&self) -> Option<String> {
363 self.spec.system_parameter_configmap_name.clone()
364 }
365
366 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
367 self.spec
368 .environmentd_scratch_volume_storage_requirement
369 .clone()
370 .unwrap_or_else(|| {
371 self.spec
372 .environmentd_resource_requirements
373 .as_ref()
374 .and_then(|requirements| {
375 requirements
376 .requests
377 .as_ref()
378 .or(requirements.limits.as_ref())
379 })
380 .and_then(|requirements| requirements.get("memory").cloned())
385 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
387 })
388 }
389
390 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
391 format!(
392 "{}-{}-{}-0",
393 cloud_provider, region, self.spec.environment_id,
394 )
395 }
396
397 pub fn requested_reconciliation_id(&self) -> Uuid {
398 self.spec.request_rollout
399 }
400
401 pub fn rollout_requested(&self) -> bool {
402 self.requested_reconciliation_id()
403 != self
404 .status
405 .as_ref()
406 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
407 }
408
409 pub fn set_force_promote(&mut self) {
410 self.spec.force_promote = self.spec.request_rollout;
411 }
412
413 pub fn should_force_promote(&self) -> bool {
414 self.spec.force_promote == self.spec.request_rollout
415 || self.spec.rollout_strategy
416 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
417 }
418
419 pub fn conditions_need_update(&self) -> bool {
420 let Some(status) = self.status.as_ref() else {
421 return true;
422 };
423 if status.conditions.is_empty() {
424 return true;
425 }
426 for condition in &status.conditions {
427 if condition.observed_generation != self.meta().generation {
428 return true;
429 }
430 }
431 false
432 }
433
434 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
435 let Some(status) = self.status.as_ref() else {
436 return false;
437 };
438 if status.conditions.is_empty() {
439 return false;
440 }
441 status
442 .conditions
443 .iter()
444 .any(|condition| condition.reason == "ReadyToPromote")
445 && &status.resources_hash == resources_hash
446 }
447
448 pub fn is_promoting(&self) -> bool {
449 let Some(status) = self.status.as_ref() else {
450 return false;
451 };
452 if status.conditions.is_empty() {
453 return false;
454 }
455 status
456 .conditions
457 .iter()
458 .any(|condition| condition.reason == "Promoting")
459 }
460
461 pub fn update_in_progress(&self) -> bool {
462 let Some(status) = self.status.as_ref() else {
463 return false;
464 };
465 if status.conditions.is_empty() {
466 return false;
467 }
468 for condition in &status.conditions {
469 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
470 return true;
471 }
472 }
473 false
474 }
475
476 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
480 let version = parse_image_ref(&self.spec.environmentd_image_ref);
481 match version {
482 Some(version) => &version >= minimum,
483 None => {
489 tracing::warn!(
490 image_ref = %self.spec.environmentd_image_ref,
491 "failed to parse image ref",
492 );
493 true
494 }
495 }
496 }
497
498 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
502 if next_version < active_version {
506 return false;
507 }
508
509 if active_version.major == 0 {
510 if next_version.major != active_version.major {
511 if next_version.major == 26 {
512 return (active_version.minor == 147 && active_version.patch >= 20)
516 || active_version.minor >= 164;
517 } else {
518 return false;
519 }
520 }
521 if next_version.minor == 147 && active_version.minor == 130 {
523 return true;
524 }
525 return next_version.minor <= active_version.minor + 1;
527 } else if active_version.major >= 26 {
528 return next_version.major <= active_version.major + 1;
530 }
531
532 true
533 }
534
535 pub fn within_upgrade_window(&self) -> bool {
538 let active_environmentd_version = self
539 .status
540 .as_ref()
541 .and_then(|status| {
542 status
543 .last_completed_rollout_environmentd_image_ref
544 .as_ref()
545 })
546 .and_then(|image_ref| parse_image_ref(image_ref));
547
548 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
549 parse_image_ref(&self.spec.environmentd_image_ref),
550 active_environmentd_version,
551 ) {
552 Self::is_valid_upgrade_version(
553 &active_environmentd_version,
554 &next_environmentd_version,
555 )
556 } else {
557 true
560 }
561 }
562
563 pub fn status(&self) -> MaterializeStatus {
564 self.status.clone().unwrap_or_else(|| {
565 let mut status = MaterializeStatus::default();
566
567 status.resource_id = new_resource_id();
568
569 if let Some(last_active_generation) = self
574 .annotations()
575 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
576 {
577 status.active_generation = last_active_generation
578 .parse()
579 .expect("valid int generation");
580 }
581
582 status.last_completed_rollout_environmentd_image_ref =
585 Some(self.spec.environmentd_image_ref.clone());
586
587 status
588 })
589 }
590 }
591
592 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
593 #[serde(rename_all = "camelCase")]
594 pub struct MaterializeStatus {
595 pub resource_id: String,
597 pub active_generation: u64,
599 pub last_completed_rollout_request: Uuid,
601 pub last_completed_rollout_environmentd_image_ref: Option<String>,
605 pub resources_hash: String,
610 pub conditions: Vec<Condition>,
611 }
612
613 impl MaterializeStatus {
614 pub fn needs_update(&self, other: &Self) -> bool {
615 let now = Timestamp::now();
616 let mut a = self.clone();
617 for condition in &mut a.conditions {
618 condition.last_transition_time = Time(now);
619 }
620 let mut b = other.clone();
621 for condition in &mut b.conditions {
622 condition.last_transition_time = Time(now);
623 }
624 a != b
625 }
626 }
627
628 impl ManagedResource for Materialize {
629 fn default_labels(&self) -> BTreeMap<String, String> {
630 BTreeMap::from_iter([
631 (
632 "materialize.cloud/organization-name".to_owned(),
633 self.name_unchecked(),
634 ),
635 (
636 "materialize.cloud/organization-namespace".to_owned(),
637 self.namespace(),
638 ),
639 (
640 "materialize.cloud/mz-resource-id".to_owned(),
641 self.resource_id().to_owned(),
642 ),
643 ])
644 }
645 }
646}
647
648fn parse_image_ref(image_ref: &str) -> Option<Version> {
649 image_ref
650 .rsplit_once(':')
651 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
652 .and_then(|tag| {
653 let tag = tag.replace("--", "+");
658 Version::parse(&tag).ok()
659 })
660}
661
662#[cfg(test)]
663mod tests {
664 use kube::core::ObjectMeta;
665 use semver::Version;
666
667 use super::v1alpha1::{Materialize, MaterializeSpec};
668
669 #[mz_ore::test]
670 fn meets_minimum_version() {
671 let mut mz = Materialize {
672 spec: MaterializeSpec {
673 environmentd_image_ref:
674 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
675 .to_owned(),
676 ..Default::default()
677 },
678 metadata: ObjectMeta {
679 ..Default::default()
680 },
681 status: None,
682 };
683
684 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
686 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
687 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
688 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
689 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
690 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
691 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
692 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
693 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
694 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
695 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
696 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
697 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
698 mz.spec.environmentd_image_ref =
699 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
700 .to_owned();
701 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
702
703 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
705 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
706 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
707 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
708 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
709 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
710 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
711 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
712 }
713
714 #[mz_ore::test]
715 fn within_upgrade_window() {
716 use super::v1alpha1::MaterializeStatus;
717
718 let mut mz = Materialize {
719 spec: MaterializeSpec {
720 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
721 ..Default::default()
722 },
723 metadata: ObjectMeta {
724 ..Default::default()
725 },
726 status: Some(MaterializeStatus {
727 last_completed_rollout_environmentd_image_ref: Some(
728 "materialize/environmentd:v26.0.0".to_owned(),
729 ),
730 ..Default::default()
731 }),
732 };
733
734 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
736 assert!(mz.within_upgrade_window());
737
738 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
740 assert!(mz.within_upgrade_window());
741
742 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
744 assert!(!mz.within_upgrade_window());
745
746 mz.spec.environmentd_image_ref =
748 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
749 assert!(mz.within_upgrade_window());
750
751 mz.status
753 .as_mut()
754 .unwrap()
755 .last_completed_rollout_environmentd_image_ref =
756 Some("materialize/environmentd:v0.147.20".to_owned());
757 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
758 assert!(mz.within_upgrade_window());
759 }
760
761 #[mz_ore::test]
762 fn is_valid_upgrade_version() {
763 let success_tests = [
764 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
765 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
766 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
767 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
768 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
769 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
770 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
771 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
773 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
774 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
775 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
776 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
777 ];
778 for (active_version, next_version) in success_tests {
779 assert!(
780 Materialize::is_valid_upgrade_version(&active_version, &next_version),
781 "v{active_version} can upgrade to v{next_version}"
782 );
783 }
784
785 let failure_tests = [
786 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
787 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
788 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
789 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
790 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
791 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
792 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
794 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
796 ];
797 for (active_version, next_version) in failure_tests {
798 assert!(
799 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
800 "v{active_version} can't upgrade to v{next_version}"
801 );
802 }
803 }
804}