1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
/// Annotation key consulted by `v1alpha1::Materialize::status` when the
/// resource has no status yet: if the annotation is present, its value is
/// parsed as an integer and used as the initial `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32 use super::*;
33
/// Rollout strategy selectable through `MaterializeSpec::rollout_strategy`.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
    /// The default strategy.
    // NOTE(review): judging by the name, the new generation is only promoted
    // once it is ready; the promotion logic is not in this file — confirm.
    #[default]
    WaitUntilReady,

    /// When selected, `Materialize::should_force_promote` returns `true`
    /// regardless of the `force_promote` field.
    ImmediatelyPromoteCausingDowntime,
}
54
/// Spec of the namespaced `Materialize` custom resource
/// (`materialize.cloud/v1alpha1`), whose status type is `MaterializeStatus`.
#[derive(
    CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
#[kube(
    namespaced,
    group = "materialize.cloud",
    version = "v1alpha1",
    kind = "Materialize",
    singular = "materialize",
    plural = "materializes",
    shortname = "mzs",
    status = "MaterializeStatus",
    printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
)]
pub struct MaterializeSpec {
    /// Reference to the environmentd Docker image.
    ///
    /// When the text after the last `:` has the form `v<semver>`, it is
    /// parsed as a version and used by the version checks on `Materialize`.
    pub environmentd_image_ref: String,
    // NOTE(review): presumably extra command-line arguments and environment
    // variables for environmentd; their consumers are not in this file.
    pub environmentd_extra_args: Option<Vec<String>>,
    pub environmentd_extra_env: Option<Vec<EnvVar>>,
    /// Deprecated.
    #[kube(deprecated)]
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    /// Resource requirements for environmentd.
    ///
    /// The memory entry also serves as the fallback size of the scratch
    /// volume when `environmentd_scratch_volume_storage_requirement` is unset.
    pub environmentd_resource_requirements: Option<ResourceRequirements>,
    /// Size of the environmentd scratch volume.
    ///
    /// When unset, the size is taken from the memory entry of
    /// `environmentd_resource_requirements`, or `4096Mi` if there is none.
    pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
    pub balancerd_resource_requirements: Option<ResourceRequirements>,
    pub console_resource_requirements: Option<ResourceRequirements>,
    /// Number of balancerd replicas; 2 when unset.
    pub balancerd_replicas: Option<i32>,
    /// Number of console replicas; 2 when unset.
    pub console_replicas: Option<i32>,

    /// Name of the service account to use.
    ///
    /// When unset, the name defaults to the name of this resource and
    /// `Materialize::create_service_account` reports `true`.
    pub service_account_name: Option<String>,
    // NOTE(review): the annotation/label maps below are not read anywhere in
    // this file; presumably they are applied to the generated service account
    // and pods — confirm against the controller.
    pub service_account_annotations: Option<BTreeMap<String, String>>,
    pub service_account_labels: Option<BTreeMap<String, String>>,
    pub pod_annotations: Option<BTreeMap<String, String>>,
    pub pod_labels: Option<BTreeMap<String, String>>,

    /// Identifier of the requested rollout.
    ///
    /// A rollout counts as requested while this value differs from
    /// `status.lastCompletedRolloutRequest`. Defaults to the nil UUID.
    #[serde(default)]
    pub request_rollout: Uuid,
    /// When equal to `request_rollout`, `Materialize::should_force_promote`
    /// reports `true`. Defaults to the nil UUID.
    #[serde(default)]
    pub force_promote: Uuid,
    // NOTE(review): not read anywhere in this file; semantics to be confirmed.
    #[serde(default)]
    pub force_rollout: Uuid,
    /// Deprecated.
    #[kube(deprecated)]
    #[serde(default)]
    pub in_place_rollout: bool,
    /// Rollout strategy; defaults to `WaitUntilReady`.
    #[serde(default)]
    pub rollout_strategy: MaterializeRolloutStrategy,
    /// Name of the backend secret, returned by
    /// `Materialize::backend_secret_name`.
    pub backend_secret_name: String,
    #[serde(default)]
    pub authenticator_kind: AuthenticatorKind,
    #[serde(default)]
    pub enable_rbac: bool,

    /// UUID embedded in the environment id string
    /// `<cloud_provider>-<region>-<environment_id>-0`. Defaults to the nil
    /// UUID.
    #[serde(default)]
    pub environment_id: Uuid,

    // NOTE(review): certificate specs are not interpreted in this file.
    pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
    pub console_external_certificate_spec: Option<MaterializeCertSpec>,
    pub internal_certificate_spec: Option<MaterializeCertSpec>,
}
192
193 impl Materialize {
194 pub fn backend_secret_name(&self) -> String {
195 self.spec.backend_secret_name.clone()
196 }
197
198 pub fn namespace(&self) -> String {
199 self.meta().namespace.clone().unwrap()
200 }
201
202 pub fn create_service_account(&self) -> bool {
203 self.spec.service_account_name.is_none()
204 }
205
206 pub fn service_account_name(&self) -> String {
207 self.spec
208 .service_account_name
209 .clone()
210 .unwrap_or_else(|| self.name_unchecked())
211 }
212
213 pub fn role_name(&self) -> String {
214 self.name_unchecked()
215 }
216
217 pub fn role_binding_name(&self) -> String {
218 self.name_unchecked()
219 }
220
221 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
222 self.name_prefixed(&format!("environmentd-{generation}"))
223 }
224
225 pub fn environmentd_app_name(&self) -> String {
226 "environmentd".to_owned()
227 }
228
229 pub fn environmentd_service_name(&self) -> String {
230 self.name_prefixed("environmentd")
231 }
232
233 pub fn environmentd_service_internal_fqdn(&self) -> String {
234 format!(
235 "{}.{}.svc.cluster.local",
236 self.environmentd_service_name(),
237 self.meta().namespace.as_ref().unwrap()
238 )
239 }
240
241 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
242 self.name_prefixed(&format!("environmentd-{generation}"))
243 }
244
245 pub fn balancerd_app_name(&self) -> String {
246 "balancerd".to_owned()
247 }
248
249 pub fn environmentd_certificate_name(&self) -> String {
250 self.name_prefixed("environmentd-external")
251 }
252
253 pub fn environmentd_certificate_secret_name(&self) -> String {
254 self.name_prefixed("environmentd-tls")
255 }
256
257 pub fn balancerd_deployment_name(&self) -> String {
258 self.name_prefixed("balancerd")
259 }
260
261 pub fn balancerd_service_name(&self) -> String {
262 self.name_prefixed("balancerd")
263 }
264
265 pub fn console_app_name(&self) -> String {
266 "console".to_owned()
267 }
268
269 pub fn balancerd_external_certificate_name(&self) -> String {
270 self.name_prefixed("balancerd-external")
271 }
272
273 pub fn balancerd_external_certificate_secret_name(&self) -> String {
274 self.name_prefixed("balancerd-external-tls")
275 }
276
277 pub fn balancerd_replicas(&self) -> i32 {
278 self.spec.balancerd_replicas.unwrap_or(2)
279 }
280
281 pub fn console_replicas(&self) -> i32 {
282 self.spec.console_replicas.unwrap_or(2)
283 }
284
285 pub fn console_configmap_name(&self) -> String {
286 self.name_prefixed("console")
287 }
288
289 pub fn console_deployment_name(&self) -> String {
290 self.name_prefixed("console")
291 }
292
293 pub fn console_service_name(&self) -> String {
294 self.name_prefixed("console")
295 }
296
297 pub fn console_external_certificate_name(&self) -> String {
298 self.name_prefixed("console-external")
299 }
300
301 pub fn console_external_certificate_secret_name(&self) -> String {
302 self.name_prefixed("console-external-tls")
303 }
304
305 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
306 self.name_prefixed(&format!("persist-pubsub-{generation}"))
307 }
308
309 pub fn listeners_configmap_name(&self, generation: u64) -> String {
310 self.name_prefixed(&format!("listeners-{generation}"))
311 }
312
313 pub fn name_prefixed(&self, suffix: &str) -> String {
314 format!("mz{}-{}", self.resource_id(), suffix)
315 }
316
317 pub fn resource_id(&self) -> &str {
318 &self.status.as_ref().unwrap().resource_id
319 }
320
321 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
322 self.spec
323 .environmentd_scratch_volume_storage_requirement
324 .clone()
325 .unwrap_or_else(|| {
326 self.spec
327 .environmentd_resource_requirements
328 .as_ref()
329 .and_then(|requirements| {
330 requirements
331 .requests
332 .as_ref()
333 .or(requirements.limits.as_ref())
334 })
335 .and_then(|requirements| requirements.get("memory").cloned())
340 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
342 })
343 }
344
345 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
346 format!(
347 "{}-{}-{}-0",
348 cloud_provider, region, self.spec.environment_id,
349 )
350 }
351
352 pub fn requested_reconciliation_id(&self) -> Uuid {
353 self.spec.request_rollout
354 }
355
356 pub fn rollout_requested(&self) -> bool {
357 self.requested_reconciliation_id()
358 != self
359 .status
360 .as_ref()
361 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
362 }
363
364 pub fn set_force_promote(&mut self) {
365 self.spec.force_promote = self.spec.request_rollout;
366 }
367
368 pub fn should_force_promote(&self) -> bool {
369 self.spec.force_promote == self.spec.request_rollout
370 || self.spec.rollout_strategy
371 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
372 }
373
374 pub fn conditions_need_update(&self) -> bool {
375 let Some(status) = self.status.as_ref() else {
376 return true;
377 };
378 if status.conditions.is_empty() {
379 return true;
380 }
381 for condition in &status.conditions {
382 if condition.observed_generation != self.meta().generation {
383 return true;
384 }
385 }
386 false
387 }
388
389 pub fn is_promoting(&self) -> bool {
390 let Some(status) = self.status.as_ref() else {
391 return false;
392 };
393 if status.conditions.is_empty() {
394 return false;
395 }
396 status
397 .conditions
398 .iter()
399 .any(|condition| condition.reason == "Promoting")
400 }
401
402 pub fn update_in_progress(&self) -> bool {
403 let Some(status) = self.status.as_ref() else {
404 return false;
405 };
406 if status.conditions.is_empty() {
407 return false;
408 }
409 for condition in &status.conditions {
410 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
411 return true;
412 }
413 }
414 false
415 }
416
417 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
421 let version = parse_image_ref(&self.spec.environmentd_image_ref);
422 match version {
423 Some(version) => &version >= minimum,
424 None => {
430 tracing::warn!(
431 image_ref = %self.spec.environmentd_image_ref,
432 "failed to parse image ref",
433 );
434 true
435 }
436 }
437 }
438
439 pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
443 if next_version < active_version {
447 return false;
448 }
449
450 if active_version.major == 0 {
451 if next_version.major != active_version.major {
452 if next_version.major == 26 {
453 return (active_version.minor == 147 && active_version.patch >= 20)
457 || active_version.minor >= 164;
458 } else {
459 return false;
460 }
461 }
462 if next_version.minor == 147 && active_version.minor == 130 {
464 return true;
465 }
466 return next_version.minor <= active_version.minor + 1;
468 } else if active_version.major >= 26 {
469 return next_version.major <= active_version.major + 1;
471 }
472
473 true
474 }
475
476 pub fn within_upgrade_window(&self) -> bool {
479 let active_environmentd_version = self
480 .status
481 .as_ref()
482 .and_then(|status| {
483 status
484 .last_completed_rollout_environmentd_image_ref
485 .as_ref()
486 })
487 .and_then(|image_ref| parse_image_ref(image_ref));
488
489 if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
490 parse_image_ref(&self.spec.environmentd_image_ref),
491 active_environmentd_version,
492 ) {
493 Self::is_valid_upgrade_version(
494 &active_environmentd_version,
495 &next_environmentd_version,
496 )
497 } else {
498 true
501 }
502 }
503
504 pub fn status(&self) -> MaterializeStatus {
505 self.status.clone().unwrap_or_else(|| {
506 let mut status = MaterializeStatus::default();
507
508 status.resource_id = new_resource_id();
509
510 if let Some(last_active_generation) = self
515 .annotations()
516 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
517 {
518 status.active_generation = last_active_generation
519 .parse()
520 .expect("valid int generation");
521 }
522
523 status.last_completed_rollout_environmentd_image_ref =
526 Some(self.spec.environmentd_image_ref.clone());
527
528 status
529 })
530 }
531 }
532
/// Status of the `Materialize` custom resource.
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeStatus {
    /// Identifier embedded in the names of generated resources
    /// (`mz<resource_id>-<suffix>`) and in the
    /// `materialize.cloud/mz-resource-id` label.
    pub resource_id: String,
    /// The active generation. A freshly initialized status takes it from the
    /// last-known-active-generation annotation when that is present.
    pub active_generation: u64,
    /// Compared against `spec.requestRollout` to decide whether a rollout is
    /// requested.
    pub last_completed_rollout_request: Uuid,
    /// Image ref whose version is taken as the currently active version when
    /// validating an upgrade.
    pub last_completed_rollout_environmentd_image_ref: Option<String>,
    // NOTE(review): not read or written anywhere in this file; semantics to
    // be confirmed against the controller.
    pub resources_hash: String,
    /// Conditions of the resource; the methods on `Materialize` inspect the
    /// `UpToDate` type, the `Promoting` reason and `observed_generation`.
    pub conditions: Vec<Condition>,
}
553
554 impl MaterializeStatus {
555 pub fn needs_update(&self, other: &Self) -> bool {
556 let now = chrono::offset::Utc::now();
557 let mut a = self.clone();
558 for condition in &mut a.conditions {
559 condition.last_transition_time = Time(now);
560 }
561 let mut b = other.clone();
562 for condition in &mut b.conditions {
563 condition.last_transition_time = Time(now);
564 }
565 a != b
566 }
567 }
568
569 impl ManagedResource for Materialize {
570 fn default_labels(&self) -> BTreeMap<String, String> {
571 BTreeMap::from_iter([
572 (
573 "materialize.cloud/organization-name".to_owned(),
574 self.name_unchecked(),
575 ),
576 (
577 "materialize.cloud/organization-namespace".to_owned(),
578 self.namespace(),
579 ),
580 (
581 "materialize.cloud/mz-resource-id".to_owned(),
582 self.resource_id().to_owned(),
583 ),
584 ])
585 }
586 }
587}
588
589fn parse_image_ref(image_ref: &str) -> Option<Version> {
590 image_ref
591 .rsplit_once(':')
592 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
593 .and_then(|tag| {
594 let tag = tag.replace("--", "+");
599 Version::parse(&tag).ok()
600 })
601}
602
603#[cfg(test)]
604mod tests {
605 use kube::core::ObjectMeta;
606 use semver::Version;
607
608 use super::v1alpha1::{Materialize, MaterializeSpec};
609
610 #[mz_ore::test]
611 fn meets_minimum_version() {
612 let mut mz = Materialize {
613 spec: MaterializeSpec {
614 environmentd_image_ref:
615 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
616 .to_owned(),
617 ..Default::default()
618 },
619 metadata: ObjectMeta {
620 ..Default::default()
621 },
622 status: None,
623 };
624
625 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
627 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
628 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
629 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
630 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
631 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
632 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
633 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
634 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
635 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
636 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
637 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
638 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
639 mz.spec.environmentd_image_ref =
640 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
641 .to_owned();
642 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
643
644 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
646 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
647 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
648 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
649 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
650 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
651 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
652 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
653 }
654
655 #[mz_ore::test]
656 fn within_upgrade_window() {
657 use super::v1alpha1::MaterializeStatus;
658
659 let mut mz = Materialize {
660 spec: MaterializeSpec {
661 environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
662 ..Default::default()
663 },
664 metadata: ObjectMeta {
665 ..Default::default()
666 },
667 status: Some(MaterializeStatus {
668 last_completed_rollout_environmentd_image_ref: Some(
669 "materialize/environmentd:v26.0.0".to_owned(),
670 ),
671 ..Default::default()
672 }),
673 };
674
675 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
677 assert!(mz.within_upgrade_window());
678
679 mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
681 assert!(mz.within_upgrade_window());
682
683 mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
685 assert!(!mz.within_upgrade_window());
686
687 mz.spec.environmentd_image_ref =
689 "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
690 assert!(mz.within_upgrade_window());
691
692 mz.status
694 .as_mut()
695 .unwrap()
696 .last_completed_rollout_environmentd_image_ref =
697 Some("materialize/environmentd:v0.147.20".to_owned());
698 mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
699 assert!(mz.within_upgrade_window());
700 }
701
702 #[mz_ore::test]
703 fn is_valid_upgrade_version() {
704 let success_tests = [
705 (Version::new(0, 83, 0), Version::new(0, 83, 0)),
706 (Version::new(0, 83, 0), Version::new(0, 84, 0)),
707 (Version::new(0, 9, 0), Version::new(0, 10, 0)),
708 (Version::new(0, 99, 0), Version::new(0, 100, 0)),
709 (Version::new(0, 83, 0), Version::new(0, 83, 1)),
710 (Version::new(0, 83, 0), Version::new(0, 83, 2)),
711 (Version::new(0, 83, 2), Version::new(0, 83, 10)),
712 (Version::new(0, 147, 20), Version::new(26, 0, 0)),
714 (Version::new(0, 164, 0), Version::new(26, 0, 0)),
715 (Version::new(26, 0, 0), Version::new(26, 1, 0)),
716 (Version::new(26, 5, 3), Version::new(26, 10, 0)),
717 (Version::new(0, 130, 0), Version::new(0, 147, 0)),
718 ];
719 for (active_version, next_version) in success_tests {
720 assert!(
721 Materialize::is_valid_upgrade_version(&active_version, &next_version),
722 "v{active_version} can upgrade to v{next_version}"
723 );
724 }
725
726 let failure_tests = [
727 (Version::new(0, 83, 0), Version::new(0, 82, 0)),
728 (Version::new(0, 83, 3), Version::new(0, 83, 2)),
729 (Version::new(0, 83, 3), Version::new(1, 83, 3)),
730 (Version::new(0, 83, 0), Version::new(0, 85, 0)),
731 (Version::new(26, 0, 0), Version::new(28, 0, 0)),
732 (Version::new(0, 130, 0), Version::new(26, 1, 0)),
733 (Version::new(0, 147, 1), Version::new(26, 0, 0)),
735 (Version::new(0, 148, 0), Version::new(26, 0, 0)),
737 ];
738 for (active_version, next_version) in failure_tests {
739 assert!(
740 !Materialize::is_valid_upgrade_version(&active_version, &next_version),
741 "v{active_version} can't upgrade to v{next_version}"
742 );
743 }
744 }
745}