Skip to main content

mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18    jiff::Timestamp,
19};
20use kube::{CustomResource, Resource, ResourceExt};
21use schemars::JsonSchema;
22use semver::Version;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
27use mz_server_core::listeners::AuthenticatorKind;
28
/// Annotation key from which the last known active generation of a Materialize
/// instance is read.
///
/// `Materialize::status` consults this annotation when it has to build an initial
/// status, so that `active_generation` is restored instead of starting from the default.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
31
32pub mod v1alpha1 {
33    use super::*;
34
    // Selects how a rollout hands over from the old generation of pods to the new one.
    // `WaitUntilReady` is the `Default`; `Materialize::should_force_promote` treats
    // `ImmediatelyPromoteCausingDowntime` as an unconditional force-promote.
    //
    // NOTE(review): the `///` comments below are kept verbatim. With `JsonSchema` derived
    // they are presumably exported into the generated schema / reference docs (the
    // `{{<warning>}}` shortcodes point at a docs generator) — confirm before rewording.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        /// Create a new generation of pods, leaving the old generation around until the
        /// new ones are ready to take over.
        /// This minimizes downtime, and is what almost everyone should use.
        #[default]
        WaitUntilReady,

        /// Create a new generation of pods, leaving the old generation as the serving generation
        /// until the user manually promotes the new generation.
        ///
        /// When using `ManuallyPromote`, the new generation can be promoted at any
        /// time, even if it has dataflows that are not fully caught up, by setting
        /// `forcePromote` to the same value as `requestRollout` in the Materialize spec.
        ///
        /// To minimize downtime, promotion should occur when the new generation
        /// has caught up to the prior generation. To determine if the new
        /// generation has caught up, consult the `UpToDate` condition in the
        /// status of the Materialize Resource. If the condition's reason is
        /// `ReadyToPromote` the new generation is ready to promote.
        ///
        /// {{<warning>}}
        /// Do not leave new generations unpromoted indefinitely.
        ///
        /// The new generation keeps open read holds which prevent compaction. Once promoted or
        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
        /// data can build up, and can cause extreme deletion load on the metadata backend database
        /// when finally promoted or cancelled.
        /// {{</warning>}}
        ManuallyPromote,

        /// {{<warning>}}
        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
        ///
        /// This strategy should ONLY be used by customers with physical hardware who do not have
        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
        /// consult with Materialize engineering to discuss your situation.
        /// {{</warning>}}
        ///
        /// Tear down the old generation of pods and promote the new generation of pods immediately,
        /// without waiting for the new generation of pods to be ready.
        ImmediatelyPromoteCausingDowntime,
    }
78
    // Spec half of the `Materialize` custom resource (group `materialize.cloud`, version
    // `v1alpha1`). The `CustomResource` derive together with the `#[kube(...)]` attribute
    // declares kind `Materialize`, whose status type is `MaterializeStatus`.
    //
    // Fields are (de)serialized in camelCase (`rename_all`), so e.g. `request_rollout` is
    // `requestRollout` in manifests.
    //
    // NOTE(review): the `///` field comments are presumably exported as descriptions in the
    // generated CRD schema via the `JsonSchema` derive — treat rewording them as a
    // user-visible change; confirm before editing.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// The environmentd image to run.
        pub environmentd_image_ref: String,
        /// Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        // NOTE(review): the text below names the replacement field by its Rust identifier;
        // the manifest key is `serviceAccountAnnotations`.
        /// {{<warning>}}
        /// Deprecated.
        ///
        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
        /// {{</warning>}}
        ///
        /// If running in AWS, override the IAM role to use to give
        /// environmentd access to the persist S3 bucket.
        #[kube(deprecated)]
        pub environmentd_iam_role_arn: Option<String>,
        /// If running in AWS, override the IAM role to use to support
        /// the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Amount of disk to allocate, if a storage class is provided.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,
        // When unset, `Materialize::balancerd_replicas` falls back to 2.
        /// Number of balancerd pods to create.
        pub balancerd_replicas: Option<i32>,
        // When unset, `Materialize::console_replicas` falls back to 2.
        /// Number of console pods to create.
        pub console_replicas: Option<i32>,

        /// Name of the kubernetes service account to use.
        /// If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        /// Annotations to apply to the service account.
        ///
        /// Annotations on service accounts are commonly used by cloud providers for IAM.
        /// AWS uses "eks.amazonaws.com/role-arn".
        /// Azure uses "azure.workload.identity/client-id", but
        /// additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        // NOTE(review): `#[serde(default)]` on a `Uuid` yields `Uuid::default()` (the nil
        // UUID), not a random value, so the random default described below must be applied
        // somewhere outside this struct — confirm.
        /// When changes are made to the environmentd resources (either via
        /// modifying fields in the spec here or by deploying a new
        /// orchestratord version which changes how resources are generated),
        /// existing environmentd processes won't be automatically restarted.
        /// In order to trigger a restart, the request_rollout field should be
        /// set to a new (random) value. Once the rollout completes, the value
        /// of `status.lastCompletedRolloutRequest` will be set to this value
        /// to indicate completion.
        ///
        /// Defaults to a random value in order to ensure that the first
        /// generation rollout is automatically triggered.
        #[serde(default)]
        pub request_rollout: Uuid,
        /// If `forcePromote` is set to the same value as `requestRollout`, the
        /// current rollout will skip waiting for clusters in the new
        /// generation to rehydrate before promoting the new environmentd to
        /// leader.
        #[serde(default)]
        pub force_promote: Uuid,
        /// This value will be written to an annotation in the generated
        /// environmentd statefulset, in order to force the controller to
        /// detect the generated resources as changed even if no other changes
        /// happened. This can be used to force a rollout to a new generation
        /// even without making any meaningful changes, by setting it to the
        /// same value as `requestRollout`.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// {{<warning>}}
        /// Deprecated and ignored. Use `rolloutStrategy` instead.
        /// {{</warning>}}
        #[kube(deprecated)]
        #[serde(default)]
        pub in_place_rollout: bool,
        /// Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
        /// It may also contain `external_login_password_mz_system`, which will be used as
        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
        pub backend_secret_name: String,
        /// How to authenticate with Materialize.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        /// The value used by environmentd (via the --environment-id flag) to
        /// uniquely identify this instance. Must be globally unique, and
        /// is required if a license key is not provided.
        /// NOTE: This value MUST NOT be changed in an existing instance,
        /// since it affects things like the way data is stored in the persist
        /// backend.
        #[serde(default)]
        pub environment_id: Uuid,

        /// The name of a ConfigMap containing system parameters in JSON format.
        /// The ConfigMap must contain a `system-params.json` key whose value
        /// is a valid JSON object containing valid system parameters.
        ///
        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
        ///
        /// Example ConfigMap:
        /// ```yaml
        /// data:
        ///   system-params.json: |
        ///     {
        ///       "max_connections": 1000
        ///     }
        /// ```
        pub system_parameter_configmap_name: Option<String>,

        /// The configuration for generating an x509 certificate using cert-manager for balancerd
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The configuration for generating an x509 certificate using cert-manager for the console
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        /// Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        /// The `issuerRef` field is required.
        /// This currently is only used for environmentd, but will eventually support clusterd.
        /// Not yet implemented.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
233
234    impl Materialize {
235        pub fn backend_secret_name(&self) -> String {
236            self.spec.backend_secret_name.clone()
237        }
238
239        pub fn namespace(&self) -> String {
240            self.meta().namespace.clone().unwrap()
241        }
242
243        pub fn create_service_account(&self) -> bool {
244            self.spec.service_account_name.is_none()
245        }
246
247        pub fn service_account_name(&self) -> String {
248            self.spec
249                .service_account_name
250                .clone()
251                .unwrap_or_else(|| self.name_unchecked())
252        }
253
254        pub fn role_name(&self) -> String {
255            self.name_unchecked()
256        }
257
258        pub fn role_binding_name(&self) -> String {
259            self.name_unchecked()
260        }
261
262        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
263            self.name_prefixed(&format!("environmentd-{generation}"))
264        }
265
266        pub fn environmentd_app_name(&self) -> String {
267            "environmentd".to_owned()
268        }
269
270        pub fn environmentd_service_name(&self) -> String {
271            self.name_prefixed("environmentd")
272        }
273
274        pub fn environmentd_service_internal_fqdn(&self) -> String {
275            format!(
276                "{}.{}.svc.cluster.local",
277                self.environmentd_service_name(),
278                self.meta().namespace.as_ref().unwrap()
279            )
280        }
281
282        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
283            self.name_prefixed(&format!("environmentd-{generation}"))
284        }
285
286        pub fn balancerd_app_name(&self) -> String {
287            "balancerd".to_owned()
288        }
289
290        pub fn environmentd_certificate_name(&self) -> String {
291            self.name_prefixed("environmentd-external")
292        }
293
294        pub fn environmentd_certificate_secret_name(&self) -> String {
295            self.name_prefixed("environmentd-tls")
296        }
297
298        pub fn balancerd_deployment_name(&self) -> String {
299            self.name_prefixed("balancerd")
300        }
301
302        pub fn balancerd_service_name(&self) -> String {
303            self.name_prefixed("balancerd")
304        }
305
306        pub fn console_app_name(&self) -> String {
307            "console".to_owned()
308        }
309
310        pub fn balancerd_external_certificate_name(&self) -> String {
311            self.name_prefixed("balancerd-external")
312        }
313
314        pub fn balancerd_external_certificate_secret_name(&self) -> String {
315            self.name_prefixed("balancerd-external-tls")
316        }
317
318        pub fn balancerd_replicas(&self) -> i32 {
319            self.spec.balancerd_replicas.unwrap_or(2)
320        }
321
322        pub fn console_replicas(&self) -> i32 {
323            self.spec.console_replicas.unwrap_or(2)
324        }
325
326        pub fn console_configmap_name(&self) -> String {
327            self.name_prefixed("console")
328        }
329
330        pub fn console_deployment_name(&self) -> String {
331            self.name_prefixed("console")
332        }
333
334        pub fn console_service_name(&self) -> String {
335            self.name_prefixed("console")
336        }
337
338        pub fn console_external_certificate_name(&self) -> String {
339            self.name_prefixed("console-external")
340        }
341
342        pub fn console_external_certificate_secret_name(&self) -> String {
343            self.name_prefixed("console-external-tls")
344        }
345
346        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
347            self.name_prefixed(&format!("persist-pubsub-{generation}"))
348        }
349
350        pub fn listeners_configmap_name(&self, generation: u64) -> String {
351            self.name_prefixed(&format!("listeners-{generation}"))
352        }
353
354        pub fn name_prefixed(&self, suffix: &str) -> String {
355            format!("mz{}-{}", self.resource_id(), suffix)
356        }
357
358        pub fn resource_id(&self) -> &str {
359            &self.status.as_ref().unwrap().resource_id
360        }
361
362        pub fn system_parameter_configmap_name(&self) -> Option<String> {
363            self.spec.system_parameter_configmap_name.clone()
364        }
365
366        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
367            self.spec
368                .environmentd_scratch_volume_storage_requirement
369                .clone()
370                .unwrap_or_else(|| {
371                    self.spec
372                        .environmentd_resource_requirements
373                        .as_ref()
374                        .and_then(|requirements| {
375                            requirements
376                                .requests
377                                .as_ref()
378                                .or(requirements.limits.as_ref())
379                        })
380                        // TODO: in cloud, we've been defaulting to twice the
381                        // memory limit, but k8s-openapi doesn't seem to
382                        // provide any way to parse Quantity values, so there
383                        // isn't an easy way to do arithmetic on it
384                        .and_then(|requirements| requirements.get("memory").cloned())
385                        // TODO: is there a better default to use here?
386                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
387                })
388        }
389
390        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
391            format!(
392                "{}-{}-{}-0",
393                cloud_provider, region, self.spec.environment_id,
394            )
395        }
396
397        pub fn requested_reconciliation_id(&self) -> Uuid {
398            self.spec.request_rollout
399        }
400
401        pub fn rollout_requested(&self) -> bool {
402            self.requested_reconciliation_id()
403                != self
404                    .status
405                    .as_ref()
406                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
407        }
408
409        pub fn set_force_promote(&mut self) {
410            self.spec.force_promote = self.spec.request_rollout;
411        }
412
413        pub fn should_force_promote(&self) -> bool {
414            self.spec.force_promote == self.spec.request_rollout
415                || self.spec.rollout_strategy
416                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
417        }
418
419        pub fn conditions_need_update(&self) -> bool {
420            let Some(status) = self.status.as_ref() else {
421                return true;
422            };
423            if status.conditions.is_empty() {
424                return true;
425            }
426            for condition in &status.conditions {
427                if condition.observed_generation != self.meta().generation {
428                    return true;
429                }
430            }
431            false
432        }
433
434        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
435            let Some(status) = self.status.as_ref() else {
436                return false;
437            };
438            if status.conditions.is_empty() {
439                return false;
440            }
441            status
442                .conditions
443                .iter()
444                .any(|condition| condition.reason == "ReadyToPromote")
445                && &status.resources_hash == resources_hash
446        }
447
448        pub fn is_promoting(&self) -> bool {
449            let Some(status) = self.status.as_ref() else {
450                return false;
451            };
452            if status.conditions.is_empty() {
453                return false;
454            }
455            status
456                .conditions
457                .iter()
458                .any(|condition| condition.reason == "Promoting")
459        }
460
461        pub fn update_in_progress(&self) -> bool {
462            let Some(status) = self.status.as_ref() else {
463                return false;
464            };
465            if status.conditions.is_empty() {
466                return false;
467            }
468            for condition in &status.conditions {
469                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
470                    return true;
471                }
472            }
473            false
474        }
475
476        /// Checks that the given version is greater than or equal
477        /// to the existing version, if the existing version
478        /// can be parsed.
479        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
480            let version = parse_image_ref(&self.spec.environmentd_image_ref);
481            match version {
482                Some(version) => &version >= minimum,
483                // In the rare case that we see an image reference
484                // that we can't parse, we assume that it satisfies all
485                // version checks. Usually these are custom images that have
486                // been by a developer on a branch forked from a recent copy
487                // of main, and so this works out reasonably well in practice.
488                None => {
489                    tracing::warn!(
490                        image_ref = %self.spec.environmentd_image_ref,
491                        "failed to parse image ref",
492                    );
493                    true
494                }
495            }
496        }
497
498        /// This check isn't strictly required since environmentd will still be able to determine
499        /// if the upgrade is allowed or not. However, doing this check allows us to provide
500        /// the error as soon as possible and in a more user friendly way.
501        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
502            // Don't allow rolling back
503            // Note: semver comparison handles RC versions correctly:
504            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
505            if next_version < active_version {
506                return false;
507            }
508
509            if active_version.major == 0 {
510                if next_version.major != active_version.major {
511                    if next_version.major == 26 {
512                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
513                        // before upgrading to 26.0.0
514
515                        return (active_version.minor == 147 && active_version.patch >= 20)
516                            || active_version.minor >= 164;
517                    } else {
518                        return false;
519                    }
520                }
521                // Self managed 25.1 to 25.2
522                if next_version.minor == 147 && active_version.minor == 130 {
523                    return true;
524                }
525                // only allow upgrading a single minor version at a time
526                return next_version.minor <= active_version.minor + 1;
527            } else if active_version.major >= 26 {
528                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
529                return next_version.major <= active_version.major + 1;
530            }
531
532            true
533        }
534
535        /// Checks if the current environmentd image ref is within the upgrade window of the last
536        /// successful rollout.
537        pub fn within_upgrade_window(&self) -> bool {
538            let active_environmentd_version = self
539                .status
540                .as_ref()
541                .and_then(|status| {
542                    status
543                        .last_completed_rollout_environmentd_image_ref
544                        .as_ref()
545                })
546                .and_then(|image_ref| parse_image_ref(image_ref));
547
548            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
549                parse_image_ref(&self.spec.environmentd_image_ref),
550                active_environmentd_version,
551            ) {
552                Self::is_valid_upgrade_version(
553                    &active_environmentd_version,
554                    &next_environmentd_version,
555                )
556            } else {
557                // If we fail to parse either version,
558                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
559                true
560            }
561        }
562
563        pub fn status(&self) -> MaterializeStatus {
564            self.status.clone().unwrap_or_else(|| {
565                let mut status = MaterializeStatus::default();
566
567                status.resource_id = new_resource_id();
568
569                // If we're creating the initial status on an un-soft-deleted
570                // Environment we need to ensure that the last active generation
571                // is restored, otherwise the env will crash loop indefinitely
572                // as its catalog would have durably recorded a greater generation
573                if let Some(last_active_generation) = self
574                    .annotations()
575                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
576                {
577                    status.active_generation = last_active_generation
578                        .parse()
579                        .expect("valid int generation");
580                }
581
582                // Initialize the last completed rollout environmentd image ref to
583                // the current image ref if not already set.
584                status.last_completed_rollout_environmentd_image_ref =
585                    Some(self.spec.environmentd_image_ref.clone());
586
587                status
588            })
589        }
590    }
591
    // Status half of the `Materialize` custom resource, serialized in camelCase.
    // `Materialize::status` builds the initial value when the object has none.
    //
    // NOTE(review): as with the spec, the `///` comments are presumably exported through
    // the `JsonSchema` derive, so they are left verbatim — confirm before rewording.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The UUID of the last successfully completed rollout.
        pub last_completed_rollout_request: Uuid,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// A hash calculated from the spec of resources to be created based on this Materialize
        /// spec. This is used for detecting when the existing resources are up to date.
        /// If you want to trigger a rollout without making other changes that would cause this
        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
        pub resources_hash: String,
        // Status conditions. The helpers on `Materialize` inspect these for the `UpToDate`
        // type and for the `ReadyToPromote` / `Promoting` reasons.
        pub conditions: Vec<Condition>,
    }
612
613    impl MaterializeStatus {
614        pub fn needs_update(&self, other: &Self) -> bool {
615            let now = Timestamp::now();
616            let mut a = self.clone();
617            for condition in &mut a.conditions {
618                condition.last_transition_time = Time(now);
619            }
620            let mut b = other.clone();
621            for condition in &mut b.conditions {
622                condition.last_transition_time = Time(now);
623            }
624            a != b
625        }
626    }
627
628    impl ManagedResource for Materialize {
629        fn default_labels(&self) -> BTreeMap<String, String> {
630            BTreeMap::from_iter([
631                (
632                    "materialize.cloud/organization-name".to_owned(),
633                    self.name_unchecked(),
634                ),
635                (
636                    "materialize.cloud/organization-namespace".to_owned(),
637                    self.namespace(),
638                ),
639                (
640                    "materialize.cloud/mz-resource-id".to_owned(),
641                    self.resource_id().to_owned(),
642                ),
643            ])
644        }
645    }
646}
647
648fn parse_image_ref(image_ref: &str) -> Option<Version> {
649    image_ref
650        .rsplit_once(':')
651        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
652        .and_then(|tag| {
653            // To work around Docker tag restrictions, build metadata in
654            // a Docker tag is delimited by `--` rather than the SemVer
655            // `+` delimiter. So we need to swap the delimiter back to
656            // `+` before parsing it as SemVer.
657            let tag = tag.replace("--", "+");
658            Version::parse(&tag).ok()
659        })
660}
661
662#[cfg(test)]
663mod tests {
664    use kube::core::ObjectMeta;
665    use semver::Version;
666
667    use super::v1alpha1::{Materialize, MaterializeSpec};
668
669    #[mz_ore::test]
670    fn meets_minimum_version() {
671        let mut mz = Materialize {
672            spec: MaterializeSpec {
673                environmentd_image_ref:
674                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
675                        .to_owned(),
676                ..Default::default()
677            },
678            metadata: ObjectMeta {
679                ..Default::default()
680            },
681            status: None,
682        };
683
684        // true cases
685        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
686        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
687        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
688        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
689        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
690        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
691        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
692        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
693        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
694        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
695        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
696        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
697        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
698        mz.spec.environmentd_image_ref =
699            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
700                .to_owned();
701        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
702
703        // false cases
704        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
705        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
706        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
707        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
708        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
709        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
710        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
711        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
712    }
713
714    #[mz_ore::test]
715    fn within_upgrade_window() {
716        use super::v1alpha1::MaterializeStatus;
717
718        let mut mz = Materialize {
719            spec: MaterializeSpec {
720                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
721                ..Default::default()
722            },
723            metadata: ObjectMeta {
724                ..Default::default()
725            },
726            status: Some(MaterializeStatus {
727                last_completed_rollout_environmentd_image_ref: Some(
728                    "materialize/environmentd:v26.0.0".to_owned(),
729                ),
730                ..Default::default()
731            }),
732        };
733
734        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
735        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
736        assert!(mz.within_upgrade_window());
737
738        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
739        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
740        assert!(mz.within_upgrade_window());
741
742        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
743        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
744        assert!(!mz.within_upgrade_window());
745
746        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
747        mz.spec.environmentd_image_ref =
748            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
749        assert!(mz.within_upgrade_window());
750
751        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
752        mz.status
753            .as_mut()
754            .unwrap()
755            .last_completed_rollout_environmentd_image_ref =
756            Some("materialize/environmentd:v0.147.20".to_owned());
757        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
758        assert!(mz.within_upgrade_window());
759    }
760
761    #[mz_ore::test]
762    fn is_valid_upgrade_version() {
763        let success_tests = [
764            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
765            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
766            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
767            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
768            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
769            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
770            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
771            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
772            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
773            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
774            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
775            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
776            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
777        ];
778        for (active_version, next_version) in success_tests {
779            assert!(
780                Materialize::is_valid_upgrade_version(&active_version, &next_version),
781                "v{active_version} can upgrade to v{next_version}"
782            );
783        }
784
785        let failure_tests = [
786            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
787            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
788            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
789            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
790            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
791            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
792            // Disallow anything before 0.147.20 to upgrade
793            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
794            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
795            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
796        ];
797        for (active_version, next_version) in failure_tests {
798            assert!(
799                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
800                "v{active_version} can't upgrade to v{next_version}"
801            );
802        }
803    }
804}