mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
/// Annotation key under which the last known active generation is stored.
///
/// `Materialize::status` reads this annotation when it has to build an
/// initial status and parses its value into `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32    use super::*;
33
    // NOTE(review): this enum derives `JsonSchema`, so the `///` comments on it
    // are presumably emitted as descriptions in the generated schema, and the
    // `{{<warning>}}` markers look like docs-site shortcodes. Treat that text as
    // user-facing output and confirm before rewording it.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        /// Create a new generation of pods, leaving the old generation around until the
        /// new ones are ready to take over.
        /// This minimizes downtime, and is what almost everyone should use.
        // This is the variant produced by `Default`, and therefore the value used
        // when `rollout_strategy` is omitted from the spec (`#[serde(default)]`).
        #[default]
        WaitUntilReady,

        /// Create a new generation of pods, leaving the old generation as the serving generation
        /// until the user manually promotes the new generation.
        ///
        /// Users can promote the new generation at any time, even if the new generation pods are
        /// not fully caught up, by setting `forcePromote` to the same value as `requestRollout` in
        /// the Materialize spec.
        ///
        /// {{<warning>}}
        /// Do not leave new generations unpromoted indefinitely.
        ///
        /// The new generation keeps open read holds which prevent compaction. Once promoted or
        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
        /// data can build up, and can cause extreme deletion load on the metadata backend database
        /// when finally promoted or cancelled.
        /// {{</warning>}}
        ManuallyPromote,

        /// {{<warning>}}
        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
        ///
        /// This strategy should ONLY be used by customers with physical hardware who do not have
        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
        /// consult with Materialize engineering to discuss your situation.
        /// {{</warning>}}
        ///
        /// Tear down the old generation of pods and promote the new generation of pods immediately,
        /// without waiting for the new generation of pods to be ready.
        // `Materialize::should_force_promote` returns true whenever this variant is selected.
        ImmediatelyPromoteCausingDowntime,
    }
71
    // NOTE(review): this type derives `CustomResource` and `JsonSchema`, so the
    // `///` comments below are presumably emitted as field descriptions in the
    // generated CRD schema. They are left untouched here; review notes use plain
    // `//` comments so they cannot leak into generated output.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// The environmentd image to run.
        pub environmentd_image_ref: String,
        /// Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        /// {{<warning>}}
        /// Deprecated.
        ///
        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
        /// {{</warning>}}
        ///
        /// If running in AWS, override the IAM role to use to give
        /// environmentd access to the persist S3 bucket.
        #[kube(deprecated)]
        pub environmentd_iam_role_arn: Option<String>,
        /// If running in AWS, override the IAM role to use to support
        /// the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Amount of disk to allocate, if a storage class is provided.
        // When unset, the accessor of the same name on `Materialize` falls back to
        // the `memory` entry of the environmentd resource requirements, and then
        // to "4096Mi".
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,
        /// Number of balancerd pods to create.
        // `Materialize::balancerd_replicas` substitutes 2 when this is unset.
        pub balancerd_replicas: Option<i32>,
        /// Number of console pods to create.
        // `Materialize::console_replicas` substitutes 2 when this is unset.
        pub console_replicas: Option<i32>,

        /// Name of the kubernetes service account to use.
        /// If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        /// Annotations to apply to the service account.
        ///
        /// Annotations on service accounts are commonly used by cloud providers for IAM.
        /// AWS uses "eks.amazonaws.com/role-arn".
        /// Azure uses "azure.workload.identity/client-id", but
        /// additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        /// When changes are made to the environmentd resources (either via
        /// modifying fields in the spec here or by deploying a new
        /// orchestratord version which changes how resources are generated),
        /// existing environmentd processes won't be automatically restarted.
        /// In order to trigger a restart, the request_rollout field should be
        /// set to a new (random) value. Once the rollout completes, the value
        /// of `status.lastCompletedRolloutRequest` will be set to this value
        /// to indicate completion.
        ///
        /// Defaults to a random value in order to ensure that the first
        /// generation rollout is automatically triggered.
        // NOTE(review): `#[serde(default)]` falls back to `Uuid::default()`, which
        // is presumably the nil UUID rather than a random value as stated above.
        // Confirm whether the doc or the default is the intended behavior.
        #[serde(default)]
        pub request_rollout: Uuid,
        /// If `forcePromote` is set to the same value as `requestRollout`, the
        /// current rollout will skip waiting for clusters in the new
        /// generation to rehydrate before promoting the new environmentd to
        /// leader.
        #[serde(default)]
        pub force_promote: Uuid,
        /// This value will be written to an annotation in the generated
        /// environmentd statefulset, in order to force the controller to
        /// detect the generated resources as changed even if no other changes
        /// happened. This can be used to force a rollout to a new generation
        /// even without making any meaningful changes, by setting it to the
        /// same value as `requestRollout`.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// {{<warning>}}
        /// Deprecated and ignored. Use `rolloutStrategy` instead.
        /// {{</warning>}}
        #[kube(deprecated)]
        #[serde(default)]
        pub in_place_rollout: bool,
        /// Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
        /// It may also contain `external_login_password_mz_system`, which will be used as
        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
        pub backend_secret_name: String,
        /// How to authenticate with Materialize.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        /// The value used by environmentd (via the --environment-id flag) to
        /// uniquely identify this instance. Must be globally unique, and
        /// is required if a license key is not provided.
        /// NOTE: This value MUST NOT be changed in an existing instance,
        /// since it affects things like the way data is stored in the persist
        /// backend.
        // NOTE(review): with `#[serde(default)]` an omitted value presumably
        // becomes the nil UUID, which is not globally unique — confirm that
        // omission is handled elsewhere.
        #[serde(default)]
        pub environment_id: Uuid,

        /// The name of a ConfigMap containing system parameters in JSON format.
        /// The ConfigMap must contain a `system-params.json` key whose value
        /// is a valid JSON object containing valid system parameters.
        ///
        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
        ///
        /// Example ConfigMap:
        /// ```yaml
        /// data:
        ///   system-params.json: |
        ///     {
        ///       "max_connections": 1000
        ///     }
        /// ```
        pub system_parameter_configmap_name: Option<String>,

        /// The configuration for generating an x509 certificate using cert-manager for balancerd
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The configuration for generating an x509 certificate using cert-manager for the console
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        /// Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        /// The `issuerRef` field is required.
        /// This currently is only used for environmentd, but will eventually support clusterd.
        /// Not yet implemented.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
226
227    impl Materialize {
228        pub fn backend_secret_name(&self) -> String {
229            self.spec.backend_secret_name.clone()
230        }
231
232        pub fn namespace(&self) -> String {
233            self.meta().namespace.clone().unwrap()
234        }
235
236        pub fn create_service_account(&self) -> bool {
237            self.spec.service_account_name.is_none()
238        }
239
240        pub fn service_account_name(&self) -> String {
241            self.spec
242                .service_account_name
243                .clone()
244                .unwrap_or_else(|| self.name_unchecked())
245        }
246
247        pub fn role_name(&self) -> String {
248            self.name_unchecked()
249        }
250
251        pub fn role_binding_name(&self) -> String {
252            self.name_unchecked()
253        }
254
255        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
256            self.name_prefixed(&format!("environmentd-{generation}"))
257        }
258
259        pub fn environmentd_app_name(&self) -> String {
260            "environmentd".to_owned()
261        }
262
263        pub fn environmentd_service_name(&self) -> String {
264            self.name_prefixed("environmentd")
265        }
266
267        pub fn environmentd_service_internal_fqdn(&self) -> String {
268            format!(
269                "{}.{}.svc.cluster.local",
270                self.environmentd_service_name(),
271                self.meta().namespace.as_ref().unwrap()
272            )
273        }
274
275        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
276            self.name_prefixed(&format!("environmentd-{generation}"))
277        }
278
279        pub fn balancerd_app_name(&self) -> String {
280            "balancerd".to_owned()
281        }
282
283        pub fn environmentd_certificate_name(&self) -> String {
284            self.name_prefixed("environmentd-external")
285        }
286
287        pub fn environmentd_certificate_secret_name(&self) -> String {
288            self.name_prefixed("environmentd-tls")
289        }
290
291        pub fn balancerd_deployment_name(&self) -> String {
292            self.name_prefixed("balancerd")
293        }
294
295        pub fn balancerd_service_name(&self) -> String {
296            self.name_prefixed("balancerd")
297        }
298
299        pub fn console_app_name(&self) -> String {
300            "console".to_owned()
301        }
302
303        pub fn balancerd_external_certificate_name(&self) -> String {
304            self.name_prefixed("balancerd-external")
305        }
306
307        pub fn balancerd_external_certificate_secret_name(&self) -> String {
308            self.name_prefixed("balancerd-external-tls")
309        }
310
311        pub fn balancerd_replicas(&self) -> i32 {
312            self.spec.balancerd_replicas.unwrap_or(2)
313        }
314
315        pub fn console_replicas(&self) -> i32 {
316            self.spec.console_replicas.unwrap_or(2)
317        }
318
319        pub fn console_configmap_name(&self) -> String {
320            self.name_prefixed("console")
321        }
322
323        pub fn console_deployment_name(&self) -> String {
324            self.name_prefixed("console")
325        }
326
327        pub fn console_service_name(&self) -> String {
328            self.name_prefixed("console")
329        }
330
331        pub fn console_external_certificate_name(&self) -> String {
332            self.name_prefixed("console-external")
333        }
334
335        pub fn console_external_certificate_secret_name(&self) -> String {
336            self.name_prefixed("console-external-tls")
337        }
338
339        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
340            self.name_prefixed(&format!("persist-pubsub-{generation}"))
341        }
342
343        pub fn listeners_configmap_name(&self, generation: u64) -> String {
344            self.name_prefixed(&format!("listeners-{generation}"))
345        }
346
347        pub fn name_prefixed(&self, suffix: &str) -> String {
348            format!("mz{}-{}", self.resource_id(), suffix)
349        }
350
351        pub fn resource_id(&self) -> &str {
352            &self.status.as_ref().unwrap().resource_id
353        }
354
355        pub fn system_parameter_configmap_name(&self) -> Option<String> {
356            self.spec.system_parameter_configmap_name.clone()
357        }
358
359        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
360            self.spec
361                .environmentd_scratch_volume_storage_requirement
362                .clone()
363                .unwrap_or_else(|| {
364                    self.spec
365                        .environmentd_resource_requirements
366                        .as_ref()
367                        .and_then(|requirements| {
368                            requirements
369                                .requests
370                                .as_ref()
371                                .or(requirements.limits.as_ref())
372                        })
373                        // TODO: in cloud, we've been defaulting to twice the
374                        // memory limit, but k8s-openapi doesn't seem to
375                        // provide any way to parse Quantity values, so there
376                        // isn't an easy way to do arithmetic on it
377                        .and_then(|requirements| requirements.get("memory").cloned())
378                        // TODO: is there a better default to use here?
379                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
380                })
381        }
382
383        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
384            format!(
385                "{}-{}-{}-0",
386                cloud_provider, region, self.spec.environment_id,
387            )
388        }
389
390        pub fn requested_reconciliation_id(&self) -> Uuid {
391            self.spec.request_rollout
392        }
393
394        pub fn rollout_requested(&self) -> bool {
395            self.requested_reconciliation_id()
396                != self
397                    .status
398                    .as_ref()
399                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
400        }
401
402        pub fn set_force_promote(&mut self) {
403            self.spec.force_promote = self.spec.request_rollout;
404        }
405
406        pub fn should_force_promote(&self) -> bool {
407            self.spec.force_promote == self.spec.request_rollout
408                || self.spec.rollout_strategy
409                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
410        }
411
412        pub fn conditions_need_update(&self) -> bool {
413            let Some(status) = self.status.as_ref() else {
414                return true;
415            };
416            if status.conditions.is_empty() {
417                return true;
418            }
419            for condition in &status.conditions {
420                if condition.observed_generation != self.meta().generation {
421                    return true;
422                }
423            }
424            false
425        }
426
427        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
428            let Some(status) = self.status.as_ref() else {
429                return false;
430            };
431            if status.conditions.is_empty() {
432                return false;
433            }
434            status
435                .conditions
436                .iter()
437                .any(|condition| condition.reason == "ReadyToPromote")
438                && &status.resources_hash == resources_hash
439        }
440
441        pub fn is_promoting(&self) -> bool {
442            let Some(status) = self.status.as_ref() else {
443                return false;
444            };
445            if status.conditions.is_empty() {
446                return false;
447            }
448            status
449                .conditions
450                .iter()
451                .any(|condition| condition.reason == "Promoting")
452        }
453
454        pub fn update_in_progress(&self) -> bool {
455            let Some(status) = self.status.as_ref() else {
456                return false;
457            };
458            if status.conditions.is_empty() {
459                return false;
460            }
461            for condition in &status.conditions {
462                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
463                    return true;
464                }
465            }
466            false
467        }
468
469        /// Checks that the given version is greater than or equal
470        /// to the existing version, if the existing version
471        /// can be parsed.
472        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
473            let version = parse_image_ref(&self.spec.environmentd_image_ref);
474            match version {
475                Some(version) => &version >= minimum,
476                // In the rare case that we see an image reference
477                // that we can't parse, we assume that it satisfies all
478                // version checks. Usually these are custom images that have
479                // been by a developer on a branch forked from a recent copy
480                // of main, and so this works out reasonably well in practice.
481                None => {
482                    tracing::warn!(
483                        image_ref = %self.spec.environmentd_image_ref,
484                        "failed to parse image ref",
485                    );
486                    true
487                }
488            }
489        }
490
491        /// This check isn't strictly required since environmentd will still be able to determine
492        /// if the upgrade is allowed or not. However, doing this check allows us to provide
493        /// the error as soon as possible and in a more user friendly way.
494        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
495            // Don't allow rolling back
496            // Note: semver comparison handles RC versions correctly:
497            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
498            if next_version < active_version {
499                return false;
500            }
501
502            if active_version.major == 0 {
503                if next_version.major != active_version.major {
504                    if next_version.major == 26 {
505                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
506                        // before upgrading to 26.0.0
507
508                        return (active_version.minor == 147 && active_version.patch >= 20)
509                            || active_version.minor >= 164;
510                    } else {
511                        return false;
512                    }
513                }
514                // Self managed 25.1 to 25.2
515                if next_version.minor == 147 && active_version.minor == 130 {
516                    return true;
517                }
518                // only allow upgrading a single minor version at a time
519                return next_version.minor <= active_version.minor + 1;
520            } else if active_version.major >= 26 {
521                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
522                return next_version.major <= active_version.major + 1;
523            }
524
525            true
526        }
527
528        /// Checks if the current environmentd image ref is within the upgrade window of the last
529        /// successful rollout.
530        pub fn within_upgrade_window(&self) -> bool {
531            let active_environmentd_version = self
532                .status
533                .as_ref()
534                .and_then(|status| {
535                    status
536                        .last_completed_rollout_environmentd_image_ref
537                        .as_ref()
538                })
539                .and_then(|image_ref| parse_image_ref(image_ref));
540
541            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
542                parse_image_ref(&self.spec.environmentd_image_ref),
543                active_environmentd_version,
544            ) {
545                Self::is_valid_upgrade_version(
546                    &active_environmentd_version,
547                    &next_environmentd_version,
548                )
549            } else {
550                // If we fail to parse either version,
551                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
552                true
553            }
554        }
555
556        pub fn status(&self) -> MaterializeStatus {
557            self.status.clone().unwrap_or_else(|| {
558                let mut status = MaterializeStatus::default();
559
560                status.resource_id = new_resource_id();
561
562                // If we're creating the initial status on an un-soft-deleted
563                // Environment we need to ensure that the last active generation
564                // is restored, otherwise the env will crash loop indefinitely
565                // as its catalog would have durably recorded a greater generation
566                if let Some(last_active_generation) = self
567                    .annotations()
568                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
569                {
570                    status.active_generation = last_active_generation
571                        .parse()
572                        .expect("valid int generation");
573                }
574
575                // Initialize the last completed rollout environmentd image ref to
576                // the current image ref if not already set.
577                status.last_completed_rollout_environmentd_image_ref =
578                    Some(self.spec.environmentd_image_ref.clone());
579
580                status
581            })
582        }
583    }
584
    // NOTE(review): this type derives `JsonSchema`, so the `///` comments below
    // are presumably emitted as descriptions in the generated schema; they are
    // left untouched and review notes use plain `//` comments.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The UUID of the last successfully completed rollout.
        pub last_completed_rollout_request: Uuid,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// A hash calculated from the spec of resources to be created based on this Materialize
        /// spec. This is used for detecting when the existing resources are up to date.
        /// If you want to trigger a rollout without making other changes that would cause this
        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
        pub resources_hash: String,
        // Status conditions. Methods on `Materialize` inspect their `type_`,
        // `status`, `reason` and `observed_generation` fields, and `needs_update`
        // ignores their `last_transition_time` when comparing two statuses.
        pub conditions: Vec<Condition>,
    }
605
606    impl MaterializeStatus {
607        pub fn needs_update(&self, other: &Self) -> bool {
608            let now = chrono::offset::Utc::now();
609            let mut a = self.clone();
610            for condition in &mut a.conditions {
611                condition.last_transition_time = Time(now);
612            }
613            let mut b = other.clone();
614            for condition in &mut b.conditions {
615                condition.last_transition_time = Time(now);
616            }
617            a != b
618        }
619    }
620
621    impl ManagedResource for Materialize {
622        fn default_labels(&self) -> BTreeMap<String, String> {
623            BTreeMap::from_iter([
624                (
625                    "materialize.cloud/organization-name".to_owned(),
626                    self.name_unchecked(),
627                ),
628                (
629                    "materialize.cloud/organization-namespace".to_owned(),
630                    self.namespace(),
631                ),
632                (
633                    "materialize.cloud/mz-resource-id".to_owned(),
634                    self.resource_id().to_owned(),
635                ),
636            ])
637        }
638    }
639}
640
641fn parse_image_ref(image_ref: &str) -> Option<Version> {
642    image_ref
643        .rsplit_once(':')
644        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
645        .and_then(|tag| {
646            // To work around Docker tag restrictions, build metadata in
647            // a Docker tag is delimited by `--` rather than the SemVer
648            // `+` delimiter. So we need to swap the delimiter back to
649            // `+` before parsing it as SemVer.
650            let tag = tag.replace("--", "+");
651            Version::parse(&tag).ok()
652        })
653}
654
655#[cfg(test)]
656mod tests {
657    use kube::core::ObjectMeta;
658    use semver::Version;
659
660    use super::v1alpha1::{Materialize, MaterializeSpec};
661
662    #[mz_ore::test]
663    fn meets_minimum_version() {
664        let mut mz = Materialize {
665            spec: MaterializeSpec {
666                environmentd_image_ref:
667                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
668                        .to_owned(),
669                ..Default::default()
670            },
671            metadata: ObjectMeta {
672                ..Default::default()
673            },
674            status: None,
675        };
676
677        // true cases
678        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
679        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
680        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
681        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
682        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
683        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
684        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
685        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
686        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
687        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
688        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
689        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
690        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
691        mz.spec.environmentd_image_ref =
692            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
693                .to_owned();
694        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
695
696        // false cases
697        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
698        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
699        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
700        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
701        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
702        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
703        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
704        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
705    }
706
707    #[mz_ore::test]
708    fn within_upgrade_window() {
709        use super::v1alpha1::MaterializeStatus;
710
711        let mut mz = Materialize {
712            spec: MaterializeSpec {
713                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
714                ..Default::default()
715            },
716            metadata: ObjectMeta {
717                ..Default::default()
718            },
719            status: Some(MaterializeStatus {
720                last_completed_rollout_environmentd_image_ref: Some(
721                    "materialize/environmentd:v26.0.0".to_owned(),
722                ),
723                ..Default::default()
724            }),
725        };
726
727        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
728        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
729        assert!(mz.within_upgrade_window());
730
731        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
732        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
733        assert!(mz.within_upgrade_window());
734
735        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
736        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
737        assert!(!mz.within_upgrade_window());
738
739        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
740        mz.spec.environmentd_image_ref =
741            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
742        assert!(mz.within_upgrade_window());
743
744        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
745        mz.status
746            .as_mut()
747            .unwrap()
748            .last_completed_rollout_environmentd_image_ref =
749            Some("materialize/environmentd:v0.147.20".to_owned());
750        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
751        assert!(mz.within_upgrade_window());
752    }
753
754    #[mz_ore::test]
755    fn is_valid_upgrade_version() {
756        let success_tests = [
757            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
758            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
759            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
760            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
761            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
762            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
763            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
764            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
765            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
766            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
767            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
768            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
769            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
770        ];
771        for (active_version, next_version) in success_tests {
772            assert!(
773                Materialize::is_valid_upgrade_version(&active_version, &next_version),
774                "v{active_version} can upgrade to v{next_version}"
775            );
776        }
777
778        let failure_tests = [
779            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
780            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
781            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
782            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
783            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
784            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
785            // Disallow anything before 0.147.20 to upgrade
786            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
787            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
788            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
789        ];
790        for (active_version, next_version) in failure_tests {
791            assert!(
792                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
793                "v{active_version} can't upgrade to v{next_version}"
794            );
795        }
796    }
797}