Skip to main content

mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::time::Duration;
12
13use k8s_openapi::{
14    api::core::v1::{EnvVar, ResourceRequirements},
15    apimachinery::pkg::{
16        api::resource::Quantity,
17        apis::meta::v1::{Condition, Time},
18    },
19    jiff::Timestamp,
20};
21use kube::{CustomResource, Resource, ResourceExt};
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use sha2::{Digest, Sha256};
26use uuid::Uuid;
27
28use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};
29use mz_server_core::listeners::AuthenticatorKind;
30
/// Annotation key `materialize.cloud/last-known-active-generation`.
// NOTE(review): presumably stores the most recently active generation so it
// can be restored later — confirm against the code that reads/writes it.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
/// Annotation key `materialize.cloud/force-rollout`.
// NOTE(review): presumably the annotation that carries `spec.forceRollout` on
// the generated environmentd statefulset — confirm against the resource
// generation code.
pub const FORCE_ROLLOUT_ANNOTATION: &str = "materialize.cloud/force-rollout";
34
// Strategy selected through `spec.rolloutStrategy`; `WaitUntilReady` is the
// `Default`. The `///` comments on the variants are part of the derived
// `JsonSchema`, and use `{{<warning>}}` shortcodes.
// NOTE(review): the shortcodes are presumably consumed by a docs generator —
// confirm before reformatting them.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
    /// Create a new generation of pods, leaving the old generation around until the
    /// new ones are ready to take over.
    /// This minimizes downtime, and is what almost everyone should use.
    #[default]
    WaitUntilReady,

    /// Create a new generation of pods, leaving the old generation as the serving generation
    /// until the user manually promotes the new generation.
    ///
    /// When using `ManuallyPromote`, the new generation can be promoted at any
    /// time, even if it has dataflows that are not fully caught up, by setting
    /// `forcePromote` to the same value as `requestRollout` in the Materialize spec.
    ///
    /// To minimize downtime, promotion should occur when the new generation
    /// has caught up to the prior generation. To determine if the new
    /// generation has caught up, consult the `UpToDate` condition in the
    /// status of the Materialize Resource. If the condition's reason is
    /// `ReadyToPromote` the new generation is ready to promote.
    ///
    /// {{<warning>}}
    /// Do not leave new generations unpromoted indefinitely.
    ///
    /// The new generation keeps open read holds which prevent compaction. Once promoted or
    /// cancelled, those read holds are released. If left unpromoted for an extended time, this
    /// data can build up, and can cause extreme deletion load on the metadata backend database
    /// when finally promoted or cancelled.
    ///
    /// To guard against this, a rollout that remains in progress longer
    /// than `rolloutRequestTimeout` (default 24h) is automatically
    /// cancelled.
    /// {{</warning>}}
    ManuallyPromote,

    /// {{<warning>}}
    /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
    ///
    /// This strategy should ONLY be used by customers with physical hardware who do not have
    /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
    /// consult with Materialize engineering to discuss your situation.
    /// {{</warning>}}
    ///
    /// Tear down the old generation of pods and promote the new generation of pods immediately,
    /// without waiting for the new generation of pods to be ready.
    ImmediatelyPromoteCausingDowntime,
}
82
/// Default for [`RolloutRequestTimeout`]. A new generation that sits
/// un-promoted holds back compaction via read holds, and promoting it
/// after a long delay can cause incident-inducing load; 24h is a
/// conservative upper bound on how long any rollout should take.
///
/// This string must stay parseable by `humantime::parse_duration`:
/// `v1alpha1::Materialize::rollout_request_timeout` falls back to it when the
/// configured value is unparseable and panics (via `expect`) if the fallback
/// itself cannot be parsed.
pub const DEFAULT_ROLLOUT_REQUEST_TIMEOUT: &str = "24h";
88
/// The maximum time [`v1alpha1::MaterializeSpec::rollout_request_timeout`] allows a
/// rollout to remain in progress.
///
/// A transparent wrapper around the duration string whose [`Default`] is
/// [`DEFAULT_ROLLOUT_REQUEST_TIMEOUT`]. Routing the default through `Default`
/// keeps a single source of truth: the derived `Default` for
/// [`v1alpha1::MaterializeSpec`], serde's `#[serde(default)]` (applied when the field
/// is omitted on deserialize), and the schema default surfaced in the
/// generated CRD (so the API server fills it in and `kubectl explain` shows
/// it) all resolve to the same value.
// `Default` is deliberately NOT in the derive list: a derived impl would yield
// an empty string, whereas the hand-written impl returns the constant.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(transparent)]
// The wrapped string is not validated here; it is parsed with
// `humantime::parse_duration` in `v1alpha1::Materialize::rollout_request_timeout`.
pub struct RolloutRequestTimeout(pub String);
102
103impl Default for RolloutRequestTimeout {
104    fn default() -> Self {
105        RolloutRequestTimeout(DEFAULT_ROLLOUT_REQUEST_TIMEOUT.to_owned())
106    }
107}
108
109pub mod v1alpha1 {
110    use super::*;
111
112    #[derive(
113        CustomResource,
114        Clone,
115        Debug,
116        Default,
117        PartialEq,
118        Deserialize,
119        Serialize,
120        JsonSchema
121    )]
122    #[serde(rename_all = "camelCase")]
123    #[kube(
124        namespaced,
125        group = "materialize.cloud",
126        version = "v1alpha1",
127        kind = "Materialize",
128        singular = "materialize",
129        plural = "materializes",
130        shortname = "mzs",
131        status = "MaterializeStatus",
132        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
133        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
134        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
135    )]
136    pub struct MaterializeSpec {
137        /// The environmentd image to run.
138        pub environmentd_image_ref: String,
139        /// Extra args to pass to the environmentd binary.
140        pub environmentd_extra_args: Option<Vec<String>>,
141        /// Extra environment variables to pass to the environmentd binary.
142        pub environmentd_extra_env: Option<Vec<EnvVar>>,
143        /// {{<warning>}}
144        /// Deprecated.
145        ///
146        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
147        /// {{</warning>}}
148        ///
149        /// If running in AWS, override the IAM role to use to give
150        /// environmentd access to the persist S3 bucket.
151        #[kube(deprecated)]
152        pub environmentd_iam_role_arn: Option<String>,
153        /// If running in AWS, override the IAM role to use to support
154        /// the CREATE CONNECTION feature.
155        pub environmentd_connection_role_arn: Option<String>,
156        /// Resource requirements for the environmentd pod.
157        pub environmentd_resource_requirements: Option<ResourceRequirements>,
158        /// Amount of disk to allocate, if a storage class is provided.
159        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
160        /// Resource requirements for the balancerd pod.
161        pub balancerd_resource_requirements: Option<ResourceRequirements>,
162        /// Resource requirements for the console pod.
163        pub console_resource_requirements: Option<ResourceRequirements>,
164        /// Number of balancerd pods to create.
165        pub balancerd_replicas: Option<i32>,
166        /// Number of console pods to create.
167        pub console_replicas: Option<i32>,
168
169        /// Name of the kubernetes service account to use.
170        /// If not set, we will create one with the same name as this Materialize object.
171        pub service_account_name: Option<String>,
172        /// Annotations to apply to the service account.
173        ///
174        /// Annotations on service accounts are commonly used by cloud providers for IAM.
175        /// AWS uses "eks.amazonaws.com/role-arn".
176        /// Azure uses "azure.workload.identity/client-id", but
177        /// additionally requires "azure.workload.identity/use": "true" on the pods.
178        pub service_account_annotations: Option<BTreeMap<String, String>>,
179        /// Labels to apply to the service account.
180        pub service_account_labels: Option<BTreeMap<String, String>>,
181        /// Annotations to apply to the pods.
182        pub pod_annotations: Option<BTreeMap<String, String>>,
183        /// Labels to apply to the pods.
184        pub pod_labels: Option<BTreeMap<String, String>>,
185
186        /// When changes are made to the environmentd resources (either via
187        /// modifying fields in the spec here or by deploying a new
188        /// orchestratord version which changes how resources are generated),
189        /// existing environmentd processes won't be automatically restarted.
190        /// In order to trigger a restart, the request_rollout field should be
191        /// set to a new (random) value. Once the rollout completes, the value
192        /// of `status.lastCompletedRolloutRequest` will be set to this value
193        /// to indicate completion.
194        ///
195        /// Defaults to a random value in order to ensure that the first
196        /// generation rollout is automatically triggered.
197        #[serde(default)]
198        pub request_rollout: Uuid,
199        /// If `forcePromote` is set to the same value as `requestRollout`, the
200        /// current rollout will skip waiting for clusters in the new
201        /// generation to rehydrate before promoting the new environmentd to
202        /// leader.
203        #[serde(default)]
204        pub force_promote: String,
205        /// This value will be written to an annotation in the generated
206        /// environmentd statefulset, in order to force the controller to
207        /// detect the generated resources as changed even if no other changes
208        /// happened. This can be used to force a rollout to a new generation
209        /// even without making any meaningful changes, by setting it to the
210        /// same value as `requestRollout`.
211        #[serde(default)]
212        pub force_rollout: Uuid,
213        /// {{<warning>}}
214        /// Deprecated and ignored. Use `rolloutStrategy` instead.
215        /// {{</warning>}}
216        #[kube(deprecated)]
217        #[serde(default)]
218        pub in_place_rollout: bool,
219        /// Rollout strategy to use when upgrading this Materialize instance.
220        #[serde(default)]
221        pub rollout_strategy: MaterializeRolloutStrategy,
222        /// The maximum amount of time a rollout may remain in progress before
223        /// it is automatically cancelled.
224        ///
225        /// While a rollout is in progress, the new generation of `environmentd`
226        /// runs in a read-only, un-promoted state and holds back compaction via
227        /// read holds. Leaving it in this state for too long can cause
228        /// incident-inducing load when it is eventually promoted, so the
229        /// operator cancels the rollout once this timeout is exceeded: the new
230        /// generation is torn down and the previously-active generation
231        /// continues serving. A new rollout can then be triggered by setting
232        /// `requestRollout` to a new value.
233        ///
234        /// This does not apply to the `ImmediatelyPromoteCausingDowntime`
235        /// rollout strategy or to force-promoted rollouts, since by the time
236        /// those are in progress the old generation may already be gone.
237        ///
238        /// The value is parsed as a human-readable duration, e.g. `24h`,
239        /// `90m`, or `1h 30m`. Defaults to [`DEFAULT_ROLLOUT_REQUEST_TIMEOUT`]
240        /// when omitted (the API server fills it in); an unparseable value also
241        /// falls back to that default.
242        #[serde(default)]
243        pub rollout_request_timeout: RolloutRequestTimeout,
244        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
245        /// It may also contain `external_login_password_mz_system`, which will be used as
246        /// the password for the `mz_system` user if `authenticatorKind` is `Password`,
247        /// `Sasl`, or `Oidc`.
248        pub backend_secret_name: String,
249        /// How to authenticate with Materialize.
250        #[serde(default)]
251        pub authenticator_kind: AuthenticatorKind,
252        /// Whether to enable role based access control. Defaults to false.
253        #[serde(default)]
254        pub enable_rbac: bool,
255
256        /// The value used by environmentd (via the --environment-id flag) to
257        /// uniquely identify this instance. Must be globally unique, and
258        /// is required if a license key is not provided.
259        /// NOTE: This value MUST NOT be changed in an existing instance,
260        /// since it affects things like the way data is stored in the persist
261        /// backend.
262        #[serde(default)]
263        pub environment_id: Uuid,
264
265        /// The name of a ConfigMap containing system parameters in JSON format.
266        /// The ConfigMap must contain a `system-params.json` key whose value
267        /// is a valid JSON object containing valid system parameters.
268        ///
269        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
270        ///
271        /// Example ConfigMap:
272        /// ```yaml
273        /// data:
274        ///   system-params.json: |
275        ///     {
276        ///       "max_connections": 1000
277        ///     }
278        /// ```
279        pub system_parameter_configmap_name: Option<String>,
280
281        /// The configuration for generating an x509 certificate using cert-manager for balancerd
282        /// to present to incoming connections.
283        /// The `dnsNames` and `issuerRef` fields are required.
284        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
285        /// The configuration for generating an x509 certificate using cert-manager for the console
286        /// to present to incoming connections.
287        /// The `dnsNames` and `issuerRef` fields are required.
288        /// Not yet implemented.
289        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
290        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
291        /// The `issuerRef` field is required.
292        /// This currently is only used for environmentd, but will eventually support clusterd.
293        /// Not yet implemented.
294        pub internal_certificate_spec: Option<MaterializeCertSpec>,
295    }
296
297    impl Materialize {
298        pub fn backend_secret_name(&self) -> String {
299            self.spec.backend_secret_name.clone()
300        }
301
302        pub fn namespace(&self) -> String {
303            self.meta().namespace.clone().unwrap()
304        }
305
306        pub fn create_service_account(&self) -> bool {
307            self.spec.service_account_name.is_none()
308        }
309
310        pub fn service_account_name(&self) -> String {
311            self.spec
312                .service_account_name
313                .clone()
314                .unwrap_or_else(|| self.name_unchecked())
315        }
316
317        pub fn role_name(&self) -> String {
318            self.name_unchecked()
319        }
320
321        pub fn role_binding_name(&self) -> String {
322            self.name_unchecked()
323        }
324
325        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
326            self.name_prefixed(&format!("environmentd-{generation}"))
327        }
328
329        pub fn environmentd_app_name(&self) -> String {
330            "environmentd".to_owned()
331        }
332
333        pub fn environmentd_service_name(&self) -> String {
334            self.name_prefixed("environmentd")
335        }
336
337        pub fn environmentd_service_internal_fqdn(&self) -> String {
338            format!(
339                "{}.{}.svc.cluster.local",
340                self.environmentd_service_name(),
341                self.meta().namespace.as_ref().unwrap()
342            )
343        }
344
345        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
346            self.name_prefixed(&format!("environmentd-{generation}"))
347        }
348
349        pub fn balancerd_app_name(&self) -> String {
350            "balancerd".to_owned()
351        }
352
353        pub fn environmentd_certificate_name(&self) -> String {
354            self.name_prefixed("environmentd-external")
355        }
356
357        pub fn environmentd_certificate_secret_name(&self) -> String {
358            self.name_prefixed("environmentd-tls")
359        }
360
361        pub fn balancerd_deployment_name(&self) -> String {
362            self.name_prefixed("balancerd")
363        }
364
365        pub fn balancerd_service_name(&self) -> String {
366            self.name_prefixed("balancerd")
367        }
368
369        pub fn console_app_name(&self) -> String {
370            "console".to_owned()
371        }
372
373        pub fn balancerd_external_certificate_name(&self) -> String {
374            self.name_prefixed("balancerd-external")
375        }
376
377        pub fn balancerd_external_certificate_secret_name(&self) -> String {
378            self.name_prefixed("balancerd-external-tls")
379        }
380
381        pub fn balancerd_replicas(&self) -> i32 {
382            self.spec.balancerd_replicas.unwrap_or(2)
383        }
384
385        pub fn console_replicas(&self) -> i32 {
386            self.spec.console_replicas.unwrap_or(2)
387        }
388
389        pub fn console_configmap_name(&self) -> String {
390            self.name_prefixed("console")
391        }
392
393        pub fn console_deployment_name(&self) -> String {
394            self.name_prefixed("console")
395        }
396
397        pub fn console_service_name(&self) -> String {
398            self.name_prefixed("console")
399        }
400
401        pub fn console_external_certificate_name(&self) -> String {
402            self.name_prefixed("console-external")
403        }
404
405        pub fn console_external_certificate_secret_name(&self) -> String {
406            self.name_prefixed("console-external-tls")
407        }
408
409        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
410            self.name_prefixed(&format!("persist-pubsub-{generation}"))
411        }
412
413        pub fn listeners_configmap_name(&self, generation: u64) -> String {
414            self.name_prefixed(&format!("listeners-{generation}"))
415        }
416
417        pub fn name_prefixed(&self, suffix: &str) -> String {
418            format!("mz{}-{}", self.resource_id(), suffix)
419        }
420
421        pub fn resource_id(&self) -> &str {
422            &self.status.as_ref().unwrap().resource_id
423        }
424
425        pub fn system_parameter_configmap_name(&self) -> Option<String> {
426            self.spec.system_parameter_configmap_name.clone()
427        }
428
429        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
430            self.spec
431                .environmentd_scratch_volume_storage_requirement
432                .clone()
433                .unwrap_or_else(|| {
434                    self.spec
435                        .environmentd_resource_requirements
436                        .as_ref()
437                        .and_then(|requirements| {
438                            requirements
439                                .requests
440                                .as_ref()
441                                .or(requirements.limits.as_ref())
442                        })
443                        // TODO: in cloud, we've been defaulting to twice the
444                        // memory limit, but k8s-openapi doesn't seem to
445                        // provide any way to parse Quantity values, so there
446                        // isn't an easy way to do arithmetic on it
447                        .and_then(|requirements| requirements.get("memory").cloned())
448                        // TODO: is there a better default to use here?
449                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
450                })
451        }
452
453        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
454            format!(
455                "{}-{}-{}-0",
456                cloud_provider, region, self.spec.environment_id,
457            )
458        }
459
460        pub fn requested_reconciliation_id(&self) -> Uuid {
461            self.spec.request_rollout
462        }
463
464        pub fn rollout_requested(&self) -> bool {
465            self.requested_reconciliation_id()
466                != self
467                    .status
468                    .as_ref()
469                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
470        }
471
472        /// The maximum amount of time a rollout may remain in progress before
473        /// it is automatically cancelled. Parsed from
474        /// [`MaterializeSpec::rollout_request_timeout`], falling back to
475        /// [`DEFAULT_ROLLOUT_REQUEST_TIMEOUT`] when unset or unparseable.
476        pub fn rollout_request_timeout(&self) -> Duration {
477            let timeout = &self.spec.rollout_request_timeout.0;
478            humantime::parse_duration(timeout)
479                .or_else(|e| {
480                    tracing::warn!(
481                        rollout_request_timeout = %timeout,
482                        "failed to parse rolloutRequestTimeout, using default: {e}",
483                    );
484                    humantime::parse_duration(DEFAULT_ROLLOUT_REQUEST_TIMEOUT)
485                })
486                .expect("DEFAULT_ROLLOUT_REQUEST_TIMEOUT must be a valid duration")
487        }
488
489        /// If a timeout-eligible rollout is currently in progress, returns the
490        /// time at which it entered the in-progress (`Unknown`) state. Used to
491        /// enforce the rollout timeout.
492        ///
493        /// The `Applying` and `ReadyToPromote` phases are both reported as a
494        /// single in-progress window: [`Self::up_to_date_transition_time`]
495        /// carries the timestamp forward across them (they share the `Unknown`
496        /// status), so the timeout spans the whole pre-promotion rollout rather
497        /// than resetting at each phase.
498        ///
499        /// The `Promoting` phase is deliberately excluded even though it is
500        /// also `Unknown`: once a rollout has reached promotion it must never
501        /// be cancelled by the timeout, since the previously-active generation
502        /// may already be torn down, leaving nothing to fall back to. (The
503        /// controller also never reaches the timeout check while promoting,
504        /// because `is_promoting` takes priority; this is belt-and-suspenders.)
505        pub fn rollout_in_progress_since(&self) -> Option<Timestamp> {
506            self.status
507                .as_ref()?
508                .conditions
509                .iter()
510                .find_map(|condition| {
511                    if condition.type_ == "UpToDate"
512                        && condition.status == "Unknown"
513                        && condition.reason != "Promoting"
514                    {
515                        Some(condition.last_transition_time.0)
516                    } else {
517                        None
518                    }
519                })
520        }
521
522        /// The `last_transition_time` to record for a new `UpToDate` condition
523        /// with `new_status`, following the Kubernetes convention that
524        /// `last_transition_time` marks when the condition's *status* last
525        /// changed — not its reason or message. While the status is unchanged
526        /// the existing timestamp is carried forward; it only resets to `now`
527        /// when the status actually changes (or there is no prior condition).
528        ///
529        /// This is what lets a rollout that moves through several same-status
530        /// phases (`Applying` -> `ReadyToPromote`, both `Unknown`) be measured
531        /// from when it first entered that status, so the rollout timeout
532        /// covers the phases together instead of restarting at each one.
533        pub fn up_to_date_transition_time(&self, new_status: &str, now: Timestamp) -> Timestamp {
534            self.status
535                .as_ref()
536                .and_then(|status| {
537                    status
538                        .conditions
539                        .iter()
540                        .find(|condition| condition.type_ == "UpToDate")
541                })
542                .filter(|condition| condition.status == new_status)
543                .map_or(now, |condition| condition.last_transition_time.0)
544        }
545
546        /// Returns the environmentd image ref of the currently-active
547        /// generation: the image of the last completed rollout, falling back
548        /// to the spec image when no rollout has completed yet. Downstream
549        /// resources (balancerd, console) should track this rather than
550        /// [`MaterializeSpec::environmentd_image_ref`] so they stay aligned
551        /// with the running environmentd when the spec is mid-rollout or has
552        /// been partially reverted (DEP-42).
553        pub fn active_environmentd_image_ref(&self) -> &str {
554            self.status
555                .as_ref()
556                .and_then(|s| s.last_completed_rollout_environmentd_image_ref.as_deref())
557                .unwrap_or(&self.spec.environmentd_image_ref)
558        }
559
560        pub fn set_force_promote(&mut self) {
561            self.spec.force_promote = self.spec.request_rollout.hyphenated().to_string();
562        }
563
564        pub fn should_force_promote(&self) -> bool {
565            self.spec.force_promote == self.spec.request_rollout.hyphenated().to_string()
566                || self.spec.force_promote
567                    == super::v1::Materialize::from(self.clone()).generate_rollout_hash()
568                || self.spec.rollout_strategy
569                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
570        }
571
572        pub fn conditions_need_update(&self) -> bool {
573            let Some(status) = self.status.as_ref() else {
574                return true;
575            };
576            if status.conditions.is_empty() {
577                return true;
578            }
579            for condition in &status.conditions {
580                if condition.observed_generation != self.meta().generation {
581                    return true;
582                }
583            }
584            false
585        }
586
587        pub fn is_ready_to_promote(&self, resources_hash: &str) -> bool {
588            let Some(status) = self.status.as_ref() else {
589                return false;
590            };
591            if status.conditions.is_empty() {
592                return false;
593            }
594            status
595                .conditions
596                .iter()
597                .any(|condition| condition.reason == "ReadyToPromote")
598                && &status.resources_hash == resources_hash
599        }
600
601        pub fn is_promoting(&self) -> bool {
602            let Some(status) = self.status.as_ref() else {
603                return false;
604            };
605            if status.conditions.is_empty() {
606                return false;
607            }
608            status
609                .conditions
610                .iter()
611                .any(|condition| condition.reason == "Promoting")
612        }
613
614        pub fn update_in_progress(&self) -> bool {
615            let Some(status) = self.status.as_ref() else {
616                return false;
617            };
618            if status.conditions.is_empty() {
619                return false;
620            }
621            for condition in &status.conditions {
622                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
623                    return true;
624                }
625            }
626            false
627        }
628
629        /// Checks that the given version is greater than or equal
630        /// to the existing version, if the existing version
631        /// can be parsed.
632        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
633            let version = parse_image_ref(&self.spec.environmentd_image_ref);
634            match version {
635                // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
636                Some(version) => version.cmp_precedence(minimum).is_ge(),
637                // In the rare case that we see an image reference
638                // that we can't parse, we assume that it satisfies all
639                // version checks. Usually these are custom images that have
640                // been by a developer on a branch forked from a recent copy
641                // of main, and so this works out reasonably well in practice.
642                None => {
643                    tracing::warn!(
644                        image_ref = %self.spec.environmentd_image_ref,
645                        "failed to parse image ref",
646                    );
647                    true
648                }
649            }
650        }
651
652        /// This check isn't strictly required since environmentd will still be able to determine
653        /// if the upgrade is allowed or not. However, doing this check allows us to provide
654        /// the error as soon as possible and in a more user friendly way.
655        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
656            // Don't allow rolling back
657            // Note: semver comparison handles RC versions correctly:
658            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
659            // Use cmp_precedence() to ignore build metadata
660            if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
661                return false;
662            }
663
664            if active_version.major == 0 {
665                if next_version.major != active_version.major {
666                    if next_version.major == 26 {
667                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
668                        // before upgrading to 26.0.0
669                        return (active_version.minor == 147 && active_version.patch >= 20)
670                            || active_version.minor >= 164;
671                    } else {
672                        return false;
673                    }
674                }
675                // Self managed 25.1 to 25.2
676                if next_version.minor == 147 && active_version.minor == 130 {
677                    return true;
678                }
679                // only allow upgrading a single minor version at a time
680                return next_version.minor <= active_version.minor + 1;
681            } else if active_version.major >= 26 {
682                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
683                return next_version.major <= active_version.major + 1;
684            }
685
686            true
687        }
688
689        /// Checks if the current environmentd image ref is within the upgrade window of the last
690        /// successful rollout.
691        pub fn within_upgrade_window(&self) -> bool {
692            let active_environmentd_version = self
693                .status
694                .as_ref()
695                .and_then(|status| {
696                    status
697                        .last_completed_rollout_environmentd_image_ref
698                        .as_ref()
699                })
700                .and_then(|image_ref| parse_image_ref(image_ref));
701
702            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
703                parse_image_ref(&self.spec.environmentd_image_ref),
704                active_environmentd_version,
705            ) {
706                Self::is_valid_upgrade_version(
707                    &active_environmentd_version,
708                    &next_environmentd_version,
709                )
710            } else {
711                // If we fail to parse either version,
712                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
713                true
714            }
715        }
716
717        pub fn status(&self) -> MaterializeStatus {
718            self.status.clone().unwrap_or_else(|| {
719                let mut status = MaterializeStatus::default();
720
721                status.resource_id = new_resource_id();
722
723                // If we're creating the initial status on an un-soft-deleted
724                // Environment we need to ensure that the last active generation
725                // is restored, otherwise the env will crash loop indefinitely
726                // as its catalog would have durably recorded a greater generation
727                if let Some(last_active_generation) = self
728                    .annotations()
729                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
730                {
731                    status.active_generation = last_active_generation
732                        .parse()
733                        .expect("valid int generation");
734                }
735
736                // Initialize the last completed rollout environmentd image ref to
737                // the current image ref if not already set.
738                status.last_completed_rollout_environmentd_image_ref =
739                    Some(self.spec.environmentd_image_ref.clone());
740
741                status
742            })
743        }
744    }
745
    // Status of a `Materialize` resource; (de)serialized with camelCase keys.
    //
    // NOTE(review): the notes added here are plain `//` comments rather than `///`
    // on the assumption that doc comments feed the derived JSON schema
    // descriptions -- confirm before converting them.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The UUID of the last successfully completed rollout.
        pub last_completed_rollout_request: Uuid,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// A hash calculated from the spec of resources to be created based on this Materialize
        /// spec. This is used for detecting when the existing resources are up to date.
        /// If you want to trigger a rollout without making other changes that would cause this
        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
        pub resources_hash: String,
        /// The last completed rollout hash from v1.
        /// This exists on this older version only for round-trip conversion support.
        pub last_completed_rollout_hash: Option<String>,
        // Kubernetes conditions reported for this resource. `needs_update` ignores
        // each condition's `last_transition_time` when comparing two statuses.
        pub conditions: Vec<Condition>,
    }
769
770    impl MaterializeStatus {
771        pub fn needs_update(&self, other: &Self) -> bool {
772            let now = Timestamp::now();
773            let mut a = self.clone();
774            for condition in &mut a.conditions {
775                condition.last_transition_time = Time(now);
776            }
777            let mut b = other.clone();
778            for condition in &mut b.conditions {
779                condition.last_transition_time = Time(now);
780            }
781            a != b
782        }
783    }
784
785    impl ManagedResource for Materialize {
786        fn default_labels(&self) -> BTreeMap<String, String> {
787            BTreeMap::from_iter([
788                (
789                    "materialize.cloud/organization-name".to_owned(),
790                    self.name_unchecked(),
791                ),
792                (
793                    "materialize.cloud/organization-namespace".to_owned(),
794                    self.namespace(),
795                ),
796                (
797                    "materialize.cloud/mz-resource-id".to_owned(),
798                    self.resource_id().to_owned(),
799                ),
800            ])
801        }
802    }
803
804    impl From<v1::Materialize> for Materialize {
805        fn from(value: v1::Materialize) -> Self {
806            let rollout_hash = value.generate_rollout_hash();
807            // Derive a deterministic UUID from the rollout hash so that the
808            // same v1 spec always produces the same requestRollout,
809            // making re-applies of an unchanged spec idempotent.
810            let request_rollout = Uuid::new_v5(&Uuid::NAMESPACE_OID, rollout_hash.as_bytes());
811            Materialize {
812                metadata: value.metadata,
813                spec: MaterializeSpec {
814                    environmentd_image_ref: value.spec.environmentd_image_ref,
815                    environmentd_extra_args: value.spec.environmentd_extra_args,
816                    environmentd_extra_env: value.spec.environmentd_extra_env,
817                    environmentd_iam_role_arn: None,
818                    environmentd_connection_role_arn: value.spec.environmentd_connection_role_arn,
819                    environmentd_resource_requirements: value
820                        .spec
821                        .environmentd_resource_requirements,
822                    environmentd_scratch_volume_storage_requirement: value
823                        .spec
824                        .environmentd_scratch_volume_storage_requirement,
825                    balancerd_resource_requirements: value.spec.balancerd_resource_requirements,
826                    console_resource_requirements: value.spec.console_resource_requirements,
827                    balancerd_replicas: value.spec.balancerd_replicas,
828                    console_replicas: value.spec.console_replicas,
829                    service_account_name: value.spec.service_account_name,
830                    service_account_annotations: value.spec.service_account_annotations,
831                    service_account_labels: value.spec.service_account_labels,
832                    pod_annotations: value.spec.pod_annotations,
833                    pod_labels: value.spec.pod_labels,
834                    force_promote: value.spec.force_promote.unwrap_or_default(),
835                    force_rollout: value.spec.force_rollout,
836                    rollout_strategy: value.spec.rollout_strategy,
837                    rollout_request_timeout: value.spec.rollout_request_timeout,
838                    backend_secret_name: value.spec.backend_secret_name,
839                    authenticator_kind: value.spec.authenticator_kind,
840                    enable_rbac: value.spec.enable_rbac,
841                    environment_id: value.spec.environment_id,
842                    system_parameter_configmap_name: value.spec.system_parameter_configmap_name,
843                    balancerd_external_certificate_spec: value
844                        .spec
845                        .balancerd_external_certificate_spec,
846                    console_external_certificate_spec: value.spec.console_external_certificate_spec,
847                    internal_certificate_spec: value.spec.internal_certificate_spec,
848                    request_rollout,
849                    in_place_rollout: false,
850                },
851                status: value.status.map(|status| MaterializeStatus {
852                    resource_id: status.resource_id,
853                    active_generation: status.active_generation,
854                    last_completed_rollout_environmentd_image_ref: status
855                        .last_completed_rollout_environmentd_image_ref,
856                    conditions: status.conditions,
857                    // Derive the same deterministic UUID from the last
858                    // completed hash so that request_rollout == this value
859                    // when the spec hasn't changed (no rollout needed).
860                    last_completed_rollout_request: status
861                        .last_completed_rollout_hash
862                        .as_ref()
863                        .map(|hash| Uuid::new_v5(&Uuid::NAMESPACE_OID, hash.as_bytes()))
864                        .unwrap_or(Uuid::nil()),
865                    last_completed_rollout_hash: status.last_completed_rollout_hash,
866                    resources_hash: "".to_owned(),
867                }),
868            }
869        }
870    }
871}
872
873pub mod v1 {
874    use super::*;
875
    // Spec of the `v1` version of the `Materialize` custom resource. The `kube`
    // attribute below declares the API group, version, kind, resource names, the
    // status type and the extra printer columns; serde uses camelCase keys.
    //
    // NOTE(review): notes added here are plain `//` comments rather than `///` on
    // the assumption that doc comments feed the derived JSON schema descriptions
    // -- confirm before converting them.
    #[derive(
        CustomResource,
        Clone,
        Debug,
        Default,
        PartialEq,
        Deserialize,
        Serialize,
        JsonSchema
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRefRunning", "type": "string", "description": "Reference to the Docker image that is currently in use.", "jsonPath": ".status.lastCompletedRolloutEnvironmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "ImageRefToDeploy", "type": "string", "description": "Reference to the Docker image which will be deployed on the next rollout.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// The environmentd image to run.
        pub environmentd_image_ref: String,
        /// Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        /// If running in AWS, override the IAM role to use to support
        /// the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Amount of disk to allocate, if a storage class is provided.
        // When unset, `environmentd_scratch_volume_storage_requirement()` falls back
        // to the `memory` entry of the environmentd resource requirements, then to
        // "4096Mi".
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for the balancerd pod.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console pod.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        pub console_resource_requirements: Option<ResourceRequirements>,
        /// Number of balancerd pods to create.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        // `balancerd_replicas()` uses 2 when this is unset.
        pub balancerd_replicas: Option<i32>,
        /// Number of console pods to create.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        // `console_replicas()` uses 2 when this is unset.
        pub console_replicas: Option<i32>,

        /// Name of the kubernetes service account to use.
        /// If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        /// Annotations to apply to the service account.
        ///
        /// Annotations on service accounts are commonly used by cloud providers for IAM.
        /// AWS uses "eks.amazonaws.com/role-arn".
        /// Azure uses "azure.workload.identity/client-id", but
        /// additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        /// If `forcePromote` is set to the same value as the `status.requestedRolloutHash`,
        /// current rollout will skip waiting for clusters in the new
        /// generation to rehydrate before promoting the new environmentd to
        /// leader.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        pub force_promote: Option<String>,
        /// This value will force the controller to detect the spec as changed
        /// even if no other changes happened. This can be used to force a rollout
        /// to a new generation even without making any meaningful changes.
        // This value is part of the spec hashed by `generate_rollout_hash()`.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// The maximum amount of time a rollout may remain in progress before
        /// it is automatically cancelled.
        ///
        /// While a rollout is in progress, the new generation of `environmentd`
        /// runs in a read-only, un-promoted state and holds back compaction via
        /// read holds. Leaving it in this state for too long can cause
        /// incident-inducing load when it is eventually promoted, so the
        /// operator cancels the rollout once this timeout is exceeded: the new
        /// generation is torn down and the previously-active generation
        /// continues serving. A new rollout can then be triggered by setting
        /// `requestRollout` to a new value.
        ///
        /// This does not apply to the `ImmediatelyPromoteCausingDowntime`
        /// rollout strategy or to force-promoted rollouts, since by the time
        /// those are in progress the old generation may already be gone.
        ///
        /// The value is parsed as a human-readable duration, e.g. `24h`,
        /// `90m`, or `1h 30m`. Defaults to [`DEFAULT_ROLLOUT_REQUEST_TIMEOUT`]
        /// when omitted (the API server fills it in); an unparseable value also
        /// falls back to that default.
        // NOTE(review): this spec has no `requestRollout` field; the text above
        // may have been written for another version of the CRD -- confirm.
        #[serde(default)]
        pub rollout_request_timeout: RolloutRequestTimeout,
        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
        /// It may also contain `external_login_password_mz_system`, which will be used as
        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
        pub backend_secret_name: String,
        /// How to authenticate with Materialize.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        /// The value used by environmentd (via the --environment-id flag) to
        /// uniquely identify this instance. Must be globally unique, and
        /// is required if a license key is not provided.
        /// NOTE: This value MUST NOT be changed in an existing instance,
        /// since it affects things like the way data is stored in the persist
        /// backend.
        #[serde(default)]
        pub environment_id: Uuid,

        /// The name of a ConfigMap containing system parameters in JSON format.
        /// The ConfigMap must contain a `system-params.json` key whose value
        /// is a valid JSON object containing valid system parameters.
        ///
        /// Run `SHOW ALL` in SQL to see a subset of configurable system parameters.
        ///
        /// Example ConfigMap:
        /// ```yaml
        /// data:
        ///   system-params.json: |
        ///     {
        ///       "max_connections": 1000
        ///     }
        /// ```
        pub system_parameter_configmap_name: Option<String>,

        /// The configuration for generating an x509 certificate using cert-manager for balancerd
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The configuration for generating an x509 certificate using cert-manager for the console
        /// to present to incoming connections.
        /// The `dnsNames` and `issuerRef` fields are required.
        /// Not yet implemented.
        ///
        /// This field is excluded from the rollout hash and changes will not trigger a rollout.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        /// The `issuerRef` field is required.
        /// This currently is only used for environmentd, but will eventually support clusterd.
        /// Not yet implemented.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
1040
1041    impl Materialize {
1042        pub fn generate_rollout_hash(&self) -> String {
1043            let mut hasher = Sha256::new();
1044            // Remove fields that don't affect the resources generated per generation,
1045            // and we don't want to trigger a rollout from.
1046            let spec = MaterializeSpec {
1047                environmentd_image_ref: self.spec.environmentd_image_ref.clone(),
1048                environmentd_extra_args: self.spec.environmentd_extra_args.clone(),
1049                environmentd_extra_env: self.spec.environmentd_extra_env.clone(),
1050                environmentd_connection_role_arn: self
1051                    .spec
1052                    .environmentd_connection_role_arn
1053                    .clone(),
1054                environmentd_resource_requirements: self
1055                    .spec
1056                    .environmentd_resource_requirements
1057                    .clone(),
1058                environmentd_scratch_volume_storage_requirement: self
1059                    .spec
1060                    .environmentd_scratch_volume_storage_requirement
1061                    .clone(),
1062                balancerd_resource_requirements: None,
1063                console_resource_requirements: None,
1064                balancerd_replicas: None,
1065                console_replicas: None,
1066                service_account_name: self.spec.service_account_name.clone(),
1067                service_account_annotations: self.spec.service_account_annotations.clone(),
1068                service_account_labels: self.spec.service_account_labels.clone(),
1069                pod_annotations: self.spec.pod_annotations.clone(),
1070                pod_labels: self.spec.pod_labels.clone(),
1071                force_promote: None,
1072                force_rollout: self.spec.force_rollout,
1073                rollout_strategy: self.spec.rollout_strategy.clone(),
1074                rollout_request_timeout: self.spec.rollout_request_timeout.clone(),
1075                backend_secret_name: self.spec.backend_secret_name.clone(),
1076                authenticator_kind: self.spec.authenticator_kind,
1077                enable_rbac: self.spec.enable_rbac,
1078                environment_id: self.spec.environment_id,
1079                system_parameter_configmap_name: self.spec.system_parameter_configmap_name.clone(),
1080                balancerd_external_certificate_spec: None,
1081                console_external_certificate_spec: None,
1082                internal_certificate_spec: self.spec.internal_certificate_spec.clone(),
1083            };
1084            hasher.update(&serde_json::to_vec(&spec).unwrap());
1085            if let Some(annotation) = self
1086                .metadata
1087                .annotations
1088                .as_ref()
1089                .and_then(|annotations| annotations.get(FORCE_ROLLOUT_ANNOTATION))
1090            {
1091                hasher.update(annotation);
1092            }
1093            format!("{:x}", hasher.finalize())
1094        }
1095
1096        pub fn backend_secret_name(&self) -> String {
1097            self.spec.backend_secret_name.clone()
1098        }
1099
1100        pub fn namespace(&self) -> String {
1101            self.meta().namespace.clone().unwrap()
1102        }
1103
1104        pub fn create_service_account(&self) -> bool {
1105            self.spec.service_account_name.is_none()
1106        }
1107
1108        pub fn service_account_name(&self) -> String {
1109            self.spec
1110                .service_account_name
1111                .clone()
1112                .unwrap_or_else(|| self.name_unchecked())
1113        }
1114
1115        pub fn role_name(&self) -> String {
1116            self.name_unchecked()
1117        }
1118
1119        pub fn role_binding_name(&self) -> String {
1120            self.name_unchecked()
1121        }
1122
1123        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
1124            self.name_prefixed(&format!("environmentd-{generation}"))
1125        }
1126
1127        pub fn environmentd_app_name(&self) -> String {
1128            "environmentd".to_owned()
1129        }
1130
1131        pub fn environmentd_service_name(&self) -> String {
1132            self.name_prefixed("environmentd")
1133        }
1134
1135        pub fn environmentd_service_internal_fqdn(&self) -> String {
1136            format!(
1137                "{}.{}.svc.cluster.local",
1138                self.environmentd_service_name(),
1139                self.meta().namespace.as_ref().unwrap()
1140            )
1141        }
1142
1143        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
1144            self.name_prefixed(&format!("environmentd-{generation}"))
1145        }
1146
1147        pub fn balancerd_app_name(&self) -> String {
1148            "balancerd".to_owned()
1149        }
1150
1151        pub fn environmentd_certificate_name(&self) -> String {
1152            self.name_prefixed("environmentd-external")
1153        }
1154
1155        pub fn environmentd_certificate_secret_name(&self) -> String {
1156            self.name_prefixed("environmentd-tls")
1157        }
1158
1159        pub fn balancerd_deployment_name(&self) -> String {
1160            self.name_prefixed("balancerd")
1161        }
1162
1163        pub fn balancerd_service_name(&self) -> String {
1164            self.name_prefixed("balancerd")
1165        }
1166
1167        pub fn console_app_name(&self) -> String {
1168            "console".to_owned()
1169        }
1170
1171        pub fn balancerd_external_certificate_name(&self) -> String {
1172            self.name_prefixed("balancerd-external")
1173        }
1174
1175        pub fn balancerd_external_certificate_secret_name(&self) -> String {
1176            self.name_prefixed("balancerd-external-tls")
1177        }
1178
1179        pub fn balancerd_replicas(&self) -> i32 {
1180            self.spec.balancerd_replicas.unwrap_or(2)
1181        }
1182
1183        pub fn console_replicas(&self) -> i32 {
1184            self.spec.console_replicas.unwrap_or(2)
1185        }
1186
1187        pub fn console_configmap_name(&self) -> String {
1188            self.name_prefixed("console")
1189        }
1190
1191        pub fn console_deployment_name(&self) -> String {
1192            self.name_prefixed("console")
1193        }
1194
1195        pub fn console_service_name(&self) -> String {
1196            self.name_prefixed("console")
1197        }
1198
1199        pub fn console_external_certificate_name(&self) -> String {
1200            self.name_prefixed("console-external")
1201        }
1202
1203        pub fn console_external_certificate_secret_name(&self) -> String {
1204            self.name_prefixed("console-external-tls")
1205        }
1206
1207        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
1208            self.name_prefixed(&format!("persist-pubsub-{generation}"))
1209        }
1210
1211        pub fn listeners_configmap_name(&self, generation: u64) -> String {
1212            self.name_prefixed(&format!("listeners-{generation}"))
1213        }
1214
1215        pub fn name_prefixed(&self, suffix: &str) -> String {
1216            format!("mz{}-{}", self.resource_id(), suffix)
1217        }
1218
1219        pub fn resource_id(&self) -> &str {
1220            &self.status.as_ref().unwrap().resource_id
1221        }
1222
1223        pub fn system_parameter_configmap_name(&self) -> Option<String> {
1224            self.spec.system_parameter_configmap_name.clone()
1225        }
1226
1227        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
1228            self.spec
1229                .environmentd_scratch_volume_storage_requirement
1230                .clone()
1231                .unwrap_or_else(|| {
1232                    self.spec
1233                        .environmentd_resource_requirements
1234                        .as_ref()
1235                        .and_then(|requirements| {
1236                            requirements
1237                                .requests
1238                                .as_ref()
1239                                .or(requirements.limits.as_ref())
1240                        })
1241                        // TODO: in cloud, we've been defaulting to twice the
1242                        // memory limit, but k8s-openapi doesn't seem to
1243                        // provide any way to parse Quantity values, so there
1244                        // isn't an easy way to do arithmetic on it
1245                        .and_then(|requirements| requirements.get("memory").cloned())
1246                        // TODO: is there a better default to use here?
1247                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
1248                })
1249        }
1250
1251        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
1252            format!(
1253                "{}-{}-{}-0",
1254                cloud_provider, region, self.spec.environment_id,
1255            )
1256        }
1257
1258        pub fn rollout_requested(&self) -> bool {
1259            self.status
1260                .as_ref()
1261                .map(|status| status.last_completed_rollout_hash != status.requested_rollout_hash)
1262                .unwrap_or(false)
1263        }
1264
1265        pub fn set_force_promote(&mut self) {
1266            self.spec.force_promote = Some(self.generate_rollout_hash());
1267        }
1268
1269        pub fn should_force_promote(&self) -> bool {
1270            self.spec.force_promote.as_ref()
1271                == self
1272                    .status
1273                    .as_ref()
1274                    .and_then(|status| status.requested_rollout_hash.as_ref())
1275                || self.spec.rollout_strategy
1276                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
1277        }
1278
1279        pub fn conditions_need_update(&self) -> bool {
1280            let Some(status) = self.status.as_ref() else {
1281                return true;
1282            };
1283            if status.conditions.is_empty() {
1284                return true;
1285            }
1286            for condition in &status.conditions {
1287                if condition.observed_generation != self.meta().generation {
1288                    return true;
1289                }
1290            }
1291            false
1292        }
1293
1294        pub fn is_ready_to_promote(&self, rollout_hash: &str) -> bool {
1295            let Some(status) = self.status.as_ref() else {
1296                return false;
1297            };
1298            if status.conditions.is_empty() {
1299                return false;
1300            }
1301            status
1302                .conditions
1303                .iter()
1304                .any(|condition| condition.reason == "ReadyToPromote")
1305                && status.requested_rollout_hash.as_deref() == Some(rollout_hash)
1306        }
1307
1308        pub fn is_promoting(&self) -> bool {
1309            let Some(status) = self.status.as_ref() else {
1310                return false;
1311            };
1312            if status.conditions.is_empty() {
1313                return false;
1314            }
1315            status
1316                .conditions
1317                .iter()
1318                .any(|condition| condition.reason == "Promoting")
1319        }
1320
1321        pub fn update_in_progress(&self) -> bool {
1322            let Some(status) = self.status.as_ref() else {
1323                return false;
1324            };
1325            if status.conditions.is_empty() {
1326                return false;
1327            }
1328            for condition in &status.conditions {
1329                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
1330                    return true;
1331                }
1332            }
1333            false
1334        }
1335
1336        /// Checks that the given version is greater than or equal
1337        /// to the existing version, if the existing version
1338        /// can be parsed.
1339        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
1340            let version = parse_image_ref(&self.spec.environmentd_image_ref);
1341            match version {
1342                // Use cmp_precedence() to ignore build metadata per SemVer 2.0.0 spec
1343                Some(version) => version.cmp_precedence(minimum).is_ge(),
1344                // In the rare case that we see an image reference
1345                // that we can't parse, we assume that it satisfies all
1346                // version checks. Usually these are custom images that have
1347                // been by a developer on a branch forked from a recent copy
1348                // of main, and so this works out reasonably well in practice.
1349                None => {
1350                    tracing::warn!(
1351                        image_ref = %self.spec.environmentd_image_ref,
1352                        "failed to parse image ref",
1353                    );
1354                    true
1355                }
1356            }
1357        }
1358
1359        /// This check isn't strictly required since environmentd will still be able to determine
1360        /// if the upgrade is allowed or not. However, doing this check allows us to provide
1361        /// the error as soon as possible and in a more user friendly way.
1362        pub fn is_valid_upgrade_version(active_version: &Version, next_version: &Version) -> bool {
1363            // Don't allow rolling back
1364            // Note: semver comparison handles RC versions correctly:
1365            // v26.0.0-rc.1 < v26.0.0-rc.2 < v26.0.0
1366            // Use cmp_precedence() to ignore build metadata
1367            if next_version.cmp_precedence(active_version) == std::cmp::Ordering::Less {
1368                return false;
1369            }
1370
1371            if active_version.major == 0 {
1372                if next_version.major != active_version.major {
1373                    if next_version.major == 26 {
1374                        // We require customers to upgrade from 0.147.20 (Self Managed 25.2) or v0.164.X (Cloud)
1375                        // before upgrading to 26.0.0
1376
1377                        return (active_version.minor == 147 && active_version.patch >= 20)
1378                            || active_version.minor >= 164;
1379                    } else {
1380                        return false;
1381                    }
1382                }
1383                // Self managed 25.1 to 25.2
1384                if next_version.minor == 147 && active_version.minor == 130 {
1385                    return true;
1386                }
1387                // only allow upgrading a single minor version at a time
1388                return next_version.minor <= active_version.minor + 1;
1389            } else if active_version.major >= 26 {
1390                // For versions 26.X.X and onwards, we deny upgrades past 1 major version of the active version
1391                return next_version.major <= active_version.major + 1;
1392            }
1393
1394            true
1395        }
1396
1397        /// Checks if the current environmentd image ref is within the upgrade window of the last
1398        /// successful rollout.
1399        pub fn within_upgrade_window(&self) -> bool {
1400            let active_environmentd_version = self
1401                .status
1402                .as_ref()
1403                .and_then(|status| {
1404                    status
1405                        .last_completed_rollout_environmentd_image_ref
1406                        .as_ref()
1407                })
1408                .and_then(|image_ref| parse_image_ref(image_ref));
1409
1410            if let (Some(next_environmentd_version), Some(active_environmentd_version)) = (
1411                parse_image_ref(&self.spec.environmentd_image_ref),
1412                active_environmentd_version,
1413            ) {
1414                Self::is_valid_upgrade_version(
1415                    &active_environmentd_version,
1416                    &next_environmentd_version,
1417                )
1418            } else {
1419                // If we fail to parse either version,
1420                // we still allow the upgrade since environmentd will still error if the upgrade is not allowed.
1421                true
1422            }
1423        }
1424
1425        pub fn status(&self) -> MaterializeStatus {
1426            self.status.clone().unwrap_or_else(|| {
1427                let mut status = MaterializeStatus::default();
1428
1429                status.resource_id = new_resource_id();
1430
1431                // If we're creating the initial status on an un-soft-deleted
1432                // Environment we need to ensure that the last active generation
1433                // is restored, otherwise the env will crash loop indefinitely
1434                // as its catalog would have durably recorded a greater generation
1435                if let Some(last_active_generation) = self
1436                    .annotations()
1437                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
1438                {
1439                    status.active_generation = last_active_generation
1440                        .parse()
1441                        .expect("valid int generation");
1442                }
1443
1444                // Initialize the last completed rollout environmentd image ref to
1445                // the current image ref if not already set.
1446                status.last_completed_rollout_environmentd_image_ref =
1447                    Some(self.spec.environmentd_image_ref.clone());
1448
1449                status
1450            })
1451        }
1452    }
1453
    // Status of a `Materialize` resource, (de)serialized with camelCase keys.
    //
    // NOTE(review): the comments added here are plain `//` comments on
    // purpose. `///` doc comments on a `JsonSchema`-deriving type are
    // presumably picked up by schemars as schema descriptions, so adding or
    // rewording them could change the generated CRD — confirm before
    // converting these to doc comments.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Resource identifier used as a name prefix to avoid pod name collisions.
        pub resource_id: String,
        /// The generation of Materialize pods actively capable of servicing requests.
        pub active_generation: u64,
        /// The image ref of the environmentd image that was last successfully rolled out.
        /// Used to deny upgrades past 1 major version from the last successful rollout.
        /// When None, we upgrade anyways.
        pub last_completed_rollout_environmentd_image_ref: Option<String>,
        /// The last completed rollout's requestedRolloutHash.
        pub last_completed_rollout_hash: Option<String>,
        /// Hash of a subset of the Materialize spec and other fields.
        /// This is used to determine when the spec has changed and we need to rollout.
        pub requested_rollout_hash: Option<String>,
        // Conditions reported for this resource. Methods in this file inspect
        // each condition's `type_`, `status` and `reason` (for example type
        // `UpToDate`, reasons `ReadyToPromote` and `Promoting`), and
        // `needs_update` ignores `last_transition_time` when comparing.
        pub conditions: Vec<Condition>,
    }
1472
1473    impl MaterializeStatus {
1474        pub fn needs_update(&self, other: &Self) -> bool {
1475            let now = Timestamp::now();
1476            let mut a = self.clone();
1477            for condition in &mut a.conditions {
1478                condition.last_transition_time = Time(now);
1479            }
1480            let mut b = other.clone();
1481            for condition in &mut b.conditions {
1482                condition.last_transition_time = Time(now);
1483            }
1484            a != b
1485        }
1486    }
1487
1488    impl ManagedResource for Materialize {
1489        fn default_labels(&self) -> BTreeMap<String, String> {
1490            BTreeMap::from_iter([
1491                (
1492                    "materialize.cloud/organization-name".to_owned(),
1493                    self.name_unchecked(),
1494                ),
1495                (
1496                    "materialize.cloud/organization-namespace".to_owned(),
1497                    self.namespace(),
1498                ),
1499                (
1500                    "materialize.cloud/mz-resource-id".to_owned(),
1501                    self.resource_id().to_owned(),
1502                ),
1503            ])
1504        }
1505
1506        fn app_name(&self) -> Option<&str> {
1507            Some("environmentd")
1508        }
1509    }
1510
    /// Conversion from a `v1alpha1::Materialize` into this module's
    /// `Materialize`.
    ///
    /// Metadata and most spec fields are carried over unchanged. The
    /// differences handled here are:
    /// * `environmentd_iam_role_arn` is folded into the service account
    ///   annotations,
    /// * an empty or all-zero `force_promote` becomes `None`,
    /// * the status's hash fields are derived from the converted resource's
    ///   rollout hash and the v1alpha1 request-based status fields.
    impl From<v1alpha1::Materialize> for Materialize {
        fn from(value: v1alpha1::Materialize) -> Self {
            // Evaluate this first: `value` is partially moved from below.
            let is_promoting = value.is_promoting();
            // When an IAM role ARN is given, make sure the annotations carry
            // it under `eks.amazonaws.com/role-arn`. `or_insert` leaves an
            // annotation that is already present under that key untouched.
            let service_account_annotations = if let Some(environmentd_iam_role_arn) =
                value.spec.environmentd_iam_role_arn
            {
                let mut annotations = value.spec.service_account_annotations.unwrap_or_default();
                annotations
                    .entry("eks.amazonaws.com/role-arn".to_owned())
                    .or_insert(environmentd_iam_role_arn);
                Some(annotations)
            } else {
                value.spec.service_account_annotations
            };
            // Built without a status at first; the status is attached at the
            // end, after the rollout hash has been computed from this value.
            let mut mz = Materialize {
                metadata: value.metadata,
                spec: MaterializeSpec {
                    environmentd_image_ref: value.spec.environmentd_image_ref,
                    environmentd_extra_args: value.spec.environmentd_extra_args,
                    environmentd_extra_env: value.spec.environmentd_extra_env,
                    environmentd_connection_role_arn: value.spec.environmentd_connection_role_arn,
                    environmentd_resource_requirements: value
                        .spec
                        .environmentd_resource_requirements,
                    environmentd_scratch_volume_storage_requirement: value
                        .spec
                        .environmentd_scratch_volume_storage_requirement,
                    balancerd_resource_requirements: value.spec.balancerd_resource_requirements,
                    console_resource_requirements: value.spec.console_resource_requirements,
                    balancerd_replicas: value.spec.balancerd_replicas,
                    console_replicas: value.spec.console_replicas,
                    service_account_name: value.spec.service_account_name,
                    service_account_annotations,
                    service_account_labels: value.spec.service_account_labels,
                    pod_annotations: value.spec.pod_annotations,
                    pod_labels: value.spec.pod_labels,
                    // An empty value or the all-zero UUID string is treated
                    // as "no force promote requested".
                    force_promote: if value.spec.force_promote.is_empty()
                        || &value.spec.force_promote == "00000000-0000-0000-0000-000000000000"
                    {
                        None
                    } else {
                        Some(value.spec.force_promote.to_string())
                    },
                    force_rollout: value.spec.force_rollout,
                    rollout_strategy: value.spec.rollout_strategy,
                    rollout_request_timeout: value.spec.rollout_request_timeout,
                    backend_secret_name: value.spec.backend_secret_name,
                    authenticator_kind: value.spec.authenticator_kind,
                    enable_rbac: value.spec.enable_rbac,
                    environment_id: value.spec.environment_id,
                    system_parameter_configmap_name: value.spec.system_parameter_configmap_name,
                    balancerd_external_certificate_spec: value
                        .spec
                        .balancerd_external_certificate_spec,
                    console_external_certificate_spec: value.spec.console_external_certificate_spec,
                    internal_certificate_spec: value.spec.internal_certificate_spec,
                },
                status: None,
            };
            let calculated_rollout_hash = mz.generate_rollout_hash();
            // Prefer a hash already stored in the v1alpha1 status. Without
            // one, fall back to the freshly calculated hash only when no
            // rollout appears to be pending.
            let last_completed_rollout_hash = match value
                .status
                .as_ref()
                .and_then(|status| status.last_completed_rollout_hash.to_owned())
            {
                Some(last_completed_rollout_hash) => Some(last_completed_rollout_hash),
                None => {
                    // A missing status also counts as "rolling out".
                    let currently_rolling_out = value
                        .status
                        .as_ref()
                        .map(|status| {
                            status.last_completed_rollout_request != value.spec.request_rollout
                                // If this is the first apply,
                                // these could both be nil and we still need to do a rollout.
                                || status.last_completed_rollout_request.is_nil()
                        })
                        .unwrap_or(true);
                    if currently_rolling_out {
                        // If they store a change, we're going to start over on a new rollout.
                        None
                    } else {
                        Some(calculated_rollout_hash.clone())
                    }
                }
            };
            // NOTE(review): the requested hash is left unset while the source
            // resource is promoting; the rationale is not visible in this
            // block — confirm against the reconciler.
            let requested_rollout_hash = if is_promoting {
                None
            } else {
                Some(calculated_rollout_hash)
            };
            // A source without a status yields a converted resource without
            // a status; otherwise the remaining fields are copied as-is.
            mz.status = value.status.map(|status| MaterializeStatus {
                resource_id: status.resource_id,
                active_generation: status.active_generation,
                last_completed_rollout_environmentd_image_ref: status
                    .last_completed_rollout_environmentd_image_ref,
                last_completed_rollout_hash,
                requested_rollout_hash,
                conditions: status.conditions,
            });
            mz
        }
    }
1613}
1614
1615fn parse_image_ref(image_ref: &str) -> Option<Version> {
1616    image_ref
1617        .rsplit_once(':')
1618        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
1619        .and_then(|tag| {
1620            // To work around Docker tag restrictions, build metadata in
1621            // a Docker tag is delimited by `--` rather than the SemVer
1622            // `+` delimiter. So we need to swap the delimiter back to
1623            // `+` before parsing it as SemVer.
1624            let tag = tag.replace("--", "+");
1625            Version::parse(&tag).ok()
1626        })
1627}
1628
1629#[cfg(test)]
1630mod tests {
1631    use std::time::Duration;
1632
1633    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
1634    use k8s_openapi::jiff::Timestamp;
1635    use kube::core::ObjectMeta;
1636    use semver::Version;
1637
1638    use super::v1alpha1::{Materialize, MaterializeSpec, MaterializeStatus};
1639    use super::{DEFAULT_ROLLOUT_REQUEST_TIMEOUT, RolloutRequestTimeout};
1640
1641    #[mz_ore::test]
1642    fn meets_minimum_version() {
1643        let mut mz = Materialize {
1644            spec: MaterializeSpec {
1645                environmentd_image_ref:
1646                    "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
1647                        .to_owned(),
1648                ..Default::default()
1649            },
1650            metadata: ObjectMeta {
1651                ..Default::default()
1652            },
1653            status: None,
1654        };
1655
1656        // true cases
1657        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1658        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
1659        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1660        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
1661        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1662        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
1663        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1664        mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
1665        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1666        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
1667        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1668        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
1669        assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1670        mz.spec.environmentd_image_ref =
1671            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
1672                .to_owned();
1673        assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
1674
1675        // false cases
1676        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
1677        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1678        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
1679        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1680        mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
1681        assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
1682        mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
1683        assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
1684    }
1685
1686    #[mz_ore::test]
1687    fn within_upgrade_window() {
1688        let mut mz = Materialize {
1689            spec: MaterializeSpec {
1690                environmentd_image_ref: "materialize/environmentd:v26.0.0".to_owned(),
1691                ..Default::default()
1692            },
1693            metadata: ObjectMeta {
1694                ..Default::default()
1695            },
1696            status: Some(MaterializeStatus {
1697                last_completed_rollout_environmentd_image_ref: Some(
1698                    "materialize/environmentd:v26.0.0".to_owned(),
1699                ),
1700                ..Default::default()
1701            }),
1702        };
1703
1704        // Pass: upgrading from 26.0.0 to 27.7.3 (within 1 major version)
1705        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.3".to_owned();
1706        assert!(mz.within_upgrade_window());
1707
1708        // Pass: upgrading from 26.0.0 to 27.7.8-dev.0 (within 1 major version, pre-release)
1709        mz.spec.environmentd_image_ref = "materialize/environmentd:v27.7.8-dev.0".to_owned();
1710        assert!(mz.within_upgrade_window());
1711
1712        // Fail: upgrading from 26.0.0 to 28.0.1 (more than 1 major version)
1713        mz.spec.environmentd_image_ref = "materialize/environmentd:v28.0.1".to_owned();
1714        assert!(!mz.within_upgrade_window());
1715
1716        // Pass: upgrading from 26.0.0 to 28.0.1.not_a_valid_version (invalid version, defaults to true)
1717        mz.spec.environmentd_image_ref =
1718            "materialize/environmentd:v28.0.1.not_a_valid_version".to_owned();
1719        assert!(mz.within_upgrade_window());
1720
1721        // Pass: upgrading from 0.164.0 to 26.1.0 (self managed 25.2 to 26.0)
1722        mz.status
1723            .as_mut()
1724            .unwrap()
1725            .last_completed_rollout_environmentd_image_ref =
1726            Some("materialize/environmentd:v0.147.20".to_owned());
1727        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.1.0".to_owned();
1728        assert!(mz.within_upgrade_window());
1729
1730        // Pass: upgrading from 26.11.0-dev.0+b to 26.11.0-dev.0+a (same major.minor.patch.prerelease, different build metadata)
1731        mz.status
1732            .as_mut()
1733            .unwrap()
1734            .last_completed_rollout_environmentd_image_ref =
1735            Some("materialize/environmentd:v26.11.0-dev.0+b".to_owned());
1736        mz.spec.environmentd_image_ref = "materialize/environmentd:v26.11.0-dev.0+a".to_owned();
1737        assert!(mz.within_upgrade_window());
1738    }
1739
1740    #[mz_ore::test]
1741    fn is_valid_upgrade_version() {
1742        let success_tests = [
1743            (Version::new(0, 83, 0), Version::new(0, 83, 0)),
1744            (Version::new(0, 83, 0), Version::new(0, 84, 0)),
1745            (Version::new(0, 9, 0), Version::new(0, 10, 0)),
1746            (Version::new(0, 99, 0), Version::new(0, 100, 0)),
1747            (Version::new(0, 83, 0), Version::new(0, 83, 1)),
1748            (Version::new(0, 83, 0), Version::new(0, 83, 2)),
1749            (Version::new(0, 83, 2), Version::new(0, 83, 10)),
1750            // 0.147.20 to 26.0.0 represents the Self Managed 25.2 to 26.0 upgrade
1751            (Version::new(0, 147, 20), Version::new(26, 0, 0)),
1752            (Version::new(0, 164, 0), Version::new(26, 0, 0)),
1753            (Version::new(26, 0, 0), Version::new(26, 1, 0)),
1754            (Version::new(26, 5, 3), Version::new(26, 10, 0)),
1755            (Version::new(0, 130, 0), Version::new(0, 147, 0)),
1756        ];
1757        for (active_version, next_version) in success_tests {
1758            assert!(
1759                Materialize::is_valid_upgrade_version(&active_version, &next_version),
1760                "v{active_version} can upgrade to v{next_version}"
1761            );
1762        }
1763
1764        let failure_tests = [
1765            (Version::new(0, 83, 0), Version::new(0, 82, 0)),
1766            (Version::new(0, 83, 3), Version::new(0, 83, 2)),
1767            (Version::new(0, 83, 3), Version::new(1, 83, 3)),
1768            (Version::new(0, 83, 0), Version::new(0, 85, 0)),
1769            (Version::new(26, 0, 0), Version::new(28, 0, 0)),
1770            (Version::new(0, 130, 0), Version::new(26, 1, 0)),
1771            // Disallow anything before 0.147.20 to upgrade
1772            (Version::new(0, 147, 1), Version::new(26, 0, 0)),
1773            // Disallow anything between 0.148.0 and 0.164.0 to upgrade
1774            (Version::new(0, 148, 0), Version::new(26, 0, 0)),
1775        ];
1776        for (active_version, next_version) in failure_tests {
1777            assert!(
1778                !Materialize::is_valid_upgrade_version(&active_version, &next_version),
1779                "v{active_version} can't upgrade to v{next_version}"
1780            );
1781        }
1782    }
1783
1784    #[mz_ore::test]
1785    fn rollout_request_timeout() {
1786        let mz_with = |timeout: &str| Materialize {
1787            spec: MaterializeSpec {
1788                rollout_request_timeout: RolloutRequestTimeout(timeout.to_owned()),
1789                ..Default::default()
1790            },
1791            metadata: ObjectMeta::default(),
1792            status: None,
1793        };
1794
1795        // The default const is a valid duration and resolves to 24h.
1796        let default = humantime::parse_duration(DEFAULT_ROLLOUT_REQUEST_TIMEOUT).unwrap();
1797        assert_eq!(default, Duration::from_secs(24 * 60 * 60));
1798
1799        // The field's Default (used by `MaterializeSpec::default()` and serde's
1800        // `#[serde(default)]`) is the 24h default, with no empty intermediate.
1801        assert_eq!(
1802            RolloutRequestTimeout::default().0,
1803            DEFAULT_ROLLOUT_REQUEST_TIMEOUT
1804        );
1805        assert_eq!(
1806            Materialize {
1807                spec: MaterializeSpec::default(),
1808                metadata: ObjectMeta::default(),
1809                status: None,
1810            }
1811            .rollout_request_timeout(),
1812            default
1813        );
1814
1815        // Parseable values are honored.
1816        assert_eq!(
1817            mz_with("1h").rollout_request_timeout(),
1818            Duration::from_secs(60 * 60)
1819        );
1820        assert_eq!(
1821            mz_with("90m").rollout_request_timeout(),
1822            Duration::from_secs(90 * 60)
1823        );
1824        assert_eq!(
1825            mz_with("1h 30m").rollout_request_timeout(),
1826            Duration::from_secs(90 * 60)
1827        );
1828        // Unparseable values fall back to the default.
1829        assert_eq!(mz_with("not a duration").rollout_request_timeout(), default);
1830    }
1831
1832    #[mz_ore::test]
1833    fn rollout_request_timeout_schema_default() {
1834        // The default must be surfaced in the generated CRD's OpenAPI schema
1835        // (not just in the Rust helper), so the Kubernetes API server defaults
1836        // omitted fields and `kubectl explain` shows it.
1837        let crd = serde_json::to_value(<Materialize as kube::CustomResourceExt>::crd())
1838            .expect("CRD serializes");
1839        let default = &crd["spec"]["versions"][0]["schema"]["openAPIV3Schema"]["properties"]["spec"]
1840            ["properties"]["rolloutRequestTimeout"]["default"];
1841        assert_eq!(
1842            default,
1843            &serde_json::json!(DEFAULT_ROLLOUT_REQUEST_TIMEOUT),
1844            "rolloutRequestTimeout schema default missing/wrong in generated CRD",
1845        );
1846    }
1847
1848    #[mz_ore::test]
1849    fn rollout_in_progress_since() {
1850        let now = Timestamp::now();
1851        let condition = |type_: &str, status: &str| Condition {
1852            type_: type_.to_owned(),
1853            status: status.to_owned(),
1854            last_transition_time: Time(now),
1855            message: String::new(),
1856            observed_generation: None,
1857            reason: "Test".to_owned(),
1858        };
1859        let mz_with = |conditions: Vec<Condition>| Materialize {
1860            spec: MaterializeSpec::default(),
1861            metadata: ObjectMeta::default(),
1862            status: Some(MaterializeStatus {
1863                conditions,
1864                ..Default::default()
1865            }),
1866        };
1867
1868        // No status at all.
1869        let mz = Materialize {
1870            spec: MaterializeSpec::default(),
1871            metadata: ObjectMeta::default(),
1872            status: None,
1873        };
1874        assert_eq!(mz.rollout_in_progress_since(), None);
1875
1876        // A timeout-eligible rollout in progress is signalled by an `Unknown`
1877        // `UpToDate` condition (the Applying and ReadyToPromote phases).
1878        assert_eq!(
1879            mz_with(vec![condition("UpToDate", "Unknown")]).rollout_in_progress_since(),
1880            Some(now)
1881        );
1882
1883        // The `Promoting` phase is also `Unknown`, but must NOT be reported:
1884        // once promoting, the rollout can no longer be cancelled by the
1885        // timeout.
1886        assert_eq!(
1887            mz_with(vec![Condition {
1888                reason: "Promoting".to_owned(),
1889                ..condition("UpToDate", "Unknown")
1890            }])
1891            .rollout_in_progress_since(),
1892            None
1893        );
1894
1895        // A settled rollout (True/False) is not in progress.
1896        assert_eq!(
1897            mz_with(vec![condition("UpToDate", "True")]).rollout_in_progress_since(),
1898            None
1899        );
1900        assert_eq!(
1901            mz_with(vec![condition("UpToDate", "False")]).rollout_in_progress_since(),
1902            None
1903        );
1904    }
1905
1906    #[mz_ore::test]
1907    fn up_to_date_transition_time() {
1908        // Two distinct, fixed instants so we can tell "carried the old
1909        // timestamp" apart from "reset to now".
1910        let stored = Timestamp::from_second(1_000).unwrap();
1911        let now = Timestamp::from_second(2_000).unwrap();
1912
1913        let condition = |status: &str| Condition {
1914            type_: "UpToDate".to_owned(),
1915            status: status.to_owned(),
1916            last_transition_time: Time(stored),
1917            message: String::new(),
1918            observed_generation: None,
1919            reason: "Test".to_owned(),
1920        };
1921        let mz_with = |conditions: Vec<Condition>| Materialize {
1922            spec: MaterializeSpec::default(),
1923            metadata: ObjectMeta::default(),
1924            status: Some(MaterializeStatus {
1925                conditions,
1926                ..Default::default()
1927            }),
1928        };
1929
1930        // No prior condition: use `now`.
1931        let mz = Materialize {
1932            spec: MaterializeSpec::default(),
1933            metadata: ObjectMeta::default(),
1934            status: None,
1935        };
1936        assert_eq!(mz.up_to_date_transition_time("Unknown", now), now);
1937
1938        // Same status as the prior condition: carry its timestamp forward, so
1939        // consecutive same-status phases (Applying -> ReadyToPromote) share one
1940        // timer.
1941        assert_eq!(
1942            mz_with(vec![condition("Unknown")]).up_to_date_transition_time("Unknown", now),
1943            stored
1944        );
1945
1946        // Status changed: reset to `now`.
1947        assert_eq!(
1948            mz_with(vec![condition("Unknown")]).up_to_date_transition_time("True", now),
1949            now
1950        );
1951    }
1952
1953    #[mz_ore::test]
1954    fn active_environmentd_image_ref() {
1955        const OLD: &str = "materialize/environmentd:v26.0.0";
1956        const NEW: &str = "materialize/environmentd:v27.0.0";
1957
1958        let mz_with = |spec_image: &str, status: Option<MaterializeStatus>| Materialize {
1959            spec: MaterializeSpec {
1960                environmentd_image_ref: spec_image.to_owned(),
1961                ..Default::default()
1962            },
1963            metadata: ObjectMeta::default(),
1964            status,
1965        };
1966
1967        // No status yet (pre-initial-reconcile): fall back to spec.
1968        let mz = mz_with(NEW, None);
1969        assert_eq!(mz.active_environmentd_image_ref(), NEW);
1970
1971        // Status present but last_completed_rollout_environmentd_image_ref
1972        // unset (e.g. resource upgraded from older orchestratord that didn't
1973        // populate the field): fall back to spec.
1974        let mz = mz_with(
1975            NEW,
1976            Some(MaterializeStatus {
1977                last_completed_rollout_environmentd_image_ref: None,
1978                ..Default::default()
1979            }),
1980        );
1981        assert_eq!(mz.active_environmentd_image_ref(), NEW);
1982
1983        // Steady state: spec image == last completed image. Either source is
1984        // fine; the method must return that image.
1985        let mz = mz_with(
1986            NEW,
1987            Some(MaterializeStatus {
1988                last_completed_rollout_environmentd_image_ref: Some(NEW.to_owned()),
1989                ..Default::default()
1990            }),
1991        );
1992        assert_eq!(mz.active_environmentd_image_ref(), NEW);
1993
1994        // DEP-42 / mid-rollout: spec image == NEW but last_completed_* still
1995        // holds OLD — either because the user canceled the rollout by
1996        // reverting only requestRollout, or because the new generation has
1997        // not yet been promoted. The active environmentd is still OLD, so
1998        // downstream resources must track OLD. Without this method,
1999        // balancerd would inherit the spec's NEW image while environmentd
2000        // still runs OLD, leaving balancerd pods skewed from the running
2001        // env.
2002        let mz = mz_with(
2003            NEW,
2004            Some(MaterializeStatus {
2005                last_completed_rollout_environmentd_image_ref: Some(OLD.to_owned()),
2006                ..Default::default()
2007            }),
2008        );
2009        assert_eq!(mz.active_environmentd_image_ref(), OLD);
2010    }
2011}