Skip to main content

mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, Time},
17    },
18    jiff::Timestamp,
19};
20use kube::{CustomResource, Resource, ResourceExt};
21use schemars::JsonSchema;
22use semver::Version;
23use serde::{Deserialize, Serialize};
24use uuid::Uuid;
25
26use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
27use mz_server_core::listeners::AuthenticatorKind;
28
/// Annotation key holding the last known active generation of a `Materialize` object.
///
/// When an object has no status yet, `v1alpha1::Materialize::status` reads this
/// annotation and uses its parsed value as the initial `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
31
32pub mod v1alpha1 {
33    use super::*;
34
    // Strategy selecting how a new generation of pods takes over from the old one
    // during a rollout. `WaitUntilReady` is the `Default` variant.
    //
    // NOTE(review): the `///` text on the variants contains `{{<warning>}}` shortcodes and
    // this type derives `JsonSchema`, so it presumably ends up in generated
    // documentation — confirm before rewording it.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        /// Create a new generation of pods, leaving the old generation around until the
        /// new ones are ready to take over.
        /// This minimizes downtime, and is what almost everyone should use.
        #[default]
        WaitUntilReady,

        /// Create a new generation of pods, leaving the old generation as the serving generation
        /// until the user manually promotes the new generation.
        ///
        /// When using `ManuallyPromote`, the new generation can be promoted at any
        /// time, even if it has dataflows that are not fully caught up, by setting
        /// `forcePromote` to the same value as `requestRollout` in the Materialize spec.
        ///
        /// To minimize downtime, promotion should occur when the new generation
        /// has caught up to the prior generation. To determine if the new
        /// generation has caught up, consult the `UpToDate` condition in the
        /// status of the Materialize Resource. If the condition's reason is
        /// `ReadyToPromote` the new generation is ready to promote.
        ///
        /// {{<warning>}}
        /// Do not leave new generations unpromoted indefinitely.
        ///
        /// The new generation keeps open read holds which prevent compaction. Once promoted or
        /// cancelled, those read holds are released. If left unpromoted for an extended time, this
        /// data can build up, and can cause extreme deletion load on the metadata backend database
        /// when finally promoted or cancelled.
        /// {{</warning>}}
        ManuallyPromote,

        /// {{<warning>}}
        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
        ///
        /// This strategy should ONLY be used by customers with physical hardware who do not have
        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
        /// consult with Materialize engineering to discuss your situation.
        /// {{</warning>}}
        ///
        /// Tear down the old generation of pods and promote the new generation of pods immediately,
        /// without waiting for the new generation of pods to be ready.
        ImmediatelyPromoteCausingDowntime,
    }
78
    // Spec of the `Materialize` custom resource.
    //
    // The `CustomResource` derive, configured by the `#[kube(...)]` attribute below,
    // declares the namespaced `Materialize` kind in group `materialize.cloud`, version
    // `v1alpha1`, with `MaterializeStatus` as its status type. Fields are
    // (de)serialized under camelCase names (`#[serde(rename_all = "camelCase")]`).
    //
    // NOTE(review): a couple of field docs below refer to fields by their Rust
    // snake_case names (`service_account_annotations`, `request_rollout`) while
    // most others use the serialized camelCase names — confirm which form the
    // user-facing documentation should show.
    #[derive(
        CustomResource,
        Clone,
        Debug,
        Default,
        PartialEq,
        Deserialize,
        Serialize,
        JsonSchema
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// The environmentd image to run.
        pub environmentd_image_ref: String,
        /// Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        /// {{<warning>}}
        /// Deprecated.
        ///
        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
        /// {{</warning>}}
        ///
        /// If running in AWS, override the IAM role to use to give
        /// environmentd access to the persist S3 bucket.
        #[kube(deprecated)]
        pub environmentd_iam_role_arn: Option<String>,
        /// If running in AWS, override the IAM role to use to support
        /// the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Amount of disk to allocate, if a storage class is provided.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,
        /// Number of balancerd pods to create.
        pub balancerd_replicas: Option<i32>,
        /// Number of console pods to create.
        pub console_replicas: Option<i32>,

        /// Name of the kubernetes service account to use.
        /// If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        /// Annotations to apply to the service account.
        ///
        /// Annotations on service accounts are commonly used by cloud providers for IAM.
        /// AWS uses "eks.amazonaws.com/role-arn".
        /// Azure uses "azure.workload.identity/client-id", but
        /// additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        /// When changes are made to the environmentd resources (either via
        /// modifying fields in the spec here or by deploying a new
        /// orchestratord version which changes how resources are generated),
        /// existing environmentd processes won't be automatically restarted.
        /// In order to trigger a restart, the request_rollout field should be
        /// set to a new (random) value. Once the rollout completes, the value
        /// of `status.lastCompletedRolloutRequest` will be set to this value
        /// to indicate completion.
        ///
        /// Defaults to a random value in order to ensure that the first
        /// generation rollout is automatically triggered.
        #[serde(default)]
        pub request_rollout: Uuid,
        /// If `forcePromote` is set to the same value as `requestRollout`, the
        /// current rollout will skip waiting for clusters in the new
        /// generation to rehydrate before promoting the new environmentd to
        /// leader.
        #[serde(default)]
        pub force_promote: Uuid,
        /// This value will be written to an annotation in the generated
        /// environmentd statefulset, in order to force the controller to
        /// detect the generated resources as changed even if no other changes
        /// happened. This can be used to force a rollout to a new generation
        /// even without making any meaningful changes, by setting it to the
        /// same value as `requestRollout`.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// {{<warning>}}
        /// Deprecated and ignored. Use `rolloutStrategy` instead.
        /// {{</warning>}}
        #[kube(deprecated)]
        #[serde(default)]
        pub in_place_rollout: bool,
        /// Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
        /// It may also contain `external_login_password_mz_system`, which will be used as
        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
        pub backend_secret_name: String,
        /// How to authenticate with Materialize.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        /// The value used by environmentd (via the --environment-id flag) to
        /// uniquely identify this instance. Must be globally unique, and
        /// is required if a license key is not provided.
        /// NOTE: This value MUST NOT be changed in an existing instance,
        /// since it affects things like the way data is stored in the persist
        /// backend.
        #[serde(default)]
        pub environment_id: Uuid,

        /// The name of a ConfigMap containing system parameters in JSON format.
        /// The ConfigMap must contain a `system-params.json` key whose value
        /// is a valid JSON object containing valid system parameters.
        ///
        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
        ///
        /// Example ConfigMap:
        /// ```yaml
        /// data:
        ///   system-params.json: |
        ///     {
        ///       "max_connections": 1000
        ///     }
        /// ```
        pub system_parameter_configmap_name: Option<String>,

        /// The configuration for generating an x509 certificate using cert-manager for balancerd
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The configuration for generating an x509 certificate using cert-manager for the console
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        /// Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        /// The `issuerRef` field is required.
        /// This currently is only used for environmentd, but will eventually support clusterd.
        /// Not yet implemented.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
240
241    impl Materialize {
        /// Returns an owned copy of `spec.backend_secret_name`, the name of the secret
        /// holding the backend settings.
        pub fn backend_secret_name(&self) -> String {
            self.spec.backend_secret_name.clone()
        }
245
246        pub fn namespace(&self) -> String {
247            self.meta().namespace.clone().unwrap()
248        }
249
        /// Returns `true` when the spec names no service account, i.e. when
        /// `spec.service_account_name` is `None`.
        pub fn create_service_account(&self) -> bool {
            self.spec.service_account_name.is_none()
        }
253
254        pub fn service_account_name(&self) -> String {
255            self.spec
256                .service_account_name
257                .clone()
258                .unwrap_or_else(|| self.name_unchecked())
259        }
260
        /// Returns the role name, which is this object's own name.
        pub fn role_name(&self) -> String {
            self.name_unchecked()
        }
264
        /// Returns the role binding name, which is this object's own name.
        pub fn role_binding_name(&self) -> String {
            self.name_unchecked()
        }
268
269        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
270            self.name_prefixed(&format!("environmentd-{generation}"))
271        }
272
273        pub fn environmentd_app_name(&self) -> String {
274            "environmentd".to_owned()
275        }
276
        /// Returns the environmentd service name: `environmentd` passed through
        /// [`Self::name_prefixed`].
        pub fn environmentd_service_name(&self) -> String {
            self.name_prefixed("environmentd")
        }
280
281        pub fn environmentd_service_internal_fqdn(&self) -> String {
282            format!(
283                "{}.{}.svc.cluster.local",
284                self.environmentd_service_name(),
285                self.meta().namespace.as_ref().unwrap()
286            )
287        }
288
289        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
290            self.name_prefixed(&format!("environmentd-{generation}"))
291        }
292
293        pub fn balancerd_app_name(&self) -> String {
294            "balancerd".to_owned()
295        }
296
        /// Returns the environmentd certificate name: `environmentd-external` passed
        /// through [`Self::name_prefixed`].
        pub fn environmentd_certificate_name(&self) -> String {
            self.name_prefixed("environmentd-external")
        }
300
        /// Returns the environmentd certificate secret name: `environmentd-tls` passed
        /// through [`Self::name_prefixed`].
        pub fn environmentd_certificate_secret_name(&self) -> String {
            self.name_prefixed("environmentd-tls")
        }
304
        /// Returns the balancerd deployment name: `balancerd` passed through
        /// [`Self::name_prefixed`].
        pub fn balancerd_deployment_name(&self) -> String {
            self.name_prefixed("balancerd")
        }
308
        /// Returns the balancerd service name: `balancerd` passed through
        /// [`Self::name_prefixed`].
        pub fn balancerd_service_name(&self) -> String {
            self.name_prefixed("balancerd")
        }
312
313        pub fn console_app_name(&self) -> String {
314            "console".to_owned()
315        }
316
        /// Returns the balancerd external certificate name: `balancerd-external` passed
        /// through [`Self::name_prefixed`].
        pub fn balancerd_external_certificate_name(&self) -> String {
            self.name_prefixed("balancerd-external")
        }
320
        /// Returns the balancerd external certificate secret name:
        /// `balancerd-external-tls` passed through [`Self::name_prefixed`].
        pub fn balancerd_external_certificate_secret_name(&self) -> String {
            self.name_prefixed("balancerd-external-tls")
        }
324
        /// Returns `spec.balancerd_replicas`, falling back to 2 when the spec leaves
        /// it unset.
        pub fn balancerd_replicas(&self) -> i32 {
            self.spec.balancerd_replicas.unwrap_or(2)
        }
328
        /// Returns `spec.console_replicas`, falling back to 2 when the spec leaves it
        /// unset.
        pub fn console_replicas(&self) -> i32 {
            self.spec.console_replicas.unwrap_or(2)
        }
332
        /// Returns the console configmap name: `console` passed through
        /// [`Self::name_prefixed`].
        pub fn console_configmap_name(&self) -> String {
            self.name_prefixed("console")
        }
336
        /// Returns the console deployment name: `console` passed through
        /// [`Self::name_prefixed`].
        pub fn console_deployment_name(&self) -> String {
            self.name_prefixed("console")
        }
340
        /// Returns the console service name: `console` passed through
        /// [`Self::name_prefixed`].
        pub fn console_service_name(&self) -> String {
            self.name_prefixed("console")
        }
344
        /// Returns the console external certificate name: `console-external` passed
        /// through [`Self::name_prefixed`].
        pub fn console_external_certificate_name(&self) -> String {
            self.name_prefixed("console-external")
        }
348
        /// Returns the console external certificate secret name:
        /// `console-external-tls` passed through [`Self::name_prefixed`].
        pub fn console_external_certificate_secret_name(&self) -> String {
            self.name_prefixed("console-external-tls")
        }
352
353        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
354            self.name_prefixed(&format!("persist-pubsub-{generation}"))
355        }
356
357        pub fn listeners_configmap_name(&self, generation: u64) -> String {
358            self.name_prefixed(&format!("listeners-{generation}"))
359        }
360
361        pub fn name_prefixed(&self, suffix: &str) -> String {
362            format!("mz{}-{}", self.resource_id(), suffix)
363        }
364
365        pub fn resource_id(&self) -> &str {
366            &self.status.as_ref().unwrap().resource_id
367        }
368
        /// Returns an owned copy of `spec.system_parameter_configmap_name`, or `None`
        /// when the spec leaves it unset.
        pub fn system_parameter_configmap_name(&self) -> Option<String> {
            self.spec.system_parameter_configmap_name.clone()
        }
372
373        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
374            self.spec
375                .environmentd_scratch_volume_storage_requirement
376                .clone()
377                .unwrap_or_else(|| {
378                    self.spec
379                        .environmentd_resource_requirements
380                        .as_ref()
381                        .and_then(|requirements| {
382                            requirements
383                                .requests
384                                .as_ref()
385                                .or(requirements.limits.as_ref())
386                        })
387                        // TODO: in cloud, we've been defaulting to twice the
388                        // memory limit, but k8s-openapi doesn't seem to
389                        // provide any way to parse Quantity values, so there
390                        // isn't an easy way to do arithmetic on it
391                        .and_then(|requirements| requirements.get("memory").cloned())
392                        // TODO: is there a better default to use here?
393                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
394                })
395        }
396
397        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
398            format!(
399                "{}-{}-{}-0",
400                cloud_provider, region, self.spec.environment_id,
401            )
402        }
403
        /// Returns the rollout request id from the spec (`spec.request_rollout`).
        pub fn requested_reconciliation_id(&self) -> Uuid {
            self.spec.request_rollout
        }
407
408        pub fn rollout_requested(&self) -> bool {
409            self.requested_reconciliation_id()
410                != self
411                    .status
412                    .as_ref()
413                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
414        }
415
        /// Sets `spec.force_promote` to the current `spec.request_rollout`, which makes
        /// [`Self::should_force_promote`] return `true` for that rollout.
        pub fn set_force_promote(&mut self) {
            self.spec.force_promote = self.spec.request_rollout;
        }
419
420        pub fn should_force_promote(&self) -> bool {
421            self.spec.force_promote == self.spec.request_rollout
422                || self.spec.rollout_strategy
423                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
424        }
425
426        pub fn conditions_need_update(&self) -> bool {
427            let Some(status) = self.status.as_ref() else {
428                return true;
429            };
430            if status.conditions.is_empty() {
431                return true;
432            }
433            for condition in &status.conditions {
434                if condition.observed_generation != self.meta().generation {
435                    return true;
436                }
437            }
438            false
439        }
440
441        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
442            let Some(status) = self.status.as_ref() else {
443                return false;
444            };
445            if status.conditions.is_empty() {
446                return false;
447            }
448            status
449                .conditions
450                .iter()
451                .any(|condition| condition.reason == "ReadyToPromote")
452                && &status.resources_hash == resources_hash
453        }
454
455        pub fn is_promoting(&self) -> bool {
456            let Some(status) = self.status.as_ref() else {
457                return false;
458            };
459            if status.conditions.is_empty() {
460                return false;
461            }
462            status
463                .conditions
464                .iter()
465                .any(|condition| condition.reason == "Promoting")
466        }
467
468        pub fn update_in_progress(&self) -> bool {
469            let Some(status) = self.status.as_ref() else {
470                return false;
471            };
472            if status.conditions.is_empty() {
473                return false;
474            }
475            for condition in &status.conditions {
476                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
477                    return true;
478                }
479            }
480            false
481        }
482
483        /// Checks that the given version is greater than or equal
484        /// to the existing version, if the existing version
485        /// can be parsed.
486        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
487            let version = parse_image_ref(&self.spec.environmentd_image_ref);
488            match version {
489                // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
490                Some(version) => version.cmp_precedence(minimum).is_ge(),
491                // In the rare case that we see an image reference
492                // that we can't parse, we assume that it satisfies all
493                // version checks. Usually these are custom images that have
494                // been by a developer on a branch forked from a recent copy
495                // of main, and so this works out reasonably well in practice.
496                None => {
497                    tracing::warn!(
498                        image_ref = %self.spec.environmentd_image_ref,
499                        "failed to parse image ref",
500                    );
501                    true
502                }
503            }
504        }
505
506        /// This check isn't strictly required since environmentd will still be able to determine
507        /// if the upgrade is allowed or not. However, doing this check allows us to provide
508        /// the error as soon as possible and in a more user friendly way.
509        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
510            // Don't allow rolling back
511            // Note: semver comparison handles RC versions correctly:
512            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
513            // Use cmp_precedence() to ignore build metadata
514            if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
515                return false;
516            }
517
518            if active_version.major == 0 {
519                if next_version.major != active_version.major {
520                    if next_version.major == 26 {
521                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
522                        // before upgrading to 26.0.0
523
524                        return (active_version.minor == 147 && active_version.patch >= 20)
525                            || active_version.minor >= 164;
526                    } else {
527                        return false;
528                    }
529                }
530                // Self managed 25.1 to 25.2
531                if next_version.minor == 147 && active_version.minor == 130 {
532                    return true;
533                }
534                // only allow upgrading a single minor version at a time
535                return next_version.minor <= active_version.minor + 1;
536            } else if active_version.major >= 26 {
537                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
538                return next_version.major <= active_version.major + 1;
539            }
540
541            true
542        }
543
544        /// Checks if the current environmentd image ref is within the upgrade window of the last
545        /// successful rollout.
546        pub fn within_upgrade_window(&self) -> bool {
547            let active_environmentd_version = self
548                .status
549                .as_ref()
550                .and_then(|status| {
551                    status
552                        .last_completed_rollout_environmentd_image_ref
553                        .as_ref()
554                })
555                .and_then(|image_ref| parse_image_ref(image_ref));
556
557            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
558                parse_image_ref(&self.spec.environmentd_image_ref),
559                active_environmentd_version,
560            ) {
561                Self::is_valid_upgrade_version(
562                    &active_environmentd_version,
563                    &next_environmentd_version,
564                )
565            } else {
566                // If we fail to parse either version,
567                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
568                true
569            }
570        }
571
572        pub fn status(&self) -> MaterializeStatus {
573            self.status.clone().unwrap_or_else(|| {
574                let mut status = MaterializeStatus::default();
575
576                status.resource_id = new_resource_id();
577
578                // If we're creating the initial status on an un-soft-deleted
579                // Environment we need to ensure that the last active generation
580                // is restored, otherwise the env will crash loop indefinitely
581                // as its catalog would have durably recorded a greater generation
582                if let Some(last_active_generation) = self
583                    .annotations()
584                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
585                {
586                    status.active_generation = last_active_generation
587                        .parse()
588                        .expect("valid int generation");
589                }
590
591                // Initialize the last completed rollout environmentd image ref to
592                // the current image ref if not already set.
593                status.last_completed_rollout_environmentd_image_ref =
594                    Some(self.spec.environmentd_image_ref.clone());
595
596                status
597            })
598        }
599    }
600
    // Status of the `Materialize` custom resource (named as the `status` type in the
    // `#[kube(...)]` attribute on `MaterializeSpec`). Serialized with camelCase
    // field names.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The UUID of the last successfully completed rollout.
        pub last_completed_rollout_request: Uuid,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// A hash calculated from the spec of resources to be created based on this Materialize
        /// spec. This is used for detecting when the existing resources are up to date.
        /// If you want to trigger a rollout without making other changes that would cause this
        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
        pub resources_hash: String,
        // Conditions describing the state of this object. Code in this file looks
        // at the `UpToDate` condition type and at the `ReadyToPromote` / `Promoting`
        // reasons.
        pub conditions: Vec<Condition>,
    }
621
622    impl MaterializeStatus {
623        pub fn needs_update(&self, other: &Self) -> bool {
624            let now = Timestamp::now();
625            let mut a = self.clone();
626            for condition in &mut a.conditions {
627                condition.last_transition_time = Time(now);
628            }
629            let mut b = other.clone();
630            for condition in &mut b.conditions {
631                condition.last_transition_time = Time(now);
632            }
633            a != b
634        }
635    }
636
637    impl ManagedResource for Materialize {
638        fn default_labels(&self) -> BTreeMap<String, String> {
639            BTreeMap::from_iter([
640                (
641                    "materialize.cloud/organization-name".to_owned(),
642                    self.name_unchecked(),
643                ),
644                (
645                    "materialize.cloud/organization-namespace".to_owned(),
646                    self.namespace(),
647                ),
648                (
649                    "materialize.cloud/mz-resource-id".to_owned(),
650                    self.resource_id().to_owned(),
651                ),
652            ])
653        }
654    }
655}
656
657fn parse_image_ref(image_ref: &str) -> Option<Version> {
658    image_ref
659        .rsplit_once(':')
660        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
661        .and_then(|tag| {
662            // To work around Docker tag restrictions, build metadata in
663            // a Docker tag is delimited by `--` rather than the SemVer
664            // `+` delimiter. So we need to swap the delimiter back to
665            // `+` before parsing it as SemVer.
666            let tag = tag.replace("--", "+");
667            Version::parse(&tag).ok()
668        })
669}
670
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec, MaterializeStatus};

    /// Builds a `Materialize` resource whose spec requests `image_ref`, leaving every other
    /// spec field and all metadata at their defaults.
    ///
    /// When `last_completed_image_ref` is `Some`, the resource carries a status that records
    /// that image ref as the last completed rollout (all other status fields default);
    /// when it is `None`, the resource has no status at all.
    fn mz_with_image_refs(image_ref: &str, last_completed_image_ref: Option<&str>) -> Materialize {
        Materialize {
            spec: MaterializeSpec {
                environmentd_image_ref: image_ref.to_owned(),
                ..Default::default()
            },
            metadata: ObjectMeta::default(),
            status: last_completed_image_ref.map(|completed| MaterializeStatus {
                last_completed_rollout_environmentd_image_ref: Some(completed.to_owned()),
                ..Default::default()
            }),
        }
    }

    /// Checks `Materialize::meets_minimum_version` against tables of
    /// `(environmentd image ref, minimum version)` pairs.
    #[mz_ore::test]
    fn meets_minimum_version() {
        // Pairs that must be reported as meeting the minimum. Besides ordinary versions at
        // or above the minimum, this covers refs whose tag is not a parseable version
        // (a `devel-<sha>` tag, a digest reference, `v0.asdf.0`), a ref with a registry
        // port, and a `--`-separated dev build tag.
        let accepted = [
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0"),
            ("materialize/environmentd:v0.35.0", "0.34.0"),
            ("materialize/environmentd:v0.34.3", "0.34.0"),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0"),
            ("materialize/environmentd:v0.asdf.0", "0.34.0"),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
            ),
        ];
        for (image_ref, minimum) in accepted {
            let mz = mz_with_image_refs(image_ref, None);
            let minimum = Version::parse(minimum).unwrap();
            assert!(
                mz.meets_minimum_version(&minimum),
                "expected {image_ref} to meet minimum version v{minimum}"
            );
        }

        // Pairs that must be reported as NOT meeting the minimum: a prerelease of the
        // minimum itself, older versions, and a minimum with a higher major version.
        let rejected = [
            ("materialize/environmentd:v0.34.0-dev", "0.34.0"),
            ("materialize/environmentd:v0.33.0", "0.34.0"),
            ("materialize/environmentd:v0.34.0", "1.0.0"),
            ("my.private.registry:5000:v0.33.3", "0.34.0"),
        ];
        for (image_ref, minimum) in rejected {
            let mz = mz_with_image_refs(image_ref, None);
            let minimum = Version::parse(minimum).unwrap();
            assert!(
                !mz.meets_minimum_version(&minimum),
                "expected {image_ref} not to meet minimum version v{minimum}"
            );
        }
    }

    /// Checks `Materialize::within_upgrade_window` for pairs of "last completed rollout"
    /// and "requested" environmentd image refs.
    #[mz_ore::test]
    fn within_upgrade_window() {
        // (last completed rollout image ref, requested image ref, expected result)
        let cases = [
            // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
            ("materialize/environmentd:v26.0.0", "materialize/environmentd:v27.7.3", true),
            // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
            ("materialize/environmentd:v26.0.0", "materialize/environmentd:v27.7.8-dev.0", true),
            // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
            ("materialize/environmentd:v26.0.0", "materialize/environmentd:v28.0.1", false),
            // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version,
            // defaults to true)
            (
                "materialize/environmentd:v26.0.0",
                "materialize/environmentd:v28.0.1.not_a_valid_version",
                true,
            ),
            // Pass: upgrading from 0.147.20 to 26.1.0 (0.147.20 is Self Managed 25.2)
            ("materialize/environmentd:v0.147.20", "materialize/environmentd:v26.1.0", true),
            // Pass: upgrading from 26.11.0-dev.0+b to 26.11.0-dev.0+a (same
            // major.minor.patch.prerelease, different build metadata)
            (
                "materialize/environmentd:v26.11.0-dev.0+b",
                "materialize/environmentd:v26.11.0-dev.0+a",
                true,
            ),
        ];

        for (last_completed, requested, expected) in cases {
            let mz = mz_with_image_refs(requested, Some(last_completed));
            assert_eq!(
                mz.within_upgrade_window(),
                expected,
                "upgrade from {last_completed} to {requested}"
            );
        }
    }

    /// Checks `Materialize::is_valid_upgrade_version` against tables of
    /// `(active version, next version)` pairs.
    #[mz_ore::test]
    fn is_valid_upgrade_version() {
        // Upgrades that must be accepted.
        let success_tests = [
            // Staying on the same version.
            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
            // Moving forward by a single minor version.
            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
            // Patch bumps within the same minor version.
            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
            // 0.164.0 is also expected to be able to move to 26.0.0.
            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
            // Minor upgrades within major version 26, including a multi-minor jump.
            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
            // 0.130.0 to 0.147.0 is accepted even though it spans several minor versions.
            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
        ];
        for (active_version, next_version) in success_tests {
            assert!(
                Materialize::is_valid_upgrade_version(&active_version, &next_version),
                "v{active_version} can upgrade to v{next_version}"
            );
        }

        // Upgrades that must be rejected.
        let failure_tests = [
            // Downgrades (minor and patch).
            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
            // Jumping from 0.x to 1.x.
            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
            // Skipping a minor version on 0.x.
            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
            // Jumping two major versions.
            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
            // 0.130.0 may not go straight to a 26.x release.
            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
            // Disallow anything before 0.147.20 to upgrade
            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
            // (0.164.0 itself is in the accepted table above).
            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
        ];
        for (active_version, next_version) in failure_tests {
            assert!(
                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
                "v{active_version} can't upgrade to v{next_version}"
            );
        }
    }
}