// mz_cloud_resources/crd/materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distr::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation holding the last generation known to have been active.
///
/// When a `Materialize` object has no status yet, `Materialize::status` parses
/// this annotation (if present) into the fresh status's `activeGeneration`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
40    // This is intentionally a subset of the fields of a Certificate.
41    // We do not want customers to configure options that may conflict with
42    // things we override or expand in our code.
43    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44    #[serde(rename_all = "camelCase")]
45    pub struct MaterializeCertSpec {
46        /// Additional DNS names the certificate will be valid for.
47        pub dns_names: Option<Vec<String>>,
48        /// Duration the certificate will be requested for.
49        /// Value must be in units accepted by Go
50        /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
51        pub duration: Option<String>,
52        /// Duration before expiration the certificate will be renewed.
53        /// Value must be in units accepted by Go
54        /// [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration).
55        pub renew_before: Option<String>,
56        /// Reference to an `Issuer` or `ClusterIssuer` that will generate the certificate.
57        pub issuer_ref: Option<CertificateIssuerRef>,
58        /// Additional annotations and labels to include in the Certificate object.
59        pub secret_template: Option<CertificateSecretTemplate>,
60    }
61    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
62    pub enum MaterializeRolloutStrategy {
63        /// Create a new generation of pods, leaving the old generation around until the
64        /// new ones are ready to take over.
65        /// This minimizes downtime, and is what almost everyone should use.
66        #[default]
67        WaitUntilReady,
68
69        /// {{<warning>}}
70        /// THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
71        ///
72        /// This strategy should ONLY be used by customers with physical hardware who do not have
73        /// enough hardware for the `WaitUntilReady` strategy. If you think you want this, please
74        /// consult with Materialize engineering to discuss your situation.
75        /// {{</warning>}}
76        ///
77        /// Tear down the old generation of pods and promote the new generation of pods immediately,
78        /// without waiting for the new generation of pods to be ready.
79        ImmediatelyPromoteCausingDowntime,
80    }
81
82    #[derive(
83        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
84    )]
85    #[serde(rename_all = "camelCase")]
86    #[kube(
87        namespaced,
88        group = "materialize.cloud",
89        version = "v1alpha1",
90        kind = "Materialize",
91        singular = "materialize",
92        plural = "materializes",
93        shortname = "mzs",
94        status = "MaterializeStatus",
95        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
96        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
97    )]
98    pub struct MaterializeSpec {
99        /// The environmentd image to run.
100        pub environmentd_image_ref: String,
101        /// Extra args to pass to the environmentd binary.
102        pub environmentd_extra_args: Option<Vec<String>>,
103        /// Extra environment variables to pass to the environmentd binary.
104        pub environmentd_extra_env: Option<Vec<EnvVar>>,
105        /// {{<warning>}}
106        /// Deprecated.
107        ///
108        /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
109        /// {{</warning>}}
110        ///
111        /// If running in AWS, override the IAM role to use to give
112        /// environmentd access to the persist S3 bucket.
113        #[kube(deprecated)]
114        pub environmentd_iam_role_arn: Option<String>,
115        /// If running in AWS, override the IAM role to use to support
116        /// the CREATE CONNECTION feature.
117        pub environmentd_connection_role_arn: Option<String>,
118        /// Resource requirements for the environmentd pod.
119        pub environmentd_resource_requirements: Option<ResourceRequirements>,
120        /// Amount of disk to allocate, if a storage class is provided.
121        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
122        /// Resource requirements for the balancerd pod.
123        pub balancerd_resource_requirements: Option<ResourceRequirements>,
124        /// Resource requirements for the console pod.
125        pub console_resource_requirements: Option<ResourceRequirements>,
126        /// Number of balancerd pods to create.
127        pub balancerd_replicas: Option<i32>,
128        /// Number of console pods to create.
129        pub console_replicas: Option<i32>,
130
131        /// Name of the kubernetes service account to use.
132        /// If not set, we will create one with the same name as this Materialize object.
133        pub service_account_name: Option<String>,
134        /// Annotations to apply to the service account.
135        ///
136        /// Annotations on service accounts are commonly used by cloud providers for IAM.
137        /// AWS uses "eks.amazonaws.com/role-arn".
138        /// Azure uses "azure.workload.identity/client-id", but
139        /// additionally requires "azure.workload.identity/use": "true" on the pods.
140        pub service_account_annotations: Option<BTreeMap<String, String>>,
141        /// Labels to apply to the service account.
142        pub service_account_labels: Option<BTreeMap<String, String>>,
143        /// Annotations to apply to the pods.
144        pub pod_annotations: Option<BTreeMap<String, String>>,
145        /// Labels to apply to the pods.
146        pub pod_labels: Option<BTreeMap<String, String>>,
147
148        /// When changes are made to the environmentd resources (either via
149        /// modifying fields in the spec here or by deploying a new
150        /// orchestratord version which changes how resources are generated),
151        /// existing environmentd processes won't be automatically restarted.
152        /// In order to trigger a restart, the request_rollout field should be
153        /// set to a new (random) value. Once the rollout completes, the value
154        /// of `status.lastCompletedRolloutRequest` will be set to this value
155        /// to indicate completion.
156        ///
157        /// Defaults to a random value in order to ensure that the first
158        /// generation rollout is automatically triggered.
159        #[serde(default)]
160        pub request_rollout: Uuid,
161        /// If `forcePromote` is set to the same value as `requestRollout`, the
162        /// current rollout will skip waiting for clusters in the new
163        /// generation to rehydrate before promoting the new environmentd to
164        /// leader.
165        #[serde(default)]
166        pub force_promote: Uuid,
167        /// This value will be written to an annotation in the generated
168        /// environmentd statefulset, in order to force the controller to
169        /// detect the generated resources as changed even if no other changes
170        /// happened. This can be used to force a rollout to a new generation
171        /// even without making any meaningful changes, by setting it to the
172        /// same value as `requestRollout`.
173        #[serde(default)]
174        pub force_rollout: Uuid,
175        /// {{<warning>}}
176        /// Deprecated and ignored. Use `rolloutStrategy` instead.
177        /// {{</warning>}}
178        #[kube(deprecated)]
179        #[serde(default)]
180        pub in_place_rollout: bool,
181        /// Rollout strategy to use when upgrading this Materialize instance.
182        #[serde(default)]
183        pub rollout_strategy: MaterializeRolloutStrategy,
184        /// The name of a secret containing `metadata_backend_url` and `persist_backend_url`.
185        /// It may also contain `external_login_password_mz_system`, which will be used as
186        /// the password for the `mz_system` user if `authenticatorKind` is `Password`.
187        pub backend_secret_name: String,
188        /// How to authenticate with Materialize.
189        #[serde(default)]
190        pub authenticator_kind: AuthenticatorKind,
191        /// Whether to enable role based access control. Defaults to false.
192        #[serde(default)]
193        pub enable_rbac: bool,
194
195        /// The value used by environmentd (via the --environment-id flag) to
196        /// uniquely identify this instance. Must be globally unique, and
197        /// is required if a license key is not provided.
198        /// NOTE: This value MUST NOT be changed in an existing instance,
199        /// since it affects things like the way data is stored in the persist
200        /// backend.
201        #[serde(default)]
202        pub environment_id: Uuid,
203
204        /// The configuration for generating an x509 certificate using cert-manager for balancerd
205        /// to present to incoming connections.
206        /// The `dnsNames` and `issuerRef` fields are required.
207        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
208        /// The configuration for generating an x509 certificate using cert-manager for the console
209        /// to present to incoming connections.
210        /// The `dnsNames` and `issuerRef` fields are required.
211        /// Not yet implemented.
212        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
213        /// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
214        /// The `issuerRef` field is required.
215        /// This currently is only used for environmentd, but will eventually support clusterd.
216        pub internal_certificate_spec: Option<MaterializeCertSpec>,
217    }
218
219    impl Materialize {
220        pub fn backend_secret_name(&self) -> String {
221            self.spec.backend_secret_name.clone()
222        }
223
224        pub fn namespace(&self) -> String {
225            self.meta().namespace.clone().unwrap()
226        }
227
228        pub fn create_service_account(&self) -> bool {
229            self.spec.service_account_name.is_none()
230        }
231
232        pub fn service_account_name(&self) -> String {
233            self.spec
234                .service_account_name
235                .clone()
236                .unwrap_or_else(|| self.name_unchecked())
237        }
238
239        pub fn role_name(&self) -> String {
240            self.name_unchecked()
241        }
242
243        pub fn role_binding_name(&self) -> String {
244            self.name_unchecked()
245        }
246
247        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
248            self.name_prefixed(&format!("environmentd-{generation}"))
249        }
250
251        pub fn environmentd_app_name(&self) -> String {
252            "environmentd".to_owned()
253        }
254
255        pub fn environmentd_service_name(&self) -> String {
256            self.name_prefixed("environmentd")
257        }
258
259        pub fn environmentd_service_internal_fqdn(&self) -> String {
260            format!(
261                "{}.{}.svc.cluster.local",
262                self.environmentd_service_name(),
263                self.meta().namespace.as_ref().unwrap()
264            )
265        }
266
267        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
268            self.name_prefixed(&format!("environmentd-{generation}"))
269        }
270
271        pub fn balancerd_app_name(&self) -> String {
272            "balancerd".to_owned()
273        }
274
275        pub fn environmentd_certificate_name(&self) -> String {
276            self.name_prefixed("environmentd-external")
277        }
278
279        pub fn environmentd_certificate_secret_name(&self) -> String {
280            self.name_prefixed("environmentd-tls")
281        }
282
283        pub fn balancerd_deployment_name(&self) -> String {
284            self.name_prefixed("balancerd")
285        }
286
287        pub fn balancerd_service_name(&self) -> String {
288            self.name_prefixed("balancerd")
289        }
290
291        pub fn console_app_name(&self) -> String {
292            "console".to_owned()
293        }
294
295        pub fn balancerd_external_certificate_name(&self) -> String {
296            self.name_prefixed("balancerd-external")
297        }
298
299        pub fn balancerd_external_certificate_secret_name(&self) -> String {
300            self.name_prefixed("balancerd-external-tls")
301        }
302
303        pub fn balancerd_replicas(&self) -> i32 {
304            self.spec.balancerd_replicas.unwrap_or(2)
305        }
306
307        pub fn console_replicas(&self) -> i32 {
308            self.spec.console_replicas.unwrap_or(2)
309        }
310
311        pub fn console_configmap_name(&self) -> String {
312            self.name_prefixed("console")
313        }
314
315        pub fn console_deployment_name(&self) -> String {
316            self.name_prefixed("console")
317        }
318
319        pub fn console_service_name(&self) -> String {
320            self.name_prefixed("console")
321        }
322
323        pub fn console_external_certificate_name(&self) -> String {
324            self.name_prefixed("console-external")
325        }
326
327        pub fn console_external_certificate_secret_name(&self) -> String {
328            self.name_prefixed("console-external-tls")
329        }
330
331        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
332            self.name_prefixed(&format!("persist-pubsub-{generation}"))
333        }
334
335        pub fn listeners_configmap_name(&self, generation: u64) -> String {
336            self.name_prefixed(&format!("listeners-{generation}"))
337        }
338
339        pub fn name_prefixed(&self, suffix: &str) -> String {
340            format!("mz{}-{}", self.resource_id(), suffix)
341        }
342
343        pub fn resource_id(&self) -> &str {
344            &self.status.as_ref().unwrap().resource_id
345        }
346
347        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
348            self.spec
349                .environmentd_scratch_volume_storage_requirement
350                .clone()
351                .unwrap_or_else(|| {
352                    self.spec
353                        .environmentd_resource_requirements
354                        .as_ref()
355                        .and_then(|requirements| {
356                            requirements
357                                .requests
358                                .as_ref()
359                                .or(requirements.limits.as_ref())
360                        })
361                        // TODO: in cloud, we've been defaulting to twice the
362                        // memory limit, but k8s-openapi doesn't seem to
363                        // provide any way to parse Quantity values, so there
364                        // isn't an easy way to do arithmetic on it
365                        .and_then(|requirements| requirements.get("memory").cloned())
366                        // TODO: is there a better default to use here?
367                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
368                })
369        }
370
371        pub fn default_labels(&self) -> BTreeMap<String, String> {
372            BTreeMap::from_iter([
373                (
374                    "materialize.cloud/organization-name".to_owned(),
375                    self.name_unchecked(),
376                ),
377                (
378                    "materialize.cloud/organization-namespace".to_owned(),
379                    self.namespace(),
380                ),
381                (
382                    "materialize.cloud/mz-resource-id".to_owned(),
383                    self.resource_id().to_owned(),
384                ),
385            ])
386        }
387
388        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
389            format!(
390                "{}-{}-{}-0",
391                cloud_provider, region, self.spec.environment_id,
392            )
393        }
394
395        pub fn requested_reconciliation_id(&self) -> Uuid {
396            self.spec.request_rollout
397        }
398
399        pub fn rollout_requested(&self) -> bool {
400            self.requested_reconciliation_id()
401                != self
402                    .status
403                    .as_ref()
404                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
405        }
406
407        pub fn set_force_promote(&mut self) {
408            self.spec.force_promote = self.spec.request_rollout;
409        }
410
411        pub fn should_force_promote(&self) -> bool {
412            self.spec.force_promote == self.spec.request_rollout
413                || self.spec.rollout_strategy
414                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
415        }
416
417        pub fn conditions_need_update(&self) -> bool {
418            let Some(status) = self.status.as_ref() else {
419                return true;
420            };
421            if status.conditions.is_empty() {
422                return true;
423            }
424            for condition in &status.conditions {
425                if condition.observed_generation != self.meta().generation {
426                    return true;
427                }
428            }
429            false
430        }
431
432        pub fn is_promoting(&self) -> bool {
433            let Some(status) = self.status.as_ref() else {
434                return false;
435            };
436            if status.conditions.is_empty() {
437                return false;
438            }
439            status
440                .conditions
441                .iter()
442                .any(|condition| condition.reason == "Promoting")
443        }
444
445        pub fn update_in_progress(&self) -> bool {
446            let Some(status) = self.status.as_ref() else {
447                return false;
448            };
449            if status.conditions.is_empty() {
450                return false;
451            }
452            for condition in &status.conditions {
453                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
454                    return true;
455                }
456            }
457            false
458        }
459
460        /// Checks that the given version is greater than or equal
461        /// to the existing version, if the existing version
462        /// can be parsed.
463        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
464            let version = parse_image_ref(&self.spec.environmentd_image_ref);
465            match version {
466                Some(version) => &version >= minimum,
467                // In the rare case that we see an image reference
468                // that we can't parse, we assume that it satisfies all
469                // version checks. Usually these are custom images that have
470                // been by a developer on a branch forked from a recent copy
471                // of main, and so this works out reasonably well in practice.
472                None => {
473                    tracing::warn!(
474                        image_ref = %self.spec.environmentd_image_ref,
475                        "failed to parse image ref",
476                    );
477                    true
478                }
479            }
480        }
481
482        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
483            ObjectMeta {
484                namespace: Some(self.namespace()),
485                name: Some(name),
486                labels: Some(self.default_labels()),
487                owner_references: Some(vec![owner_reference(self)]),
488                ..Default::default()
489            }
490        }
491
492        pub fn status(&self) -> MaterializeStatus {
493            self.status.clone().unwrap_or_else(|| {
494                let mut status = MaterializeStatus::default();
495                // DNS-1035 names are supposed to be case insensitive,
496                // so we define our own character set, rather than use the
497                // built-in Alphanumeric distribution from rand, which
498                // includes both upper and lowercase letters.
499                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
500                status.resource_id = rand::rng()
501                    .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
502                    .take(10)
503                    .map(|i| char::from(CHARSET[i]))
504                    .collect();
505
506                // If we're creating the initial status on an un-soft-deleted
507                // Environment we need to ensure that the last active generation
508                // is restored, otherwise the env will crash loop indefinitely
509                // as its catalog would have durably recorded a greater generation
510                if let Some(last_active_generation) = self
511                    .annotations()
512                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
513                {
514                    status.active_generation = last_active_generation
515                        .parse()
516                        .expect("valid int generation");
517                }
518
519                status
520            })
521        }
522    }
523
524    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
525    #[serde(rename_all = "camelCase")]
526    pub struct MaterializeStatus {
527        /// Resource identifier used as a name prefix to avoid pod name collisions.
528        pub resource_id: String,
529        /// The generation of Materialize pods actively capable of servicing requests.
530        pub active_generation: u64,
531        /// The UUID of the last successfully completed rollout.
532        pub last_completed_rollout_request: Uuid,
533        /// A hash calculated from the spec of resources to be created based on this Materialize
534        /// spec. This is used for detecting when the existing resources are up to date.
535        /// If you want to trigger a rollout without making other changes that would cause this
536        /// hash to change, you must set forceRollout to the same UUID as requestRollout.
537        pub resources_hash: String,
538        pub conditions: Vec<Condition>,
539    }
540
541    impl MaterializeStatus {
542        pub fn needs_update(&self, other: &Self) -> bool {
543            let now = chrono::offset::Utc::now();
544            let mut a = self.clone();
545            for condition in &mut a.conditions {
546                condition.last_transition_time = Time(now);
547            }
548            let mut b = other.clone();
549            for condition in &mut b.conditions {
550                condition.last_transition_time = Time(now);
551            }
552            a != b
553        }
554    }
555}
556
557fn parse_image_ref(image_ref: &str) -> Option<Version> {
558    image_ref
559        .rsplit_once(':')
560        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
561        .and_then(|tag| {
562            // To work around Docker tag restrictions, build metadata in
563            // a Docker tag is delimited by `--` rather than the SemVer
564            // `+` delimiter. So we need to swap the delimiter back to
565            // `+` before parsing it as SemVer.
566            let tag = tag.replace("--", "+");
567            Version::parse(&tag).ok()
568        })
569}
570
571fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
572    OwnerReference {
573        api_version: T::api_version(&()).to_string(),
574        kind: T::kind(&()).to_string(),
575        name: t.name_unchecked(),
576        uid: t.uid().unwrap(),
577        block_owner_deletion: Some(true),
578        ..Default::default()
579    }
580}
581
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    #[mz_ore::test]
    fn meets_minimum_version() {
        // (image ref, minimum version, expected result)
        let cases = [
            // Refs whose version can't be parsed satisfy every check.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // Versions below the minimum fail.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };
        for (image_ref, minimum, expected) in cases {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum_version = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum_version),
                expected,
                "image ref {image_ref:?} against minimum {minimum}",
            );
        }
    }
}