mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
/// Annotation key consulted by `v1alpha1::Materialize::status` when the object
/// has no status yet: if the annotation is present, its value is parsed as the
/// initial `active_generation` of the freshly built status.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32    use super::*;
33
    // Strategy for how a new generation of pods takes over from the old one
    // during a rollout. It is selected through `MaterializeSpec::rollout_strategy`
    // and `WaitUntilReady` is the `Default`. `Materialize::should_force_promote`
    // returns `true` unconditionally for `ImmediatelyPromoteCausingDowntime`.
    //
    // NOTE(review): this overview is deliberately a plain `//` comment. The type
    // derives `JsonSchema`, and schemars copies `///` doc comments into the
    // generated schema as descriptions, so the doc comments below are part of
    // generated output (presumably also rendered by a docs site, given the
    // `{{<warning>}}` shortcodes -- confirm before rewording them).
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        /// Create a new generation of pods, leaving the old generation around until the
        /// new ones are ready to take over.
        /// This minimizes downtime, and is what almost everyone should use.
        #[default]
        WaitUntilReady,

        /// Create a new generation of pods, leaving the old generation as the serving generation
        /// until the user manually promotes the new generation.
        ///
        /// Users can promote the new generation at any time, even if the new generation pods are
        /// not fully caught up, by setting `forcePromote` to the same value as `requestRollout` in
        /// the Materialize spec.
        ///
        /// {{<warning>}}
        /// Do not leave new generations unpromoted indefinitely.
        ///
        /// The new generation keeps open read holds which prevent compaction. Once promoted or
        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
        /// data can build up, and can cause extreme deletion load on the metadata backend database
        /// when finally promoted or cancelled.
        /// {{</warning>}}
        ManuallyPromote,

        /// {{<warning>}}
        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
        ///
        /// This strategy should ONLY be used by customers with physical hardware who do not have
        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
        /// consult with Materialize engineering to discuss your situation.
        /// {{</warning>}}
        ///
        /// Tear down the old generation of pods and promote the new generation of pods immediately,
        /// without waiting for the new generation of pods to be ready.
        ImmediatelyPromoteCausingDowntime,
    }
71
    // Spec of the namespaced `Materialize` custom resource
    // (`materialize.cloud/v1alpha1`, short name `mzs`). The `CustomResource`
    // derive generates the `Materialize` type, which carries this struct as
    // `spec` and `MaterializeStatus` as `status`. Field names are serialized in
    // camelCase, so user-facing docs below refer to fields by their camelCase
    // names.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// The environmentd image to run.
        pub environmentd_image_ref: String,
        /// Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        /// {{<warning>}}
        /// Deprecated.
        ///
        /// Use `serviceAccountAnnotations` to set "eks.amazonaws.com/role-arn" instead.
        /// {{</warning>}}
        ///
        /// If running in AWS, override the IAM role to use to give
        /// environmentd access to the persist S3 bucket.
        #[kube(deprecated)]
        pub environmentd_iam_role_arn: Option<String>,
        /// If running in AWS, override the IAM role to use to support
        /// the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Amount of disk to allocate, if a storage class is provided.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,
        /// Number of balancerd pods to create.
        pub balancerd_replicas: Option<i32>,
        /// Number of console pods to create.
        pub console_replicas: Option<i32>,

        /// Name of the kubernetes service account to use.
        /// If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        /// Annotations to apply to the service account.
        ///
        /// Annotations on service accounts are commonly used by cloud providers for IAM.
        /// AWS uses "eks.amazonaws.com/role-arn".
        /// Azure uses "azure.workload.identity/client-id", but
        /// additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        /// When changes are made to the environmentd resources (either via
        /// modifying fields in the spec here or by deploying a new
        /// orchestratord version which changes how resources are generated),
        /// existing environmentd processes won't be automatically restarted.
        /// In order to trigger a restart, the `requestRollout` field should be
        /// set to a new (random) value. Once the rollout completes, the value
        /// of `status.lastCompletedRolloutRequest` will be set to this value
        /// to indicate completion.
        ///
        /// Defaults to a random value in order to ensure that the first
        /// generation rollout is automatically triggered.
        // NOTE(review): `#[serde(default)]` fills an omitted field with
        // `Uuid::default()`, which is not random; confirm that something outside
        // this struct supplies the random value the doc above describes.
        #[serde(default)]
        pub request_rollout: Uuid,
        /// If `forcePromote` is set to the same value as `requestRollout`, the
        /// current rollout will skip waiting for clusters in the new
        /// generation to rehydrate before promoting the new environmentd to
        /// leader.
        #[serde(default)]
        pub force_promote: Uuid,
        /// This value will be written to an annotation in the generated
        /// environmentd statefulset, in order to force the controller to
        /// detect the generated resources as changed even if no other changes
        /// happened. This can be used to force a rollout to a new generation
        /// even without making any meaningful changes, by setting it to the
        /// same value as `requestRollout`.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// {{<warning>}}
        /// Deprecated and ignored. Use `rolloutStrategy` instead.
        /// {{</warning>}}
        #[kube(deprecated)]
        #[serde(default)]
        pub in_place_rollout: bool,
        /// Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
        /// It may also contain `external_login_password_mz_system`, which will be used as
        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
        pub backend_secret_name: String,
        /// How to authenticate with Materialize.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        /// The value used by environmentd (via the --environment-id flag) to
        /// uniquely identify this instance. Must be globally unique, and
        /// is required if a license key is not provided.
        /// NOTE: This value MUST NOT be changed in an existing instance,
        /// since it affects things like the way data is stored in the persist
        /// backend.
        // NOTE(review): when omitted, `#[serde(default)]` yields
        // `Uuid::default()`; `Materialize::environment_id` embeds this value in
        // the string `{cloud_provider}-{region}-{environment_id}-0`.
        #[serde(default)]
        pub environment_id: Uuid,

        /// The name of a ConfigMap containing system parameters in JSON format.
        /// The ConfigMap must contain a `system-params.json` key whose value
        /// is a valid JSON object containing valid system parameters.
        ///
        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
        ///
        /// Example ConfigMap:
        /// ```yaml
        /// data:
        ///   system-params.json: |
        ///     {
        ///       "max_connections": 1000
        ///     }
        /// ```
        pub system_parameter_configmap_name: Option<String>,

        /// The configuration for generating an x509 certificate using cert-manager for balancerd
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The configuration for generating an x509 certificate using cert-manager for the console
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        /// Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        /// The `issuerRef` field is required.
        /// This currently is only used for environmentd, but will eventually support clusterd.
        /// Not yet implemented.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
225
226    impl Materialize {
227        pub fn backend_secret_name(&self) -> String {
228            self.spec.backend_secret_name.clone()
229        }
230
231        pub fn namespace(&self) -> String {
232            self.meta().namespace.clone().unwrap()
233        }
234
235        pub fn create_service_account(&self) -> bool {
236            self.spec.service_account_name.is_none()
237        }
238
239        pub fn service_account_name(&self) -> String {
240            self.spec
241                .service_account_name
242                .clone()
243                .unwrap_or_else(|| self.name_unchecked())
244        }
245
246        pub fn role_name(&self) -> String {
247            self.name_unchecked()
248        }
249
250        pub fn role_binding_name(&self) -> String {
251            self.name_unchecked()
252        }
253
254        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
255            self.name_prefixed(&format!("environmentd-{generation}"))
256        }
257
258        pub fn environmentd_app_name(&self) -> String {
259            "environmentd".to_owned()
260        }
261
262        pub fn environmentd_service_name(&self) -> String {
263            self.name_prefixed("environmentd")
264        }
265
266        pub fn environmentd_service_internal_fqdn(&self) -> String {
267            format!(
268                "{}.{}.svc.cluster.local",
269                self.environmentd_service_name(),
270                self.meta().namespace.as_ref().unwrap()
271            )
272        }
273
274        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
275            self.name_prefixed(&format!("environmentd-{generation}"))
276        }
277
278        pub fn balancerd_app_name(&self) -> String {
279            "balancerd".to_owned()
280        }
281
282        pub fn environmentd_certificate_name(&self) -> String {
283            self.name_prefixed("environmentd-external")
284        }
285
286        pub fn environmentd_certificate_secret_name(&self) -> String {
287            self.name_prefixed("environmentd-tls")
288        }
289
290        pub fn balancerd_deployment_name(&self) -> String {
291            self.name_prefixed("balancerd")
292        }
293
294        pub fn balancerd_service_name(&self) -> String {
295            self.name_prefixed("balancerd")
296        }
297
298        pub fn console_app_name(&self) -> String {
299            "console".to_owned()
300        }
301
302        pub fn balancerd_external_certificate_name(&self) -> String {
303            self.name_prefixed("balancerd-external")
304        }
305
306        pub fn balancerd_external_certificate_secret_name(&self) -> String {
307            self.name_prefixed("balancerd-external-tls")
308        }
309
310        pub fn balancerd_replicas(&self) -> i32 {
311            self.spec.balancerd_replicas.unwrap_or(2)
312        }
313
314        pub fn console_replicas(&self) -> i32 {
315            self.spec.console_replicas.unwrap_or(2)
316        }
317
318        pub fn console_configmap_name(&self) -> String {
319            self.name_prefixed("console")
320        }
321
322        pub fn console_deployment_name(&self) -> String {
323            self.name_prefixed("console")
324        }
325
326        pub fn console_service_name(&self) -> String {
327            self.name_prefixed("console")
328        }
329
330        pub fn console_external_certificate_name(&self) -> String {
331            self.name_prefixed("console-external")
332        }
333
334        pub fn console_external_certificate_secret_name(&self) -> String {
335            self.name_prefixed("console-external-tls")
336        }
337
338        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
339            self.name_prefixed(&format!("persist-pubsub-{generation}"))
340        }
341
342        pub fn listeners_configmap_name(&self, generation: u64) -> String {
343            self.name_prefixed(&format!("listeners-{generation}"))
344        }
345
346        pub fn name_prefixed(&self, suffix: &str) -> String {
347            format!("mz{}-{}", self.resource_id(), suffix)
348        }
349
350        pub fn resource_id(&self) -> &str {
351            &self.status.as_ref().unwrap().resource_id
352        }
353
354        pub fn system_parameter_configmap_name(&self) -> Option<String> {
355            self.spec.system_parameter_configmap_name.clone()
356        }
357
358        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
359            self.spec
360                .environmentd_scratch_volume_storage_requirement
361                .clone()
362                .unwrap_or_else(|| {
363                    self.spec
364                        .environmentd_resource_requirements
365                        .as_ref()
366                        .and_then(|requirements| {
367                            requirements
368                                .requests
369                                .as_ref()
370                                .or(requirements.limits.as_ref())
371                        })
372                        // TODO: in cloud, we've been defaulting to twice the
373                        // memory limit, but k8s-openapi doesn't seem to
374                        // provide any way to parse Quantity values, so there
375                        // isn't an easy way to do arithmetic on it
376                        .and_then(|requirements| requirements.get("memory").cloned())
377                        // TODO: is there a better default to use here?
378                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
379                })
380        }
381
382        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
383            format!(
384                "{}-{}-{}-0",
385                cloud_provider, region, self.spec.environment_id,
386            )
387        }
388
389        pub fn requested_reconciliation_id(&self) -> Uuid {
390            self.spec.request_rollout
391        }
392
393        pub fn rollout_requested(&self) -> bool {
394            self.requested_reconciliation_id()
395                != self
396                    .status
397                    .as_ref()
398                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
399        }
400
401        pub fn set_force_promote(&mut self) {
402            self.spec.force_promote = self.spec.request_rollout;
403        }
404
405        pub fn should_force_promote(&self) -> bool {
406            self.spec.force_promote == self.spec.request_rollout
407                || self.spec.rollout_strategy
408                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
409        }
410
411        pub fn conditions_need_update(&self) -> bool {
412            let Some(status) = self.status.as_ref() else {
413                return true;
414            };
415            if status.conditions.is_empty() {
416                return true;
417            }
418            for condition in &status.conditions {
419                if condition.observed_generation != self.meta().generation {
420                    return true;
421                }
422            }
423            false
424        }
425
426        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
427            let Some(status) = self.status.as_ref() else {
428                return false;
429            };
430            if status.conditions.is_empty() {
431                return false;
432            }
433            status
434                .conditions
435                .iter()
436                .any(|condition| condition.reason == "ReadyToPromote")
437                && &status.resources_hash == resources_hash
438        }
439
440        pub fn is_promoting(&self) -> bool {
441            let Some(status) = self.status.as_ref() else {
442                return false;
443            };
444            if status.conditions.is_empty() {
445                return false;
446            }
447            status
448                .conditions
449                .iter()
450                .any(|condition| condition.reason == "Promoting")
451        }
452
453        pub fn update_in_progress(&self) -> bool {
454            let Some(status) = self.status.as_ref() else {
455                return false;
456            };
457            if status.conditions.is_empty() {
458                return false;
459            }
460            for condition in &status.conditions {
461                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
462                    return true;
463                }
464            }
465            false
466        }
467
468        /// Checks that the given version is greater than or equal
469        /// to the existing version, if the existing version
470        /// can be parsed.
471        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
472            let version = parse_image_ref(&self.spec.environmentd_image_ref);
473            match version {
474                Some(version) => &version >= minimum,
475                // In the rare case that we see an image reference
476                // that we can't parse, we assume that it satisfies all
477                // version checks. Usually these are custom images that have
478                // been by a developer on a branch forked from a recent copy
479                // of main, and so this works out reasonably well in practice.
480                None => {
481                    tracing::warn!(
482                        image_ref = %self.spec.environmentd_image_ref,
483                        "failed to parse image ref",
484                    );
485                    true
486                }
487            }
488        }
489
490        /// This check isn't strictly required since environmentd will still be able to determine
491        /// if the upgrade is allowed or not. However, doing this check allows us to provide
492        /// the error as soon as possible and in a more user friendly way.
493        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
494            // Don't allow rolling back
495            // Note: semver comparison handles RC versions correctly:
496            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
497            if next_version < active_version {
498                return false;
499            }
500
501            if active_version.major == 0 {
502                if next_version.major != active_version.major {
503                    if next_version.major == 26 {
504                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
505                        // before upgrading to 26.0.0
506
507                        return (active_version.minor == 147 && active_version.patch >= 20)
508                            || active_version.minor >= 164;
509                    } else {
510                        return false;
511                    }
512                }
513                // Self managed 25.1 to 25.2
514                if next_version.minor == 147 && active_version.minor == 130 {
515                    return true;
516                }
517                // only allow upgrading a single minor version at a time
518                return next_version.minor <= active_version.minor + 1;
519            } else if active_version.major >= 26 {
520                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
521                return next_version.major <= active_version.major + 1;
522            }
523
524            true
525        }
526
527        /// Checks if the current environmentd image ref is within the upgrade window of the last
528        /// successful rollout.
529        pub fn within_upgrade_window(&self) -> bool {
530            let active_environmentd_version = self
531                .status
532                .as_ref()
533                .and_then(|status| {
534                    status
535                        .last_completed_rollout_environmentd_image_ref
536                        .as_ref()
537                })
538                .and_then(|image_ref| parse_image_ref(image_ref));
539
540            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
541                parse_image_ref(&self.spec.environmentd_image_ref),
542                active_environmentd_version,
543            ) {
544                Self::is_valid_upgrade_version(
545                    &active_environmentd_version,
546                    &next_environmentd_version,
547                )
548            } else {
549                // If we fail to parse either version,
550                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
551                true
552            }
553        }
554
555        pub fn status(&self) -> MaterializeStatus {
556            self.status.clone().unwrap_or_else(|| {
557                let mut status = MaterializeStatus::default();
558
559                status.resource_id = new_resource_id();
560
561                // If we're creating the initial status on an un-soft-deleted
562                // Environment we need to ensure that the last active generation
563                // is restored, otherwise the env will crash loop indefinitely
564                // as its catalog would have durably recorded a greater generation
565                if let Some(last_active_generation) = self
566                    .annotations()
567                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
568                {
569                    status.active_generation = last_active_generation
570                        .parse()
571                        .expect("valid int generation");
572                }
573
574                // Initialize the last completed rollout environmentd image ref to
575                // the current image ref if not already set.
576                status.last_completed_rollout_environmentd_image_ref =
577                    Some(self.spec.environmentd_image_ref.clone());
578
579                status
580            })
581        }
582    }
583
    // Status of a `Materialize` object. `Materialize::status` builds the
    // initial value when the object has none yet. Field names are serialized in
    // camelCase.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The UUID of the last successfully completed rollout.
        pub last_completed_rollout_request: Uuid,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// A hash calculated from the spec of resources to be created based on this Materialize
        /// spec. This is used for detecting when the existing resources are up to date.
        /// If you want to trigger a rollout without making other changes that would cause this
        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
        pub resources_hash: String,
        // Conditions recorded for this object. The `Materialize` helpers in this
        // module inspect each condition's `type_` ("UpToDate"), `status`
        // ("Unknown"), `reason` ("ReadyToPromote", "Promoting") and
        // `observed_generation`; `needs_update` ignores `last_transition_time`.
        pub conditions: Vec<Condition>,
    }
604
605    impl MaterializeStatus {
606        pub fn needs_update(&self, other: &Self) -> bool {
607            let now = chrono::offset::Utc::now();
608            let mut a = self.clone();
609            for condition in &mut a.conditions {
610                condition.last_transition_time = Time(now);
611            }
612            let mut b = other.clone();
613            for condition in &mut b.conditions {
614                condition.last_transition_time = Time(now);
615            }
616            a != b
617        }
618    }
619
620    impl ManagedResource for Materialize {
621        fn default_labels(&self) -> BTreeMap<String, String> {
622            BTreeMap::from_iter([
623                (
624                    "materialize.cloud/organization-name".to_owned(),
625                    self.name_unchecked(),
626                ),
627                (
628                    "materialize.cloud/organization-namespace".to_owned(),
629                    self.namespace(),
630                ),
631                (
632                    "materialize.cloud/mz-resource-id".to_owned(),
633                    self.resource_id().to_owned(),
634                ),
635            ])
636        }
637    }
638}
639
640fn parse_image_ref(image_ref: &str) -> Option<Version> {
641    image_ref
642        .rsplit_once(':')
643        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
644        .and_then(|tag| {
645            // To work around Docker tag restrictions, build metadata in
646            // a Docker tag is delimited by `--` rather than the SemVer
647            // `+` delimiter. So we need to swap the delimiter back to
648            // `+` before parsing it as SemVer.
649            let tag = tag.replace("--", "+");
650            Version::parse(&tag).ok()
651        })
652}
653
654#[cfg(test)]
655mod tests {
656    use kube::core::ObjectMeta;
657    use semver::Version;
658
659    use super::v1alpha1::{Materialize, MaterializeSpec};
660
661    #[mz_ore::test]
662    fn meets_minimum_version() {
663        let mut mz = Materialize {
664            spec: MaterializeSpec {
665                environmentd_image_ref:
666                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
667                        .to_owned(),
668                ..Default::default()
669            },
670            metadata: ObjectMeta {
671                ..Default::default()
672            },
673            status: None,
674        };
675
676        // true cases
677        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
678        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
679        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
680        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
681        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
682        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
683        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
684        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
685        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
686        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
687        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
688        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
689        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
690        mz.spec.environmentd_image_ref =
691            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
692                .to_owned();
693        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
694
695        // false cases
696        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
697        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
698        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
699        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
700        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
701        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
702        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
703        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
704    }
705
706    #[mz_ore::test]
707    fn within_upgrade_window() {
708        use super::v1alpha1::MaterializeStatus;
709
710        let mut mz = Materialize {
711            spec: MaterializeSpec {
712                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
713                ..Default::default()
714            },
715            metadata: ObjectMeta {
716                ..Default::default()
717            },
718            status: Some(MaterializeStatus {
719                last_completed_rollout_environmentd_image_ref: Some(
720                    "materialize/environmentd:v26.0.0".to_owned(),
721                ),
722                ..Default::default()
723            }),
724        };
725
726        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
727        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
728        assert!(mz.within_upgrade_window());
729
730        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
731        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
732        assert!(mz.within_upgrade_window());
733
734        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
735        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
736        assert!(!mz.within_upgrade_window());
737
738        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
739        mz.spec.environmentd_image_ref =
740            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
741        assert!(mz.within_upgrade_window());
742
743        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
744        mz.status
745            .as_mut()
746            .unwrap()
747            .last_completed_rollout_environmentd_image_ref =
748            Some("materialize/environmentd:v0.147.20".to_owned());
749        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
750        assert!(mz.within_upgrade_window());
751    }
752
753    #[mz_ore::test]
754    fn is_valid_upgrade_version() {
755        let success_tests = [
756            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
757            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
758            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
759            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
760            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
761            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
762            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
763            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
764            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
765            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
766            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
767            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
768            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
769        ];
770        for (active_version, next_version) in success_tests {
771            assert!(
772                Materialize::is_valid_upgrade_version(&active_version, &next_version),
773                "v{active_version} can upgrade to v{next_version}"
774            );
775        }
776
777        let failure_tests = [
778            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
779            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
780            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
781            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
782            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
783            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
784            // Disallow anything before 0.147.20 to upgrade
785            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
786            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
787            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
788        ];
789        for (active_version, next_version) in failure_tests {
790            assert!(
791                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
792                "v{active_version} can't upgrade to v{next_version}"
793            );
794        }
795    }
796}