mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
28pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
29    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32    use super::*;
33
34    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
35    pub enum MaterializeRolloutStrategy {
36        /// Create a new generation of pods, leaving the old generation around until the
37        /// new ones are ready to take over.
38        /// This minimizes downtime, and is what almost everyone should use.
39        #[default]
40        WaitUntilReady,
41
42        /// {{<warning>}}
43        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
44        ///
45        /// This strategy should ONLY be used by customers with physical hardware who do not have
46        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
47        /// consult with Materialize engineering to discuss your situation.
48        /// {{</warning>}}
49        ///
50        /// Tear down the old generation of pods and promote the new generation of pods immediately,
51        /// without waiting for the new generation of pods to be ready.
52        ImmediatelyPromoteCausingDowntime,
53    }
54
55    #[derive(
56        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
57    )]
58    #[serde(rename_all = "camelCase")]
59    #[kube(
60        namespaced,
61        group = "materialize.cloud",
62        version = "v1alpha1",
63        kind = "Materialize",
64        singular = "materialize",
65        plural = "materializes",
66        shortname = "mzs",
67        status = "MaterializeStatus",
68        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
69        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
70    )]
71    pub struct MaterializeSpec {
72        /// The environmentd image to run.
73        pub environmentd_image_ref: String,
74        /// Extra args to pass to the environmentd binary.
75        pub environmentd_extra_args: Option<Vec<String>>,
76        /// Extra environment variables to pass to the environmentd binary.
77        pub environmentd_extra_env: Option<Vec<EnvVar>>,
78        /// {{<warning>}}
79        /// Deprecated.
80        ///
81        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
82        /// {{</warning>}}
83        ///
84        /// If running in AWS, override the IAM role to use to give
85        /// environmentd access to the persist S3 bucket.
86        #[kube(deprecated)]
87        pub environmentd_iam_role_arn: Option<String>,
88        /// If running in AWS, override the IAM role to use to support
89        /// the CREATE CONNECTION feature.
90        pub environmentd_connection_role_arn: Option<String>,
91        /// Resource requirements for the environmentd pod.
92        pub environmentd_resource_requirements: Option<ResourceRequirements>,
93        /// Amount of disk to allocate, if a storage class is provided.
94        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
95        /// Resource requirements for the balancerd pod.
96        pub balancerd_resource_requirements: Option<ResourceRequirements>,
97        /// Resource requirements for the console pod.
98        pub console_resource_requirements: Option<ResourceRequirements>,
99        /// Number of balancerd pods to create.
100        pub balancerd_replicas: Option<i32>,
101        /// Number of console pods to create.
102        pub console_replicas: Option<i32>,
103
104        /// Name of the kubernetes service account to use.
105        /// If not set, we will create one with the same name as this Materialize object.
106        pub service_account_name: Option<String>,
107        /// Annotations to apply to the service account.
108        ///
109        /// Annotations on service accounts are commonly used by cloud providers for IAM.
110        /// AWS uses "eks.amazonaws.com/role-arn".
111        /// Azure uses "azure.workload.identity/client-id", but
112        /// additionally requires "azure.workload.identity/use": "true" on the pods.
113        pub service_account_annotations: Option<BTreeMap<String, String>>,
114        /// Labels to apply to the service account.
115        pub service_account_labels: Option<BTreeMap<String, String>>,
116        /// Annotations to apply to the pods.
117        pub pod_annotations: Option<BTreeMap<String, String>>,
118        /// Labels to apply to the pods.
119        pub pod_labels: Option<BTreeMap<String, String>>,
120
121        /// When changes are made to the environmentd resources (either via
122        /// modifying fields in the spec here or by deploying a new
123        /// orchestratord version which changes how resources are generated),
124        /// existing environmentd processes won't be automatically restarted.
125        /// In order to trigger a restart, the request_rollout field should be
126        /// set to a new (random) value. Once the rollout completes, the value
127        /// of `status.lastCompletedRolloutRequest` will be set to this value
128        /// to indicate completion.
129        ///
130        /// Defaults to a random value in order to ensure that the first
131        /// generation rollout is automatically triggered.
132        #[serde(default)]
133        pub request_rollout: Uuid,
134        /// If `forcePromote` is set to the same value as `requestRollout`, the
135        /// current rollout will skip waiting for clusters in the new
136        /// generation to rehydrate before promoting the new environmentd to
137        /// leader.
138        #[serde(default)]
139        pub force_promote: Uuid,
140        /// This value will be written to an annotation in the generated
141        /// environmentd statefulset, in order to force the controller to
142        /// detect the generated resources as changed even if no other changes
143        /// happened. This can be used to force a rollout to a new generation
144        /// even without making any meaningful changes, by setting it to the
145        /// same value as `requestRollout`.
146        #[serde(default)]
147        pub force_rollout: Uuid,
148        /// {{<warning>}}
149        /// Deprecated and ignored. Use `rolloutStrategy` instead.
150        /// {{</warning>}}
151        #[kube(deprecated)]
152        #[serde(default)]
153        pub in_place_rollout: bool,
154        /// Rollout strategy to use when upgrading this Materialize instance.
155        #[serde(default)]
156        pub rollout_strategy: MaterializeRolloutStrategy,
157        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
158        /// It may also contain `external_login_password_mz_system`, which will be used as
159        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
160        pub backend_secret_name: String,
161        /// How to authenticate with Materialize.
162        #[serde(default)]
163        pub authenticator_kind: AuthenticatorKind,
164        /// Whether to enable role based access control. Defaults to false.
165        #[serde(default)]
166        pub enable_rbac: bool,
167
168        /// The value used by environmentd (via the --environment-id flag) to
169        /// uniquely identify this instance. Must be globally unique, and
170        /// is required if a license key is not provided.
171        /// NOTE: This value MUST NOT be changed in an existing instance,
172        /// since it affects things like the way data is stored in the persist
173        /// backend.
174        #[serde(default)]
175        pub environment_id: Uuid,
176
177        /// The configuration for generating an x509 certificate using cert-manager for balancerd
178        /// to present to incoming connections.
179        /// The `dnsNames` and `issuerRef` fields are required.
180        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
181        /// The configuration for generating an x509 certificate using cert-manager for the console
182        /// to present to incoming connections.
183        /// The `dnsNames` and `issuerRef` fields are required.
184        /// Not yet implemented.
185        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
186        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
187        /// The `issuerRef` field is required.
188        /// This currently is only used for environmentd, but will eventually support clusterd.
189        /// Not yet implemented.
190        pub internal_certificate_spec: Option<MaterializeCertSpec>,
191    }
192
193    impl Materialize {
194        pub fn backend_secret_name(&self) -> String {
195            self.spec.backend_secret_name.clone()
196        }
197
198        pub fn namespace(&self) -> String {
199            self.meta().namespace.clone().unwrap()
200        }
201
202        pub fn create_service_account(&self) -> bool {
203            self.spec.service_account_name.is_none()
204        }
205
206        pub fn service_account_name(&self) -> String {
207            self.spec
208                .service_account_name
209                .clone()
210                .unwrap_or_else(|| self.name_unchecked())
211        }
212
213        pub fn role_name(&self) -> String {
214            self.name_unchecked()
215        }
216
217        pub fn role_binding_name(&self) -> String {
218            self.name_unchecked()
219        }
220
221        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
222            self.name_prefixed(&format!("environmentd-{generation}"))
223        }
224
225        pub fn environmentd_app_name(&self) -> String {
226            "environmentd".to_owned()
227        }
228
229        pub fn environmentd_service_name(&self) -> String {
230            self.name_prefixed("environmentd")
231        }
232
233        pub fn environmentd_service_internal_fqdn(&self) -> String {
234            format!(
235                "{}.{}.svc.cluster.local",
236                self.environmentd_service_name(),
237                self.meta().namespace.as_ref().unwrap()
238            )
239        }
240
241        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
242            self.name_prefixed(&format!("environmentd-{generation}"))
243        }
244
245        pub fn balancerd_app_name(&self) -> String {
246            "balancerd".to_owned()
247        }
248
249        pub fn environmentd_certificate_name(&self) -> String {
250            self.name_prefixed("environmentd-external")
251        }
252
253        pub fn environmentd_certificate_secret_name(&self) -> String {
254            self.name_prefixed("environmentd-tls")
255        }
256
257        pub fn balancerd_deployment_name(&self) -> String {
258            self.name_prefixed("balancerd")
259        }
260
261        pub fn balancerd_service_name(&self) -> String {
262            self.name_prefixed("balancerd")
263        }
264
265        pub fn console_app_name(&self) -> String {
266            "console".to_owned()
267        }
268
269        pub fn balancerd_external_certificate_name(&self) -> String {
270            self.name_prefixed("balancerd-external")
271        }
272
273        pub fn balancerd_external_certificate_secret_name(&self) -> String {
274            self.name_prefixed("balancerd-external-tls")
275        }
276
277        pub fn balancerd_replicas(&self) -> i32 {
278            self.spec.balancerd_replicas.unwrap_or(2)
279        }
280
281        pub fn console_replicas(&self) -> i32 {
282            self.spec.console_replicas.unwrap_or(2)
283        }
284
285        pub fn console_configmap_name(&self) -> String {
286            self.name_prefixed("console")
287        }
288
289        pub fn console_deployment_name(&self) -> String {
290            self.name_prefixed("console")
291        }
292
293        pub fn console_service_name(&self) -> String {
294            self.name_prefixed("console")
295        }
296
297        pub fn console_external_certificate_name(&self) -> String {
298            self.name_prefixed("console-external")
299        }
300
301        pub fn console_external_certificate_secret_name(&self) -> String {
302            self.name_prefixed("console-external-tls")
303        }
304
305        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
306            self.name_prefixed(&format!("persist-pubsub-{generation}"))
307        }
308
309        pub fn listeners_configmap_name(&self, generation: u64) -> String {
310            self.name_prefixed(&format!("listeners-{generation}"))
311        }
312
313        pub fn name_prefixed(&self, suffix: &str) -> String {
314            format!("mz{}-{}", self.resource_id(), suffix)
315        }
316
317        pub fn resource_id(&self) -> &str {
318            &self.status.as_ref().unwrap().resource_id
319        }
320
321        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
322            self.spec
323                .environmentd_scratch_volume_storage_requirement
324                .clone()
325                .unwrap_or_else(|| {
326                    self.spec
327                        .environmentd_resource_requirements
328                        .as_ref()
329                        .and_then(|requirements| {
330                            requirements
331                                .requests
332                                .as_ref()
333                                .or(requirements.limits.as_ref())
334                        })
335                        // TODO: in cloud, we've been defaulting to twice the
336                        // memory limit, but k8s-openapi doesn't seem to
337                        // provide any way to parse Quantity values, so there
338                        // isn't an easy way to do arithmetic on it
339                        .and_then(|requirements| requirements.get("memory").cloned())
340                        // TODO: is there a better default to use here?
341                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
342                })
343        }
344
345        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
346            format!(
347                "{}-{}-{}-0",
348                cloud_provider, region, self.spec.environment_id,
349            )
350        }
351
352        pub fn requested_reconciliation_id(&self) -> Uuid {
353            self.spec.request_rollout
354        }
355
356        pub fn rollout_requested(&self) -> bool {
357            self.requested_reconciliation_id()
358                != self
359                    .status
360                    .as_ref()
361                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
362        }
363
364        pub fn set_force_promote(&mut self) {
365            self.spec.force_promote = self.spec.request_rollout;
366        }
367
368        pub fn should_force_promote(&self) -> bool {
369            self.spec.force_promote == self.spec.request_rollout
370                || self.spec.rollout_strategy
371                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
372        }
373
374        pub fn conditions_need_update(&self) -> bool {
375            let Some(status) = self.status.as_ref() else {
376                return true;
377            };
378            if status.conditions.is_empty() {
379                return true;
380            }
381            for condition in &status.conditions {
382                if condition.observed_generation != self.meta().generation {
383                    return true;
384                }
385            }
386            false
387        }
388
389        pub fn is_promoting(&self) -> bool {
390            let Some(status) = self.status.as_ref() else {
391                return false;
392            };
393            if status.conditions.is_empty() {
394                return false;
395            }
396            status
397                .conditions
398                .iter()
399                .any(|condition| condition.reason == "Promoting")
400        }
401
402        pub fn update_in_progress(&self) -> bool {
403            let Some(status) = self.status.as_ref() else {
404                return false;
405            };
406            if status.conditions.is_empty() {
407                return false;
408            }
409            for condition in &status.conditions {
410                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
411                    return true;
412                }
413            }
414            false
415        }
416
417        /// Checks that the given version is greater than or equal
418        /// to the existing version, if the existing version
419        /// can be parsed.
420        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
421            let version = parse_image_ref(&self.spec.environmentd_image_ref);
422            match version {
423                Some(version) => &version >= minimum,
424                // In the rare case that we see an image reference
425                // that we can't parse, we assume that it satisfies all
426                // version checks. Usually these are custom images that have
427                // been by a developer on a branch forked from a recent copy
428                // of main, and so this works out reasonably well in practice.
429                None => {
430                    tracing::warn!(
431                        image_ref = %self.spec.environmentd_image_ref,
432                        "failed to parse image ref",
433                    );
434                    true
435                }
436            }
437        }
438
439        /// This check isn't strictly required since environmentd will still be able to determine
440        /// if the upgrade is allowed or not. However, doing this check allows us to provide
441        /// the error as soon as possible and in a more user friendly way.
442        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
443            // Don't allow rolling back
444            // Note: semver comparison handles RC versions correctly:
445            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
446            if next_version < active_version {
447                return false;
448            }
449
450            if active_version.major == 0 {
451                if next_version.major != active_version.major {
452                    if next_version.major == 26 {
453                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
454                        // before upgrading to 26.0.0
455
456                        return (active_version.minor == 147 && active_version.patch >= 20)
457                            || active_version.minor >= 164;
458                    } else {
459                        return false;
460                    }
461                }
462                // Self managed 25.1 to 25.2
463                if next_version.minor == 147 && active_version.minor == 130 {
464                    return true;
465                }
466                // only allow upgrading a single minor version at a time
467                return next_version.minor <= active_version.minor + 1;
468            } else if active_version.major >= 26 {
469                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
470                return next_version.major <= active_version.major + 1;
471            }
472
473            true
474        }
475
476        /// Checks if the current environmentd image ref is within the upgrade window of the last
477        /// successful rollout.
478        pub fn within_upgrade_window(&self) -> bool {
479            let active_environmentd_version = self
480                .status
481                .as_ref()
482                .and_then(|status| {
483                    status
484                        .last_completed_rollout_environmentd_image_ref
485                        .as_ref()
486                })
487                .and_then(|image_ref| parse_image_ref(image_ref));
488
489            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
490                parse_image_ref(&self.spec.environmentd_image_ref),
491                active_environmentd_version,
492            ) {
493                Self::is_valid_upgrade_version(
494                    &active_environmentd_version,
495                    &next_environmentd_version,
496                )
497            } else {
498                // If we fail to parse either version,
499                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
500                true
501            }
502        }
503
504        pub fn status(&self) -> MaterializeStatus {
505            self.status.clone().unwrap_or_else(|| {
506                let mut status = MaterializeStatus::default();
507
508                status.resource_id = new_resource_id();
509
510                // If we're creating the initial status on an un-soft-deleted
511                // Environment we need to ensure that the last active generation
512                // is restored, otherwise the env will crash loop indefinitely
513                // as its catalog would have durably recorded a greater generation
514                if let Some(last_active_generation) = self
515                    .annotations()
516                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
517                {
518                    status.active_generation = last_active_generation
519                        .parse()
520                        .expect("valid int generation");
521                }
522
523                // Initialize the last completed rollout environmentd image ref to
524                // the current image ref if not already set.
525                status.last_completed_rollout_environmentd_image_ref =
526                    Some(self.spec.environmentd_image_ref.clone());
527
528                status
529            })
530        }
531    }
532
533    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
534    #[serde(rename_all = "camelCase")]
535    pub struct MaterializeStatus {
536        /// Resource identifier used as a name prefix to avoid pod name collisions.
537        pub resource_id: String,
538        /// The generation of Materialize pods actively capable of servicing requests.
539        pub active_generation: u64,
540        /// The UUID of the last successfully completed rollout.
541        pub last_completed_rollout_request: Uuid,
542        /// The image ref of the environmentd image that was last successfully rolled out.
543        /// Used to deny upgrades past 1 major version from the last successful rollout.
544        /// When None, we upgrade anyways.
545        pub last_completed_rollout_environmentd_image_ref: Option<String>,
546        /// A hash calculated from the spec of resources to be created based on this Materialize
547        /// spec. This is used for detecting when the existing resources are up to date.
548        /// If you want to trigger a rollout without making other changes that would cause this
549        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
550        pub resources_hash: String,
551        pub conditions: Vec<Condition>,
552    }
553
554    impl MaterializeStatus {
555        pub fn needs_update(&self, other: &Self) -> bool {
556            let now = chrono::offset::Utc::now();
557            let mut a = self.clone();
558            for condition in &mut a.conditions {
559                condition.last_transition_time = Time(now);
560            }
561            let mut b = other.clone();
562            for condition in &mut b.conditions {
563                condition.last_transition_time = Time(now);
564            }
565            a != b
566        }
567    }
568
569    impl ManagedResource for Materialize {
570        fn default_labels(&self) -> BTreeMap<String, String> {
571            BTreeMap::from_iter([
572                (
573                    "materialize.cloud/organization-name".to_owned(),
574                    self.name_unchecked(),
575                ),
576                (
577                    "materialize.cloud/organization-namespace".to_owned(),
578                    self.namespace(),
579                ),
580                (
581                    "materialize.cloud/mz-resource-id".to_owned(),
582                    self.resource_id().to_owned(),
583                ),
584            ])
585        }
586    }
587}
588
589fn parse_image_ref(image_ref: &str) -> Option<Version> {
590    image_ref
591        .rsplit_once(':')
592        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
593        .and_then(|tag| {
594            // To work around Docker tag restrictions, build metadata in
595            // a Docker tag is delimited by `--` rather than the SemVer
596            // `+` delimiter. So we need to swap the delimiter back to
597            // `+` before parsing it as SemVer.
598            let tag = tag.replace("--", "+");
599            Version::parse(&tag).ok()
600        })
601}
602
603#[cfg(test)]
604mod tests {
605    use kube::core::ObjectMeta;
606    use semver::Version;
607
608    use super::v1alpha1::{Materialize, MaterializeSpec};
609
610    #[mz_ore::test]
611    fn meets_minimum_version() {
612        let mut mz = Materialize {
613            spec: MaterializeSpec {
614                environmentd_image_ref:
615                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
616                        .to_owned(),
617                ..Default::default()
618            },
619            metadata: ObjectMeta {
620                ..Default::default()
621            },
622            status: None,
623        };
624
625        // true cases
626        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
627        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
628        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
629        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
630        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
631        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
632        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
633        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
634        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
635        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
636        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
637        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
638        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
639        mz.spec.environmentd_image_ref =
640            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
641                .to_owned();
642        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
643
644        // false cases
645        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
646        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
647        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
648        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
649        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
650        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
651        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
652        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
653    }
654
655    #[mz_ore::test]
656    fn within_upgrade_window() {
657        use super::v1alpha1::MaterializeStatus;
658
659        let mut mz = Materialize {
660            spec: MaterializeSpec {
661                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
662                ..Default::default()
663            },
664            metadata: ObjectMeta {
665                ..Default::default()
666            },
667            status: Some(MaterializeStatus {
668                last_completed_rollout_environmentd_image_ref: Some(
669                    "materialize/environmentd:v26.0.0".to_owned(),
670                ),
671                ..Default::default()
672            }),
673        };
674
675        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
676        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
677        assert!(mz.within_upgrade_window());
678
679        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
680        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
681        assert!(mz.within_upgrade_window());
682
683        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
684        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
685        assert!(!mz.within_upgrade_window());
686
687        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
688        mz.spec.environmentd_image_ref =
689            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
690        assert!(mz.within_upgrade_window());
691
692        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
693        mz.status
694            .as_mut()
695            .unwrap()
696            .last_completed_rollout_environmentd_image_ref =
697            Some("materialize/environmentd:v0.147.20".to_owned());
698        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
699        assert!(mz.within_upgrade_window());
700    }
701
702    #[mz_ore::test]
703    fn is_valid_upgrade_version() {
704        let success_tests = [
705            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
706            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
707            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
708            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
709            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
710            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
711            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
712            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
713            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
714            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
715            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
716            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
717            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
718        ];
719        for (active_version, next_version) in success_tests {
720            assert!(
721                Materialize::is_valid_upgrade_version(&active_version, &next_version),
722                "v{active_version} can upgrade to v{next_version}"
723            );
724        }
725
726        let failure_tests = [
727            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
728            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
729            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
730            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
731            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
732            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
733            // Disallow anything before 0.147.20 to upgrade
734            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
735            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
736            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
737        ];
738        for (active_version, next_version) in failure_tests {
739            assert!(
740                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
741                "v{active_version} can't upgrade to v{next_version}"
742            );
743        }
744    }
745}