1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
28pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
29 "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32 use super::*;
33
34 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
35 pub enum MaterializeRolloutStrategy {
36 #[default]
40 WaitUntilReady,
41
42 ManuallyPromote,
58
59 ImmediatelyPromoteCausingDowntime,
70 }
71
72 #[derive(
73 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
74 )]
75 #[serde(rename_all = "camelCase")]
76 #[kube(
77 namespaced,
78 group = "materialize.cloud",
79 version = "v1alpha1",
80 kind = "Materialize",
81 singular = "materialize",
82 plural = "materializes",
83 shortname = "mzs",
84 status = "MaterializeStatus",
85 printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
86 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
87 )]
88 pub struct MaterializeSpec {
89 pub environmentd_image_ref: String,
91 pub environmentd_extra_args: Option<Vec<String>>,
93 pub environmentd_extra_env: Option<Vec<EnvVar>>,
95 #[kube(deprecated)]
104 pub environmentd_iam_role_arn: Option<String>,
105 pub environmentd_connection_role_arn: Option<String>,
108 pub environmentd_resource_requirements: Option<ResourceRequirements>,
110 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
112 pub balancerd_resource_requirements: Option<ResourceRequirements>,
114 pub console_resource_requirements: Option<ResourceRequirements>,
116 pub balancerd_replicas: Option<i32>,
118 pub console_replicas: Option<i32>,
120
121 pub service_account_name: Option<String>,
124 pub service_account_annotations: Option<BTreeMap<String, String>>,
131 pub service_account_labels: Option<BTreeMap<String, String>>,
133 pub pod_annotations: Option<BTreeMap<String, String>>,
135 pub pod_labels: Option<BTreeMap<String, String>>,
137
138 #[serde(default)]
150 pub request_rollout: Uuid,
151 #[serde(default)]
156 pub force_promote: Uuid,
157 #[serde(default)]
164 pub force_rollout: Uuid,
165 #[kube(deprecated)]
169 #[serde(default)]
170 pub in_place_rollout: bool,
171 #[serde(default)]
173 pub rollout_strategy: MaterializeRolloutStrategy,
174 pub backend_secret_name: String,
178 #[serde(default)]
180 pub authenticator_kind: AuthenticatorKind,
181 #[serde(default)]
183 pub enable_rbac: bool,
184
185 #[serde(default)]
192 pub environment_id: Uuid,
193
194 pub system_parameter_configmap_name: Option<String>,
209
210 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
214 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
219 pub internal_certificate_spec: Option<MaterializeCertSpec>,
224 }
225
226 impl Materialize {
227 pub fn backend_secret_name(&self) -> String {
228 self.spec.backend_secret_name.clone()
229 }
230
231 pub fn namespace(&self) -> String {
232 self.meta().namespace.clone().unwrap()
233 }
234
235 pub fn create_service_account(&self) -> bool {
236 self.spec.service_account_name.is_none()
237 }
238
239 pub fn service_account_name(&self) -> String {
240 self.spec
241 .service_account_name
242 .clone()
243 .unwrap_or_else(|| self.name_unchecked())
244 }
245
246 pub fn role_name(&self) -> String {
247 self.name_unchecked()
248 }
249
250 pub fn role_binding_name(&self) -> String {
251 self.name_unchecked()
252 }
253
254 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
255 self.name_prefixed(&format!("environmentd-{generation}"))
256 }
257
258 pub fn environmentd_app_name(&self) -> String {
259 "environmentd".to_owned()
260 }
261
262 pub fn environmentd_service_name(&self) -> String {
263 self.name_prefixed("environmentd")
264 }
265
266 pub fn environmentd_service_internal_fqdn(&self) -> String {
267 format!(
268 "{}.{}.svc.cluster.local",
269 self.environmentd_service_name(),
270 self.meta().namespace.as_ref().unwrap()
271 )
272 }
273
274 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
275 self.name_prefixed(&format!("environmentd-{generation}"))
276 }
277
278 pub fn balancerd_app_name(&self) -> String {
279 "balancerd".to_owned()
280 }
281
282 pub fn environmentd_certificate_name(&self) -> String {
283 self.name_prefixed("environmentd-external")
284 }
285
286 pub fn environmentd_certificate_secret_name(&self) -> String {
287 self.name_prefixed("environmentd-tls")
288 }
289
290 pub fn balancerd_deployment_name(&self) -> String {
291 self.name_prefixed("balancerd")
292 }
293
294 pub fn balancerd_service_name(&self) -> String {
295 self.name_prefixed("balancerd")
296 }
297
298 pub fn console_app_name(&self) -> String {
299 "console".to_owned()
300 }
301
302 pub fn balancerd_external_certificate_name(&self) -> String {
303 self.name_prefixed("balancerd-external")
304 }
305
306 pub fn balancerd_external_certificate_secret_name(&self) -> String {
307 self.name_prefixed("balancerd-external-tls")
308 }
309
310 pub fn balancerd_replicas(&self) -> i32 {
311 self.spec.balancerd_replicas.unwrap_or(2)
312 }
313
314 pub fn console_replicas(&self) -> i32 {
315 self.spec.console_replicas.unwrap_or(2)
316 }
317
318 pub fn console_configmap_name(&self) -> String {
319 self.name_prefixed("console")
320 }
321
322 pub fn console_deployment_name(&self) -> String {
323 self.name_prefixed("console")
324 }
325
326 pub fn console_service_name(&self) -> String {
327 self.name_prefixed("console")
328 }
329
330 pub fn console_external_certificate_name(&self) -> String {
331 self.name_prefixed("console-external")
332 }
333
334 pub fn console_external_certificate_secret_name(&self) -> String {
335 self.name_prefixed("console-external-tls")
336 }
337
338 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
339 self.name_prefixed(&format!("persist-pubsub-{generation}"))
340 }
341
342 pub fn listeners_configmap_name(&self, generation: u64) -> String {
343 self.name_prefixed(&format!("listeners-{generation}"))
344 }
345
346 pub fn name_prefixed(&self, suffix: &str) -> String {
347 format!("mz{}-{}", self.resource_id(), suffix)
348 }
349
350 pub fn resource_id(&self) -> &str {
351 &self.status.as_ref().unwrap().resource_id
352 }
353
354 pub fn system_parameter_configmap_name(&self) -> Option<String> {
355 self.spec.system_parameter_configmap_name.clone()
356 }
357
358 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
359 self.spec
360 .environmentd_scratch_volume_storage_requirement
361 .clone()
362 .unwrap_or_else(|| {
363 self.spec
364 .environmentd_resource_requirements
365 .as_ref()
366 .and_then(|requirements| {
367 requirements
368 .requests
369 .as_ref()
370 .or(requirements.limits.as_ref())
371 })
372 .and_then(|requirements| requirements.get("memory").cloned())
377 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
379 })
380 }
381
382 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
383 format!(
384 "{}-{}-{}-0",
385 cloud_provider, region, self.spec.environment_id,
386 )
387 }
388
389 pub fn requested_reconciliation_id(&self) -> Uuid {
390 self.spec.request_rollout
391 }
392
393 pub fn rollout_requested(&self) -> bool {
394 self.requested_reconciliation_id()
395 != self
396 .status
397 .as_ref()
398 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
399 }
400
401 pub fn set_force_promote(&mut self) {
402 self.spec.force_promote = self.spec.request_rollout;
403 }
404
405 pub fn should_force_promote(&self) -> bool {
406 self.spec.force_promote == self.spec.request_rollout
407 || self.spec.rollout_strategy
408 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
409 }
410
411 pub fn conditions_need_update(&self) -> bool {
412 let Some(status) = self.status.as_ref() else {
413 return true;
414 };
415 if status.conditions.is_empty() {
416 return true;
417 }
418 for condition in &status.conditions {
419 if condition.observed_generation != self.meta().generation {
420 return true;
421 }
422 }
423 false
424 }
425
426 pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
427 let Some(status) = self.status.as_ref() else {
428 return false;
429 };
430 if status.conditions.is_empty() {
431 return false;
432 }
433 status
434 .conditions
435 .iter()
436 .any(|condition| condition.reason == "ReadyToPromote")
437 && &status.resources_hash == resources_hash
438 }
439
440 pub fn is_promoting(&self) -> bool {
441 let Some(status) = self.status.as_ref() else {
442 return false;
443 };
444 if status.conditions.is_empty() {
445 return false;
446 }
447 status
448 .conditions
449 .iter()
450 .any(|condition| condition.reason == "Promoting")
451 }
452
453 pub fn update_in_progress(&self) -> bool {
454 let Some(status) = self.status.as_ref() else {
455 return false;
456 };
457 if status.conditions.is_empty() {
458 return false;
459 }
460 for condition in &status.conditions {
461 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
462 return true;
463 }
464 }
465 false
466 }
467
468 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
472 let version = parse_image_ref(&self.spec.environmentd_image_ref);
473 match version {
474 Some(version) => &version >= minimum,
475 None => {
481 tracing::warn!(
482 image_ref = %self.spec.environmentd_image_ref,
483 "failed to parse image ref",
484 );
485 true
486 }
487 }
488 }
489
490 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
494 if next_version < active_version {
498 return false;
499 }
500
501 if active_version.major == 0 {
502 if next_version.major != active_version.major {
503 if next_version.major == 26 {
504 return (active_version.minor == 147 && active_version.patch >= 20)
508 || active_version.minor >= 164;
509 } else {
510 return false;
511 }
512 }
513 if next_version.minor == 147 && active_version.minor == 130 {
515 return true;
516 }
517 return next_version.minor <= active_version.minor + 1;
519 } else if active_version.major >= 26 {
520 return next_version.major <= active_version.major + 1;
522 }
523
524 true
525 }
526
527 pub fn within_upgrade_window(&self) -> bool {
530 let active_environmentd_version = self
531 .status
532 .as_ref()
533 .and_then(|status| {
534 status
535 .last_completed_rollout_environmentd_image_ref
536 .as_ref()
537 })
538 .and_then(|image_ref| parse_image_ref(image_ref));
539
540 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
541 parse_image_ref(&self.spec.environmentd_image_ref),
542 active_environmentd_version,
543 ) {
544 Self::is_valid_upgrade_version(
545 &active_environmentd_version,
546 &next_environmentd_version,
547 )
548 } else {
549 true
552 }
553 }
554
555 pub fn status(&self) -> MaterializeStatus {
556 self.status.clone().unwrap_or_else(|| {
557 let mut status = MaterializeStatus::default();
558
559 status.resource_id = new_resource_id();
560
561 if let Some(last_active_generation) = self
566 .annotations()
567 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
568 {
569 status.active_generation = last_active_generation
570 .parse()
571 .expect("valid int generation");
572 }
573
574 status.last_completed_rollout_environmentd_image_ref =
577 Some(self.spec.environmentd_image_ref.clone());
578
579 status
580 })
581 }
582 }
583
584 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
585 #[serde(rename_all = "camelCase")]
586 pub struct MaterializeStatus {
587 pub resource_id: String,
589 pub active_generation: u64,
591 pub last_completed_rollout_request: Uuid,
593 pub last_completed_rollout_environmentd_image_ref: Option<String>,
597 pub resources_hash: String,
602 pub conditions: Vec<Condition>,
603 }
604
605 impl MaterializeStatus {
606 pub fn needs_update(&self, other: &Self) -> bool {
607 let now = chrono::offset::Utc::now();
608 let mut a = self.clone();
609 for condition in &mut a.conditions {
610 condition.last_transition_time = Time(now);
611 }
612 let mut b = other.clone();
613 for condition in &mut b.conditions {
614 condition.last_transition_time = Time(now);
615 }
616 a != b
617 }
618 }
619
620 impl ManagedResource for Materialize {
621 fn default_labels(&self) -> BTreeMap<String, String> {
622 BTreeMap::from_iter([
623 (
624 "materialize.cloud/organization-name".to_owned(),
625 self.name_unchecked(),
626 ),
627 (
628 "materialize.cloud/organization-namespace".to_owned(),
629 self.namespace(),
630 ),
631 (
632 "materialize.cloud/mz-resource-id".to_owned(),
633 self.resource_id().to_owned(),
634 ),
635 ])
636 }
637 }
638}
639
640fn parse_image_ref(image_ref: &str) -> Option<Version> {
641 image_ref
642 .rsplit_once(':')
643 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
644 .and_then(|tag| {
645 let tag = tag.replace("--", "+");
650 Version::parse(&tag).ok()
651 })
652}
653
654#[cfg(test)]
655mod tests {
656 use kube::core::ObjectMeta;
657 use semver::Version;
658
659 use super::v1alpha1::{Materialize, MaterializeSpec};
660
661 #[mz_ore::test]
662 fn meets_minimum_version() {
663 let mut mz = Materialize {
664 spec: MaterializeSpec {
665 environmentd_image_ref:
666 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
667 .to_owned(),
668 ..Default::default()
669 },
670 metadata: ObjectMeta {
671 ..Default::default()
672 },
673 status: None,
674 };
675
676 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
678 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
679 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
680 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
681 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
682 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
683 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
684 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
685 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
686 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
687 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
688 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
689 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
690 mz.spec.environmentd_image_ref =
691 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
692 .to_owned();
693 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
694
695 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
697 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
698 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
699 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
700 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
701 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
702 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
703 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
704 }
705
706 #[mz_ore::test]
707 fn within_upgrade_window() {
708 use super::v1alpha1::MaterializeStatus;
709
710 let mut mz = Materialize {
711 spec: MaterializeSpec {
712 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
713 ..Default::default()
714 },
715 metadata: ObjectMeta {
716 ..Default::default()
717 },
718 status: Some(MaterializeStatus {
719 last_completed_rollout_environmentd_image_ref: Some(
720 "materialize/environmentd:v26.0.0".to_owned(),
721 ),
722 ..Default::default()
723 }),
724 };
725
726 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
728 assert!(mz.within_upgrade_window());
729
730 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
732 assert!(mz.within_upgrade_window());
733
734 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
736 assert!(!mz.within_upgrade_window());
737
738 mz.spec.environmentd_image_ref =
740 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
741 assert!(mz.within_upgrade_window());
742
743 mz.status
745 .as_mut()
746 .unwrap()
747 .last_completed_rollout_environmentd_image_ref =
748 Some("materialize/environmentd:v0.147.20".to_owned());
749 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
750 assert!(mz.within_upgrade_window());
751 }
752
753 #[mz_ore::test]
754 fn is_valid_upgrade_version() {
755 let success_tests = [
756 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
757 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
758 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
759 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
760 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
761 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
762 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
763 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
765 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
766 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
767 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
768 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
769 ];
770 for (active_version, next_version) in success_tests {
771 assert!(
772 Materialize::is_valid_upgrade_version(&active_version, &next_version),
773 "v{active_version} can upgrade to v{next_version}"
774 );
775 }
776
777 let failure_tests = [
778 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
779 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
780 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
781 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
782 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
783 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
784 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
786 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
788 ];
789 for (active_version, next_version) in failure_tests {
790 assert!(
791 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
792 "v{active_version} can't upgrade to v{next_version}"
793 );
794 }
795 }
796}