mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt};
20use schemars::JsonSchema;
21use semver::Version;
22use serde::{Deserialize, Serialize};
23use uuid::Uuid;
24
25use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
26use mz_server_core::listeners::AuthenticatorKind;
27
28pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
29    "materialize.cloud/last-known-active-generation";
30
31pub mod v1alpha1 {
32    use super::*;
33
34    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
35    pub enum MaterializeRolloutStrategy {
36        /// Create a new generation of pods, leaving the old generation around until the
37        /// new ones are ready to take over.
38        /// This minimizes downtime, and is what almost everyone should use.
39        #[default]
40        WaitUntilReady,
41
42        /// Create a new generation of pods, leaving the old generation as the serving generation
43        /// until the user manually promotes the new generation.
44        ///
45        /// When using `ManuallyPromote`, the new generation can be promoted at any
46        /// time, even if it has dataflows that are not fully caught up, by setting
47        /// `forcePromote` to the same value as `requestRollout` in the Materialize spec.
48        ///
49        /// To minimize downtime, promotion should occur when the new generation
50        /// has caught up to the prior generation. To determine if the new
51        /// generation has caught up, consult the `UpToDate` condition in the
52        /// status of the Materialize Resource. If the condition's reason is
53        /// `ReadyToPromote` the new generation is ready to promote.
54        ///
55        /// {{<warning>}}
56        /// Do not leave new generations unpromoted indefinitely.
57        ///
58        /// The new generation keeps open read holds which prevent compaction. Once promoted or
59        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
60        /// data can build up, and can cause extreme deletion load on the metadata backend database
61        /// when finally promoted or cancelled.
62        /// {{</warning>}}
63        ManuallyPromote,
64
65        /// {{<warning>}}
66        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
67        ///
68        /// This strategy should ONLY be used by customers with physical hardware who do not have
69        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
70        /// consult with Materialize engineering to discuss your situation.
71        /// {{</warning>}}
72        ///
73        /// Tear down the old generation of pods and promote the new generation of pods immediately,
74        /// without waiting for the new generation of pods to be ready.
75        ImmediatelyPromoteCausingDowntime,
76    }
77
78    #[derive(
79        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
80    )]
81    #[serde(rename_all = "camelCase")]
82    #[kube(
83        namespaced,
84        group = "materialize.cloud",
85        version = "v1alpha1",
86        kind = "Materialize",
87        singular = "materialize",
88        plural = "materializes",
89        shortname = "mzs",
90        status = "MaterializeStatus",
91        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
92        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
93        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
94    )]
95    pub struct MaterializeSpec {
96        /// The environmentd image to run.
97        pub environmentd_image_ref: String,
98        /// Extra args to pass to the environmentd binary.
99        pub environmentd_extra_args: Option<Vec<String>>,
100        /// Extra environment variables to pass to the environmentd binary.
101        pub environmentd_extra_env: Option<Vec<EnvVar>>,
102        /// {{<warning>}}
103        /// Deprecated.
104        ///
105        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
106        /// {{</warning>}}
107        ///
108        /// If running in AWS, override the IAM role to use to give
109        /// environmentd access to the persist S3 bucket.
110        #[kube(deprecated)]
111        pub environmentd_iam_role_arn: Option<String>,
112        /// If running in AWS, override the IAM role to use to support
113        /// the CREATE CONNECTION feature.
114        pub environmentd_connection_role_arn: Option<String>,
115        /// Resource requirements for the environmentd pod.
116        pub environmentd_resource_requirements: Option<ResourceRequirements>,
117        /// Amount of disk to allocate, if a storage class is provided.
118        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
119        /// Resource requirements for the balancerd pod.
120        pub balancerd_resource_requirements: Option<ResourceRequirements>,
121        /// Resource requirements for the console pod.
122        pub console_resource_requirements: Option<ResourceRequirements>,
123        /// Number of balancerd pods to create.
124        pub balancerd_replicas: Option<i32>,
125        /// Number of console pods to create.
126        pub console_replicas: Option<i32>,
127
128        /// Name of the kubernetes service account to use.
129        /// If not set, we will create one with the same name as this Materialize object.
130        pub service_account_name: Option<String>,
131        /// Annotations to apply to the service account.
132        ///
133        /// Annotations on service accounts are commonly used by cloud providers for IAM.
134        /// AWS uses "eks.amazonaws.com/role-arn".
135        /// Azure uses "azure.workload.identity/client-id", but
136        /// additionally requires "azure.workload.identity/use": "true" on the pods.
137        pub service_account_annotations: Option<BTreeMap<String, String>>,
138        /// Labels to apply to the service account.
139        pub service_account_labels: Option<BTreeMap<String, String>>,
140        /// Annotations to apply to the pods.
141        pub pod_annotations: Option<BTreeMap<String, String>>,
142        /// Labels to apply to the pods.
143        pub pod_labels: Option<BTreeMap<String, String>>,
144
145        /// When changes are made to the environmentd resources (either via
146        /// modifying fields in the spec here or by deploying a new
147        /// orchestratord version which changes how resources are generated),
148        /// existing environmentd processes won't be automatically restarted.
149        /// In order to trigger a restart, the request_rollout field should be
150        /// set to a new (random) value. Once the rollout completes, the value
151        /// of `status.lastCompletedRolloutRequest` will be set to this value
152        /// to indicate completion.
153        ///
154        /// Defaults to a random value in order to ensure that the first
155        /// generation rollout is automatically triggered.
156        #[serde(default)]
157        pub request_rollout: Uuid,
158        /// If `forcePromote` is set to the same value as `requestRollout`, the
159        /// current rollout will skip waiting for clusters in the new
160        /// generation to rehydrate before promoting the new environmentd to
161        /// leader.
162        #[serde(default)]
163        pub force_promote: Uuid,
164        /// This value will be written to an annotation in the generated
165        /// environmentd statefulset, in order to force the controller to
166        /// detect the generated resources as changed even if no other changes
167        /// happened. This can be used to force a rollout to a new generation
168        /// even without making any meaningful changes, by setting it to the
169        /// same value as `requestRollout`.
170        #[serde(default)]
171        pub force_rollout: Uuid,
172        /// {{<warning>}}
173        /// Deprecated and ignored. Use `rolloutStrategy` instead.
174        /// {{</warning>}}
175        #[kube(deprecated)]
176        #[serde(default)]
177        pub in_place_rollout: bool,
178        /// Rollout strategy to use when upgrading this Materialize instance.
179        #[serde(default)]
180        pub rollout_strategy: MaterializeRolloutStrategy,
181        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
182        /// It may also contain `external_login_password_mz_system`, which will be used as
183        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
184        pub backend_secret_name: String,
185        /// How to authenticate with Materialize.
186        #[serde(default)]
187        pub authenticator_kind: AuthenticatorKind,
188        /// Whether to enable role based access control. Defaults to false.
189        #[serde(default)]
190        pub enable_rbac: bool,
191
192        /// The value used by environmentd (via the --environment-id flag) to
193        /// uniquely identify this instance. Must be globally unique, and
194        /// is required if a license key is not provided.
195        /// NOTE: This value MUST NOT be changed in an existing instance,
196        /// since it affects things like the way data is stored in the persist
197        /// backend.
198        #[serde(default)]
199        pub environment_id: Uuid,
200
201        /// The name of a ConfigMap containing system parameters in JSON format.
202        /// The ConfigMap must contain a `system-params.json` key whose value
203        /// is a valid JSON object containing valid system parameters.
204        ///
205        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
206        ///
207        /// Example ConfigMap:
208        /// ```yaml
209        /// data:
210        ///   system-params.json: |
211        ///     {
212        ///       "max_connections": 1000
213        ///     }
214        /// ```
215        pub system_parameter_configmap_name: Option<String>,
216
217        /// The configuration for generating an x509 certificate using cert-manager for balancerd
218        /// to present to incoming connections.
219        /// The `dnsNames` and `issuerRef` fields are required.
220        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
221        /// The configuration for generating an x509 certificate using cert-manager for the console
222        /// to present to incoming connections.
223        /// The `dnsNames` and `issuerRef` fields are required.
224        /// Not yet implemented.
225        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
226        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
227        /// The `issuerRef` field is required.
228        /// This currently is only used for environmentd, but will eventually support clusterd.
229        /// Not yet implemented.
230        pub internal_certificate_spec: Option<MaterializeCertSpec>,
231    }
232
233    impl Materialize {
234        pub fn backend_secret_name(&self) -> String {
235            self.spec.backend_secret_name.clone()
236        }
237
238        pub fn namespace(&self) -> String {
239            self.meta().namespace.clone().unwrap()
240        }
241
242        pub fn create_service_account(&self) -> bool {
243            self.spec.service_account_name.is_none()
244        }
245
246        pub fn service_account_name(&self) -> String {
247            self.spec
248                .service_account_name
249                .clone()
250                .unwrap_or_else(|| self.name_unchecked())
251        }
252
253        pub fn role_name(&self) -> String {
254            self.name_unchecked()
255        }
256
257        pub fn role_binding_name(&self) -> String {
258            self.name_unchecked()
259        }
260
261        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
262            self.name_prefixed(&format!("environmentd-{generation}"))
263        }
264
265        pub fn environmentd_app_name(&self) -> String {
266            "environmentd".to_owned()
267        }
268
269        pub fn environmentd_service_name(&self) -> String {
270            self.name_prefixed("environmentd")
271        }
272
273        pub fn environmentd_service_internal_fqdn(&self) -> String {
274            format!(
275                "{}.{}.svc.cluster.local",
276                self.environmentd_service_name(),
277                self.meta().namespace.as_ref().unwrap()
278            )
279        }
280
281        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
282            self.name_prefixed(&format!("environmentd-{generation}"))
283        }
284
285        pub fn balancerd_app_name(&self) -> String {
286            "balancerd".to_owned()
287        }
288
289        pub fn environmentd_certificate_name(&self) -> String {
290            self.name_prefixed("environmentd-external")
291        }
292
293        pub fn environmentd_certificate_secret_name(&self) -> String {
294            self.name_prefixed("environmentd-tls")
295        }
296
297        pub fn balancerd_deployment_name(&self) -> String {
298            self.name_prefixed("balancerd")
299        }
300
301        pub fn balancerd_service_name(&self) -> String {
302            self.name_prefixed("balancerd")
303        }
304
305        pub fn console_app_name(&self) -> String {
306            "console".to_owned()
307        }
308
309        pub fn balancerd_external_certificate_name(&self) -> String {
310            self.name_prefixed("balancerd-external")
311        }
312
313        pub fn balancerd_external_certificate_secret_name(&self) -> String {
314            self.name_prefixed("balancerd-external-tls")
315        }
316
317        pub fn balancerd_replicas(&self) -> i32 {
318            self.spec.balancerd_replicas.unwrap_or(2)
319        }
320
321        pub fn console_replicas(&self) -> i32 {
322            self.spec.console_replicas.unwrap_or(2)
323        }
324
325        pub fn console_configmap_name(&self) -> String {
326            self.name_prefixed("console")
327        }
328
329        pub fn console_deployment_name(&self) -> String {
330            self.name_prefixed("console")
331        }
332
333        pub fn console_service_name(&self) -> String {
334            self.name_prefixed("console")
335        }
336
337        pub fn console_external_certificate_name(&self) -> String {
338            self.name_prefixed("console-external")
339        }
340
341        pub fn console_external_certificate_secret_name(&self) -> String {
342            self.name_prefixed("console-external-tls")
343        }
344
345        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
346            self.name_prefixed(&format!("persist-pubsub-{generation}"))
347        }
348
349        pub fn listeners_configmap_name(&self, generation: u64) -> String {
350            self.name_prefixed(&format!("listeners-{generation}"))
351        }
352
353        pub fn name_prefixed(&self, suffix: &str) -> String {
354            format!("mz{}-{}", self.resource_id(), suffix)
355        }
356
357        pub fn resource_id(&self) -> &str {
358            &self.status.as_ref().unwrap().resource_id
359        }
360
361        pub fn system_parameter_configmap_name(&self) -> Option<String> {
362            self.spec.system_parameter_configmap_name.clone()
363        }
364
365        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
366            self.spec
367                .environmentd_scratch_volume_storage_requirement
368                .clone()
369                .unwrap_or_else(|| {
370                    self.spec
371                        .environmentd_resource_requirements
372                        .as_ref()
373                        .and_then(|requirements| {
374                            requirements
375                                .requests
376                                .as_ref()
377                                .or(requirements.limits.as_ref())
378                        })
379                        // TODO: in cloud, we've been defaulting to twice the
380                        // memory limit, but k8s-openapi doesn't seem to
381                        // provide any way to parse Quantity values, so there
382                        // isn't an easy way to do arithmetic on it
383                        .and_then(|requirements| requirements.get("memory").cloned())
384                        // TODO: is there a better default to use here?
385                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
386                })
387        }
388
389        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
390            format!(
391                "{}-{}-{}-0",
392                cloud_provider, region, self.spec.environment_id,
393            )
394        }
395
396        pub fn requested_reconciliation_id(&self) -> Uuid {
397            self.spec.request_rollout
398        }
399
400        pub fn rollout_requested(&self) -> bool {
401            self.requested_reconciliation_id()
402                != self
403                    .status
404                    .as_ref()
405                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
406        }
407
408        pub fn set_force_promote(&mut self) {
409            self.spec.force_promote = self.spec.request_rollout;
410        }
411
412        pub fn should_force_promote(&self) -> bool {
413            self.spec.force_promote == self.spec.request_rollout
414                || self.spec.rollout_strategy
415                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
416        }
417
418        pub fn conditions_need_update(&self) -> bool {
419            let Some(status) = self.status.as_ref() else {
420                return true;
421            };
422            if status.conditions.is_empty() {
423                return true;
424            }
425            for condition in &status.conditions {
426                if condition.observed_generation != self.meta().generation {
427                    return true;
428                }
429            }
430            false
431        }
432
433        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
434            let Some(status) = self.status.as_ref() else {
435                return false;
436            };
437            if status.conditions.is_empty() {
438                return false;
439            }
440            status
441                .conditions
442                .iter()
443                .any(|condition| condition.reason == "ReadyToPromote")
444                && &status.resources_hash == resources_hash
445        }
446
447        pub fn is_promoting(&self) -> bool {
448            let Some(status) = self.status.as_ref() else {
449                return false;
450            };
451            if status.conditions.is_empty() {
452                return false;
453            }
454            status
455                .conditions
456                .iter()
457                .any(|condition| condition.reason == "Promoting")
458        }
459
460        pub fn update_in_progress(&self) -> bool {
461            let Some(status) = self.status.as_ref() else {
462                return false;
463            };
464            if status.conditions.is_empty() {
465                return false;
466            }
467            for condition in &status.conditions {
468                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
469                    return true;
470                }
471            }
472            false
473        }
474
475        /// Checks that the given version is greater than or equal
476        /// to the existing version, if the existing version
477        /// can be parsed.
478        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
479            let version = parse_image_ref(&self.spec.environmentd_image_ref);
480            match version {
481                Some(version) => &version >= minimum,
482                // In the rare case that we see an image reference
483                // that we can't parse, we assume that it satisfies all
484                // version checks. Usually these are custom images that have
485                // been by a developer on a branch forked from a recent copy
486                // of main, and so this works out reasonably well in practice.
487                None => {
488                    tracing::warn!(
489                        image_ref = %self.spec.environmentd_image_ref,
490                        "failed to parse image ref",
491                    );
492                    true
493                }
494            }
495        }
496
497        /// This check isn't strictly required since environmentd will still be able to determine
498        /// if the upgrade is allowed or not. However, doing this check allows us to provide
499        /// the error as soon as possible and in a more user friendly way.
500        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
501            // Don't allow rolling back
502            // Note: semver comparison handles RC versions correctly:
503            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
504            if next_version < active_version {
505                return false;
506            }
507
508            if active_version.major == 0 {
509                if next_version.major != active_version.major {
510                    if next_version.major == 26 {
511                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
512                        // before upgrading to 26.0.0
513
514                        return (active_version.minor == 147 && active_version.patch >= 20)
515                            || active_version.minor >= 164;
516                    } else {
517                        return false;
518                    }
519                }
520                // Self managed 25.1 to 25.2
521                if next_version.minor == 147 && active_version.minor == 130 {
522                    return true;
523                }
524                // only allow upgrading a single minor version at a time
525                return next_version.minor <= active_version.minor + 1;
526            } else if active_version.major >= 26 {
527                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
528                return next_version.major <= active_version.major + 1;
529            }
530
531            true
532        }
533
534        /// Checks if the current environmentd image ref is within the upgrade window of the last
535        /// successful rollout.
536        pub fn within_upgrade_window(&self) -> bool {
537            let active_environmentd_version = self
538                .status
539                .as_ref()
540                .and_then(|status| {
541                    status
542                        .last_completed_rollout_environmentd_image_ref
543                        .as_ref()
544                })
545                .and_then(|image_ref| parse_image_ref(image_ref));
546
547            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
548                parse_image_ref(&self.spec.environmentd_image_ref),
549                active_environmentd_version,
550            ) {
551                Self::is_valid_upgrade_version(
552                    &active_environmentd_version,
553                    &next_environmentd_version,
554                )
555            } else {
556                // If we fail to parse either version,
557                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
558                true
559            }
560        }
561
562        pub fn status(&self) -> MaterializeStatus {
563            self.status.clone().unwrap_or_else(|| {
564                let mut status = MaterializeStatus::default();
565
566                status.resource_id = new_resource_id();
567
568                // If we're creating the initial status on an un-soft-deleted
569                // Environment we need to ensure that the last active generation
570                // is restored, otherwise the env will crash loop indefinitely
571                // as its catalog would have durably recorded a greater generation
572                if let Some(last_active_generation) = self
573                    .annotations()
574                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
575                {
576                    status.active_generation = last_active_generation
577                        .parse()
578                        .expect("valid int generation");
579                }
580
581                // Initialize the last completed rollout environmentd image ref to
582                // the current image ref if not already set.
583                status.last_completed_rollout_environmentd_image_ref =
584                    Some(self.spec.environmentd_image_ref.clone());
585
586                status
587            })
588        }
589    }
590
591    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
592    #[serde(rename_all = "camelCase")]
593    pub struct MaterializeStatus {
594        /// Resource identifier used as a name prefix to avoid pod name collisions.
595        pub resource_id: String,
596        /// The generation of Materialize pods actively capable of servicing requests.
597        pub active_generation: u64,
598        /// The UUID of the last successfully completed rollout.
599        pub last_completed_rollout_request: Uuid,
600        /// The image ref of the environmentd image that was last successfully rolled out.
601        /// Used to deny upgrades past 1 major version from the last successful rollout.
602        /// When None, we upgrade anyways.
603        pub last_completed_rollout_environmentd_image_ref: Option<String>,
604        /// A hash calculated from the spec of resources to be created based on this Materialize
605        /// spec. This is used for detecting when the existing resources are up to date.
606        /// If you want to trigger a rollout without making other changes that would cause this
607        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
608        pub resources_hash: String,
609        pub conditions: Vec<Condition>,
610    }
611
612    impl MaterializeStatus {
613        pub fn needs_update(&self, other: &Self) -> bool {
614            let now = chrono::offset::Utc::now();
615            let mut a = self.clone();
616            for condition in &mut a.conditions {
617                condition.last_transition_time = Time(now);
618            }
619            let mut b = other.clone();
620            for condition in &mut b.conditions {
621                condition.last_transition_time = Time(now);
622            }
623            a != b
624        }
625    }
626
627    impl ManagedResource for Materialize {
628        fn default_labels(&self) -> BTreeMap<String, String> {
629            BTreeMap::from_iter([
630                (
631                    "materialize.cloud/organization-name".to_owned(),
632                    self.name_unchecked(),
633                ),
634                (
635                    "materialize.cloud/organization-namespace".to_owned(),
636                    self.namespace(),
637                ),
638                (
639                    "materialize.cloud/mz-resource-id".to_owned(),
640                    self.resource_id().to_owned(),
641                ),
642            ])
643        }
644    }
645}
646
647fn parse_image_ref(image_ref: &str) -> Option<Version> {
648    image_ref
649        .rsplit_once(':')
650        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
651        .and_then(|tag| {
652            // To work around Docker tag restrictions, build metadata in
653            // a Docker tag is delimited by `--` rather than the SemVer
654            // `+` delimiter. So we need to swap the delimiter back to
655            // `+` before parsing it as SemVer.
656            let tag = tag.replace("--", "+");
657            Version::parse(&tag).ok()
658        })
659}
660
661#[cfg(test)]
662mod tests {
663    use kube::core::ObjectMeta;
664    use semver::Version;
665
666    use super::v1alpha1::{Materialize, MaterializeSpec};
667
668    #[mz_ore::test]
669    fn meets_minimum_version() {
670        let mut mz = Materialize {
671            spec: MaterializeSpec {
672                environmentd_image_ref:
673                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
674                        .to_owned(),
675                ..Default::default()
676            },
677            metadata: ObjectMeta {
678                ..Default::default()
679            },
680            status: None,
681        };
682
683        // true cases
684        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
685        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
686        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
687        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
688        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
689        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
690        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
691        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
692        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
693        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
694        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
695        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
696        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
697        mz.spec.environmentd_image_ref =
698            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
699                .to_owned();
700        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
701
702        // false cases
703        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
704        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
705        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
706        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
707        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
708        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
709        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
710        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
711    }
712
713    #[mz_ore::test]
714    fn within_upgrade_window() {
715        use super::v1alpha1::MaterializeStatus;
716
717        let mut mz = Materialize {
718            spec: MaterializeSpec {
719                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
720                ..Default::default()
721            },
722            metadata: ObjectMeta {
723                ..Default::default()
724            },
725            status: Some(MaterializeStatus {
726                last_completed_rollout_environmentd_image_ref: Some(
727                    "materialize/environmentd:v26.0.0".to_owned(),
728                ),
729                ..Default::default()
730            }),
731        };
732
733        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
734        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
735        assert!(mz.within_upgrade_window());
736
737        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
738        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
739        assert!(mz.within_upgrade_window());
740
741        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
742        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
743        assert!(!mz.within_upgrade_window());
744
745        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
746        mz.spec.environmentd_image_ref =
747            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
748        assert!(mz.within_upgrade_window());
749
750        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
751        mz.status
752            .as_mut()
753            .unwrap()
754            .last_completed_rollout_environmentd_image_ref =
755            Some("materialize/environmentd:v0.147.20".to_owned());
756        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
757        assert!(mz.within_upgrade_window());
758    }
759
760    #[mz_ore::test]
761    fn is_valid_upgrade_version() {
762        let success_tests = [
763            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
764            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
765            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
766            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
767            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
768            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
769            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
770            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
771            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
772            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
773            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
774            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
775            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
776        ];
777        for (active_version, next_version) in success_tests {
778            assert!(
779                Materialize::is_valid_upgrade_version(&active_version, &next_version),
780                "v{active_version} can upgrade to v{next_version}"
781            );
782        }
783
784        let failure_tests = [
785            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
786            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
787            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
788            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
789            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
790            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
791            // Disallow anything before 0.147.20 to upgrade
792            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
793            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
794            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
795        ];
796        for (active_version, next_version) in failure_tests {
797            assert!(
798                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
799                "v{active_version} can't upgrade to v{next_version}"
800            );
801        }
802    }
803}