1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
/// Annotation key holding the last known active generation of a `Materialize`
/// resource.
///
/// `Materialize::status` reads this annotation to seed `active_generation`
/// when the resource does not have a status yet.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32 use super::*;
33
// Strategy selecting how a rollout gets promoted.
//
// NOTE(review): only `ImmediatelyPromoteCausingDowntime` has behavior visible
// in this file (it makes `Materialize::should_force_promote` return true); the
// semantics of the other variants are inferred from their names — confirm
// against the controller.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
    // Default strategy (selected through `#[default]`).
    #[default]
    WaitUntilReady,

    // Presumably waits for an explicit promotion request — TODO confirm.
    ManuallyPromote,

    // Promotion is always forced, see `Materialize::should_force_promote`.
    ImmediatelyPromoteCausingDowntime,
}
77
// Desired state of a `Materialize` custom resource (group
// `materialize.cloud`, version `v1alpha1`). The `CustomResource` derive
// generates the `Materialize` type that wraps this spec together with
// `MaterializeStatus`.
#[derive(
    CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
#[kube(
    namespaced,
    group = "materialize.cloud",
    version = "v1alpha1",
    kind = "Materialize",
    singular = "materialize",
    plural = "materializes",
    shortname = "mzs",
    status = "MaterializeStatus",
    printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
)]
pub struct MaterializeSpec {
    // Image reference for environmentd. Its tag is parsed by `parse_image_ref`
    // for the version checks in `meets_minimum_version` and
    // `within_upgrade_window`; shown as the `ImageRefToDeploy` print column.
    pub environmentd_image_ref: String,
    // Not read in this file — presumably extra command-line arguments for
    // environmentd; confirm against the consumers.
    pub environmentd_extra_args: Option<Vec<String>>,
    // Not read in this file — presumably extra environment variables for
    // environmentd; confirm against the consumers.
    pub environmentd_extra_env: Option<Vec<EnvVar>>,
    // Deprecated field. NOTE(review): the replacement is not visible here.
    #[kube(deprecated)]
    pub environmentd_iam_role_arn: Option<String>,
    // Not read in this file.
    pub environmentd_connection_role_arn: Option<String>,
    // Resource requirements for environmentd. The memory request/limit is the
    // fallback for `environmentd_scratch_volume_storage_requirement()`.
    pub environmentd_resource_requirements: Option<ResourceRequirements>,
    // Explicit scratch volume size; when unset the accessor of the same name
    // derives a value instead.
    pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
    // Resource requirements for balancerd (not read in this file).
    pub balancerd_resource_requirements: Option<ResourceRequirements>,
    // Resource requirements for the console (not read in this file).
    pub console_resource_requirements: Option<ResourceRequirements>,
    // Replica count for balancerd; `balancerd_replicas()` falls back to 2.
    pub balancerd_replicas: Option<i32>,
    // Replica count for the console; `console_replicas()` falls back to 2.
    pub console_replicas: Option<i32>,

    // Service account to use. When unset, `create_service_account()` returns
    // true and `service_account_name()` falls back to the resource's name.
    pub service_account_name: Option<String>,
    // Not read in this file — presumably applied to the service account.
    pub service_account_annotations: Option<BTreeMap<String, String>>,
    // Not read in this file — presumably applied to the service account.
    pub service_account_labels: Option<BTreeMap<String, String>>,
    // Not read in this file — presumably applied to created pods.
    pub pod_annotations: Option<BTreeMap<String, String>>,
    // Not read in this file — presumably applied to created pods.
    pub pod_labels: Option<BTreeMap<String, String>>,

    // Identifier of the requested rollout. `rollout_requested()` is true while
    // it differs from `status.last_completed_rollout_request`.
    #[serde(default)]
    pub request_rollout: Uuid,
    // `should_force_promote()` is true while this equals `request_rollout`.
    #[serde(default)]
    pub force_promote: Uuid,
    // Not read in this file — presumably forces a rollout when changed;
    // confirm against the controller.
    #[serde(default)]
    pub force_rollout: Uuid,
    // Deprecated field. NOTE(review): presumably superseded by
    // `rollout_strategy` — confirm.
    #[kube(deprecated)]
    #[serde(default)]
    pub in_place_rollout: bool,
    // Rollout strategy; see `MaterializeRolloutStrategy`.
    #[serde(default)]
    pub rollout_strategy: MaterializeRolloutStrategy,
    // Name of the backend secret, exposed through `backend_secret_name()`.
    // Has no serde default, so it must be present when deserializing.
    pub backend_secret_name: String,
    // Authenticator kind (not read in this file).
    #[serde(default)]
    pub authenticator_kind: AuthenticatorKind,
    // RBAC toggle (not read in this file).
    #[serde(default)]
    pub enable_rbac: bool,

    // UUID embedded by `environment_id()` into the full environment id.
    #[serde(default)]
    pub environment_id: Uuid,

    // Name of a ConfigMap with system parameters, exposed through
    // `system_parameter_configmap_name()`.
    pub system_parameter_configmap_name: Option<String>,

    // Certificate spec for balancerd's external certificate (not read here).
    pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
    // Certificate spec for the console's external certificate (not read here).
    pub console_external_certificate_spec: Option<MaterializeCertSpec>,
    // Certificate spec for internal certificates (not read here).
    pub internal_certificate_spec: Option<MaterializeCertSpec>,
}
232
233 impl Materialize {
234 pub fn backend_secret_name(&self) -> String {
235 self.spec.backend_secret_name.clone()
236 }
237
238 pub fn namespace(&self) -> String {
239 self.meta().namespace.clone().unwrap()
240 }
241
242 pub fn create_service_account(&self) -> bool {
243 self.spec.service_account_name.is_none()
244 }
245
246 pub fn service_account_name(&self) -> String {
247 self.spec
248 .service_account_name
249 .clone()
250 .unwrap_or_else(|| self.name_unchecked())
251 }
252
253 pub fn role_name(&self) -> String {
254 self.name_unchecked()
255 }
256
257 pub fn role_binding_name(&self) -> String {
258 self.name_unchecked()
259 }
260
261 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
262 self.name_prefixed(&format!("environmentd-{generation}"))
263 }
264
265 pub fn environmentd_app_name(&self) -> String {
266 "environmentd".to_owned()
267 }
268
269 pub fn environmentd_service_name(&self) -> String {
270 self.name_prefixed("environmentd")
271 }
272
273 pub fn environmentd_service_internal_fqdn(&self) -> String {
274 format!(
275 "{}.{}.svc.cluster.local",
276 self.environmentd_service_name(),
277 self.meta().namespace.as_ref().unwrap()
278 )
279 }
280
281 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
282 self.name_prefixed(&format!("environmentd-{generation}"))
283 }
284
285 pub fn balancerd_app_name(&self) -> String {
286 "balancerd".to_owned()
287 }
288
289 pub fn environmentd_certificate_name(&self) -> String {
290 self.name_prefixed("environmentd-external")
291 }
292
293 pub fn environmentd_certificate_secret_name(&self) -> String {
294 self.name_prefixed("environmentd-tls")
295 }
296
297 pub fn balancerd_deployment_name(&self) -> String {
298 self.name_prefixed("balancerd")
299 }
300
301 pub fn balancerd_service_name(&self) -> String {
302 self.name_prefixed("balancerd")
303 }
304
305 pub fn console_app_name(&self) -> String {
306 "console".to_owned()
307 }
308
309 pub fn balancerd_external_certificate_name(&self) -> String {
310 self.name_prefixed("balancerd-external")
311 }
312
313 pub fn balancerd_external_certificate_secret_name(&self) -> String {
314 self.name_prefixed("balancerd-external-tls")
315 }
316
317 pub fn balancerd_replicas(&self) -> i32 {
318 self.spec.balancerd_replicas.unwrap_or(2)
319 }
320
321 pub fn console_replicas(&self) -> i32 {
322 self.spec.console_replicas.unwrap_or(2)
323 }
324
325 pub fn console_configmap_name(&self) -> String {
326 self.name_prefixed("console")
327 }
328
329 pub fn console_deployment_name(&self) -> String {
330 self.name_prefixed("console")
331 }
332
333 pub fn console_service_name(&self) -> String {
334 self.name_prefixed("console")
335 }
336
337 pub fn console_external_certificate_name(&self) -> String {
338 self.name_prefixed("console-external")
339 }
340
341 pub fn console_external_certificate_secret_name(&self) -> String {
342 self.name_prefixed("console-external-tls")
343 }
344
345 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
346 self.name_prefixed(&format!("persist-pubsub-{generation}"))
347 }
348
349 pub fn listeners_configmap_name(&self, generation: u64) -> String {
350 self.name_prefixed(&format!("listeners-{generation}"))
351 }
352
353 pub fn name_prefixed(&self, suffix: &str) -> String {
354 format!("mz{}-{}", self.resource_id(), suffix)
355 }
356
357 pub fn resource_id(&self) -> &str {
358 &self.status.as_ref().unwrap().resource_id
359 }
360
361 pub fn system_parameter_configmap_name(&self) -> Option<String> {
362 self.spec.system_parameter_configmap_name.clone()
363 }
364
365 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
366 self.spec
367 .environmentd_scratch_volume_storage_requirement
368 .clone()
369 .unwrap_or_else(|| {
370 self.spec
371 .environmentd_resource_requirements
372 .as_ref()
373 .and_then(|requirements| {
374 requirements
375 .requests
376 .as_ref()
377 .or(requirements.limits.as_ref())
378 })
379 .and_then(|requirements| requirements.get("memory").cloned())
384 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
386 })
387 }
388
389 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
390 format!(
391 "{}-{}-{}-0",
392 cloud_provider, region, self.spec.environment_id,
393 )
394 }
395
396 pub fn requested_reconciliation_id(&self) -> Uuid {
397 self.spec.request_rollout
398 }
399
400 pub fn rollout_requested(&self) -> bool {
401 self.requested_reconciliation_id()
402 != self
403 .status
404 .as_ref()
405 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
406 }
407
408 pub fn set_force_promote(&mut self) {
409 self.spec.force_promote = self.spec.request_rollout;
410 }
411
412 pub fn should_force_promote(&self) -> bool {
413 self.spec.force_promote == self.spec.request_rollout
414 || self.spec.rollout_strategy
415 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
416 }
417
418 pub fn conditions_need_update(&self) -> bool {
419 let Some(status) = self.status.as_ref() else {
420 return true;
421 };
422 if status.conditions.is_empty() {
423 return true;
424 }
425 for condition in &status.conditions {
426 if condition.observed_generation != self.meta().generation {
427 return true;
428 }
429 }
430 false
431 }
432
433 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
434 let Some(status) = self.status.as_ref() else {
435 return false;
436 };
437 if status.conditions.is_empty() {
438 return false;
439 }
440 status
441 .conditions
442 .iter()
443 .any(|condition| condition.reason == "ReadyToPromote")
444 && &status.resources_hash == resources_hash
445 }
446
447 pub fn is_promoting(&self) -> bool {
448 let Some(status) = self.status.as_ref() else {
449 return false;
450 };
451 if status.conditions.is_empty() {
452 return false;
453 }
454 status
455 .conditions
456 .iter()
457 .any(|condition| condition.reason == "Promoting")
458 }
459
460 pub fn update_in_progress(&self) -> bool {
461 let Some(status) = self.status.as_ref() else {
462 return false;
463 };
464 if status.conditions.is_empty() {
465 return false;
466 }
467 for condition in &status.conditions {
468 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
469 return true;
470 }
471 }
472 false
473 }
474
475 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
479 let version = parse_image_ref(&self.spec.environmentd_image_ref);
480 match version {
481 Some(version) => &version >= minimum,
482 None => {
488 tracing::warn!(
489 image_ref = %self.spec.environmentd_image_ref,
490 "failed to parse image ref",
491 );
492 true
493 }
494 }
495 }
496
497 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
501 if next_version < active_version {
505 return false;
506 }
507
508 if active_version.major == 0 {
509 if next_version.major != active_version.major {
510 if next_version.major == 26 {
511 return (active_version.minor == 147 && active_version.patch >= 20)
515 || active_version.minor >= 164;
516 } else {
517 return false;
518 }
519 }
520 if next_version.minor == 147 && active_version.minor == 130 {
522 return true;
523 }
524 return next_version.minor <= active_version.minor + 1;
526 } else if active_version.major >= 26 {
527 return next_version.major <= active_version.major + 1;
529 }
530
531 true
532 }
533
534 pub fn within_upgrade_window(&self) -> bool {
537 let active_environmentd_version = self
538 .status
539 .as_ref()
540 .and_then(|status| {
541 status
542 .last_completed_rollout_environmentd_image_ref
543 .as_ref()
544 })
545 .and_then(|image_ref| parse_image_ref(image_ref));
546
547 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
548 parse_image_ref(&self.spec.environmentd_image_ref),
549 active_environmentd_version,
550 ) {
551 Self::is_valid_upgrade_version(
552 &active_environmentd_version,
553 &next_environmentd_version,
554 )
555 } else {
556 true
559 }
560 }
561
562 pub fn status(&self) -> MaterializeStatus {
563 self.status.clone().unwrap_or_else(|| {
564 let mut status = MaterializeStatus::default();
565
566 status.resource_id = new_resource_id();
567
568 if let Some(last_active_generation) = self
573 .annotations()
574 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
575 {
576 status.active_generation = last_active_generation
577 .parse()
578 .expect("valid int generation");
579 }
580
581 status.last_completed_rollout_environmentd_image_ref =
584 Some(self.spec.environmentd_image_ref.clone());
585
586 status
587 })
588 }
589 }
590
// Observed state of a `Materialize` resource.
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeStatus {
    // Id used by `Materialize::name_prefixed` to build `mz{resource_id}-...`
    // names and emitted as the `materialize.cloud/mz-resource-id` label.
    pub resource_id: String,
    // Currently active generation; `Materialize::status` seeds it from the
    // `LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION` annotation.
    pub active_generation: u64,
    // Compared against `spec.request_rollout` by `rollout_requested()`.
    pub last_completed_rollout_request: Uuid,
    // Image reference of the last completed rollout; the baseline for
    // `within_upgrade_window()` and the `ImageRefRunning` print column.
    pub last_completed_rollout_environmentd_image_ref: Option<String>,
    // Compared against the caller-provided hash in `is_ready_to_promote()`.
    pub resources_hash: String,
    // Status conditions inspected by the `Materialize` helper predicates.
    pub conditions: Vec<Condition>,
}
611
612 impl MaterializeStatus {
613 pub fn needs_update(&self, other: &Self) -> bool {
614 let now = chrono::offset::Utc::now();
615 let mut a = self.clone();
616 for condition in &mut a.conditions {
617 condition.last_transition_time = Time(now);
618 }
619 let mut b = other.clone();
620 for condition in &mut b.conditions {
621 condition.last_transition_time = Time(now);
622 }
623 a != b
624 }
625 }
626
627 impl ManagedResource for Materialize {
628 fn default_labels(&self) -> BTreeMap<String, String> {
629 BTreeMap::from_iter([
630 (
631 "materialize.cloud/organization-name".to_owned(),
632 self.name_unchecked(),
633 ),
634 (
635 "materialize.cloud/organization-namespace".to_owned(),
636 self.namespace(),
637 ),
638 (
639 "materialize.cloud/mz-resource-id".to_owned(),
640 self.resource_id().to_owned(),
641 ),
642 ])
643 }
644 }
645}
646
647fn parse_image_ref(image_ref: &str) -> Option<Version> {
648 image_ref
649 .rsplit_once(':')
650 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
651 .and_then(|tag| {
652 let tag = tag.replace("--", "+");
657 Version::parse(&tag).ok()
658 })
659}
660
661#[cfg(test)]
662mod tests {
663 use kube::core::ObjectMeta;
664 use semver::Version;
665
666 use super::v1alpha1::{Materialize, MaterializeSpec};
667
668 #[mz_ore::test]
669 fn meets_minimum_version() {
670 let mut mz = Materialize {
671 spec: MaterializeSpec {
672 environmentd_image_ref:
673 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
674 .to_owned(),
675 ..Default::default()
676 },
677 metadata: ObjectMeta {
678 ..Default::default()
679 },
680 status: None,
681 };
682
683 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
685 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
686 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
687 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
688 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
689 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
690 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
691 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
692 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
693 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
694 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
695 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
696 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
697 mz.spec.environmentd_image_ref =
698 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
699 .to_owned();
700 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
701
702 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
704 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
705 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
706 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
707 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
708 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
709 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
710 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
711 }
712
713 #[mz_ore::test]
714 fn within_upgrade_window() {
715 use super::v1alpha1::MaterializeStatus;
716
717 let mut mz = Materialize {
718 spec: MaterializeSpec {
719 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
720 ..Default::default()
721 },
722 metadata: ObjectMeta {
723 ..Default::default()
724 },
725 status: Some(MaterializeStatus {
726 last_completed_rollout_environmentd_image_ref: Some(
727 "materialize/environmentd:v26.0.0".to_owned(),
728 ),
729 ..Default::default()
730 }),
731 };
732
733 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
735 assert!(mz.within_upgrade_window());
736
737 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
739 assert!(mz.within_upgrade_window());
740
741 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
743 assert!(!mz.within_upgrade_window());
744
745 mz.spec.environmentd_image_ref =
747 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
748 assert!(mz.within_upgrade_window());
749
750 mz.status
752 .as_mut()
753 .unwrap()
754 .last_completed_rollout_environmentd_image_ref =
755 Some("materialize/environmentd:v0.147.20".to_owned());
756 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
757 assert!(mz.within_upgrade_window());
758 }
759
760 #[mz_ore::test]
761 fn is_valid_upgrade_version() {
762 let success_tests = [
763 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
764 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
765 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
766 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
767 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
768 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
769 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
770 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
772 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
773 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
774 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
775 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
776 ];
777 for (active_version, next_version) in success_tests {
778 assert!(
779 Materialize::is_valid_upgrade_version(&active_version, &next_version),
780 "v{active_version} can upgrade to v{next_version}"
781 );
782 }
783
784 let failure_tests = [
785 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
786 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
787 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
788 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
789 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
790 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
791 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
793 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
795 ];
796 for (active_version, next_version) in failure_tests {
797 assert!(
798 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
799 "v{active_version} can't upgrade to v{next_version}"
800 );
801 }
802 }
803}