mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key holding the last known active generation of a `Materialize`
/// object.
///
/// `Materialize::status()` reads this annotation when it has to build an initial
/// status, and uses the parsed value as the starting `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
    // Certificate settings a user may supply for a Materialize instance.
    //
    // This is intentionally a subset of the fields of a cert-manager
    // Certificate. We do not want customers to configure options that may
    // conflict with things we override or expand in our code.
    //
    // Field comments are deliberately plain `//` comments, matching the rest of
    // this file.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeCertSpec {
        // Additional DNS names the certificate will be valid for.
        pub dns_names: Option<Vec<String>>,
        // Duration the certificate will be requested for.
        // The value must be in units accepted by Go's time.ParseDuration:
        // https://golang.org/pkg/time/#ParseDuration.
        pub duration: Option<String>,
        // How long before expiration the certificate will be renewed.
        // The value must be in units accepted by Go's time.ParseDuration:
        // https://golang.org/pkg/time/#ParseDuration.
        pub renew_before: Option<String>,
        // Reference to the Issuer or ClusterIssuer that will generate the certificate.
        pub issuer_ref: Option<CertificateIssuerRef>,
        // Additional annotations and labels to include in the Certificate object.
        pub secret_template: Option<CertificateSecretTemplate>,
    }
    // Strategy used when rolling out a new generation of pods.
    //
    // `should_force_promote` treats `ImmediatelyPromoteCausingDowntime` the same
    // as an explicit force-promote request.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        // Default. Create a new generation of pods, leaving the old generation around until the
        // new ones are ready to take over.
        // This minimizes downtime, and is what almost everyone should use.
        #[default]
        WaitUntilReady,

        // WARNING!!!
        // THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
        // WARNING!!!
        //
        // Tear down the old generation of pods and promote the new generation of pods immediately,
        // without waiting for the new generation of pods to be ready.
        //
        // This strategy should ONLY be used by customers with physical hardware who do not have
        // enough hardware for the WaitUntilReady strategy. If you think you want this, please
        // consult with Materialize engineering to discuss your situation.
        ImmediatelyPromoteCausingDowntime,
    }
81
    // Spec of the `Materialize` custom resource (group `materialize.cloud`,
    // version `v1alpha1`). The `CustomResource` derive generates the
    // `Materialize` type whose helper methods are implemented below.
    //
    // Field names are serialized in camelCase. Field comments are deliberately
    // plain `//` comments, matching the rest of this file.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        // The environmentd image to run.
        pub environmentd_image_ref: String,
        // Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        // Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        // DEPRECATED
        // If running in AWS, override the IAM role to use to give
        // environmentd access to the persist S3 bucket.
        // DEPRECATED
        // Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
        pub environmentd_iam_role_arn: Option<String>,
        // If running in AWS, override the IAM role to use to support
        // the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        // Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        // Amount of disk to allocate, if a storage class is provided.
        // When unset, `Materialize::environmentd_scratch_volume_storage_requirement`
        // derives a value from `environmentd_resource_requirements`.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        // Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        // Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,
        // Number of balancerd pods to create (2 when unset).
        pub balancerd_replicas: Option<i32>,
        // Number of console pods to create (2 when unset).
        pub console_replicas: Option<i32>,

        // Name of the kubernetes service account to use.
        // If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        // Annotations to apply to the service account.
        //
        // Annotations on service accounts are commonly used by cloud providers for IAM.
        // AWS uses "eks.amazonaws.com/role-arn".
        // Azure uses "azure.workload.identity/client-id", but
        // additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        // Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        // When changes are made to the environmentd resources (either via
        // modifying fields in the spec here or by deploying a new
        // orchestratord version which changes how resources are generated),
        // existing environmentd processes won't be automatically restarted.
        // In order to trigger a restart, the request_rollout field should be
        // set to a new (random) value. Once the rollout completes, the value
        // of status.last_completed_rollout_request will be set to this value
        // to indicate completion.
        //
        // Defaults to a random value in order to ensure that the first
        // generation rollout is automatically triggered.
        //
        // NOTE(review): `#[serde(default)]` falls back to `Uuid::default()`,
        // which is the nil UUID rather than a random one. Confirm whether a
        // random default is still applied somewhere else, or update the
        // sentence above.
        #[serde(default)]
        pub request_rollout: Uuid,
        // If force_promote is set to the same value as request_rollout, the
        // current rollout will skip waiting for clusters in the new
        // generation to rehydrate before promoting the new environmentd to
        // leader.
        #[serde(default)]
        pub force_promote: Uuid,
        // This value will be written to an annotation in the generated
        // environmentd statefulset, in order to force the controller to
        // detect the generated resources as changed even if no other changes
        // happened. This can be used to force a rollout to a new generation
        // even without making any meaningful changes.
        #[serde(default)]
        pub force_rollout: Uuid,
        // Deprecated and ignored. Use rollout_strategy instead.
        #[serde(default)]
        pub in_place_rollout: bool,
        // Rollout strategy to use when upgrading this Materialize instance.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        // The name of a secret containing metadata_backend_url and persist_backend_url.
        // It may also contain external_login_password_mz_system, which will be used as
        // the password for the mz_system user if authenticator_kind is Password or Sasl.
        pub backend_secret_name: String,
        // How to authenticate with Materialize. Valid options are Password, Sasl, and None.
        // If set to Password or Sasl, the backend secret must contain external_login_password_mz_system.
        // Sasl enables SCRAM-SHA-256 authentication for PostgreSQL wire protocol connections.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        // Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        // The value used by environmentd (via the --environment-id flag) to
        // uniquely identify this instance. Must be globally unique, and
        // is required if a license key is not provided.
        // NOTE: This value MUST NOT be changed in an existing instance,
        // since it affects things like the way data is stored in the persist
        // backend.
        #[serde(default)]
        pub environment_id: Uuid,

        // The configuration for generating an x509 certificate using cert-manager for balancerd
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        // The configuration for generating an x509 certificate using cert-manager for the console
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        // Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        // The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        // The issuer_ref field is required.
        // This currently is only used for environmentd, but will eventually support clusterd.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
212
213    impl Materialize {
214        pub fn backend_secret_name(&self) -> String {
215            self.spec.backend_secret_name.clone()
216        }
217
218        pub fn namespace(&self) -> String {
219            self.meta().namespace.clone().unwrap()
220        }
221
222        pub fn create_service_account(&self) -> bool {
223            self.spec.service_account_name.is_none()
224        }
225
226        pub fn service_account_name(&self) -> String {
227            self.spec
228                .service_account_name
229                .clone()
230                .unwrap_or_else(|| self.name_unchecked())
231        }
232
233        pub fn role_name(&self) -> String {
234            self.name_unchecked()
235        }
236
237        pub fn role_binding_name(&self) -> String {
238            self.name_unchecked()
239        }
240
241        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
242            self.name_prefixed(&format!("environmentd-{generation}"))
243        }
244
245        pub fn environmentd_app_name(&self) -> String {
246            "environmentd".to_owned()
247        }
248
249        pub fn environmentd_service_name(&self) -> String {
250            self.name_prefixed("environmentd")
251        }
252
253        pub fn environmentd_service_internal_fqdn(&self) -> String {
254            format!(
255                "{}.{}.svc.cluster.local",
256                self.environmentd_service_name(),
257                self.meta().namespace.as_ref().unwrap()
258            )
259        }
260
261        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
262            self.name_prefixed(&format!("environmentd-{generation}"))
263        }
264
265        pub fn balancerd_app_name(&self) -> String {
266            "balancerd".to_owned()
267        }
268
269        pub fn environmentd_certificate_name(&self) -> String {
270            self.name_prefixed("environmentd-external")
271        }
272
273        pub fn environmentd_certificate_secret_name(&self) -> String {
274            self.name_prefixed("environmentd-tls")
275        }
276
277        pub fn balancerd_deployment_name(&self) -> String {
278            self.name_prefixed("balancerd")
279        }
280
281        pub fn balancerd_service_name(&self) -> String {
282            self.name_prefixed("balancerd")
283        }
284
285        pub fn console_app_name(&self) -> String {
286            "console".to_owned()
287        }
288
289        pub fn balancerd_external_certificate_name(&self) -> String {
290            self.name_prefixed("balancerd-external")
291        }
292
293        pub fn balancerd_external_certificate_secret_name(&self) -> String {
294            self.name_prefixed("balancerd-external-tls")
295        }
296
297        pub fn balancerd_replicas(&self) -> i32 {
298            self.spec.balancerd_replicas.unwrap_or(2)
299        }
300
301        pub fn console_replicas(&self) -> i32 {
302            self.spec.console_replicas.unwrap_or(2)
303        }
304
305        pub fn console_configmap_name(&self) -> String {
306            self.name_prefixed("console")
307        }
308
309        pub fn console_deployment_name(&self) -> String {
310            self.name_prefixed("console")
311        }
312
313        pub fn console_service_name(&self) -> String {
314            self.name_prefixed("console")
315        }
316
317        pub fn console_external_certificate_name(&self) -> String {
318            self.name_prefixed("console-external")
319        }
320
321        pub fn console_external_certificate_secret_name(&self) -> String {
322            self.name_prefixed("console-external-tls")
323        }
324
325        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
326            self.name_prefixed(&format!("persist-pubsub-{generation}"))
327        }
328
329        pub fn listeners_configmap_name(&self, generation: u64) -> String {
330            self.name_prefixed(&format!("listeners-{generation}"))
331        }
332
333        pub fn name_prefixed(&self, suffix: &str) -> String {
334            format!("mz{}-{}", self.resource_id(), suffix)
335        }
336
337        pub fn resource_id(&self) -> &str {
338            &self.status.as_ref().unwrap().resource_id
339        }
340
341        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
342            self.spec
343                .environmentd_scratch_volume_storage_requirement
344                .clone()
345                .unwrap_or_else(|| {
346                    self.spec
347                        .environmentd_resource_requirements
348                        .as_ref()
349                        .and_then(|requirements| {
350                            requirements
351                                .requests
352                                .as_ref()
353                                .or(requirements.limits.as_ref())
354                        })
355                        // TODO: in cloud, we've been defaulting to twice the
356                        // memory limit, but k8s-openapi doesn't seem to
357                        // provide any way to parse Quantity values, so there
358                        // isn't an easy way to do arithmetic on it
359                        .and_then(|requirements| requirements.get("memory").cloned())
360                        // TODO: is there a better default to use here?
361                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
362                })
363        }
364
365        pub fn default_labels(&self) -> BTreeMap<String, String> {
366            BTreeMap::from_iter([
367                (
368                    "materialize.cloud/organization-name".to_owned(),
369                    self.name_unchecked(),
370                ),
371                (
372                    "materialize.cloud/organization-namespace".to_owned(),
373                    self.namespace(),
374                ),
375                (
376                    "materialize.cloud/mz-resource-id".to_owned(),
377                    self.resource_id().to_owned(),
378                ),
379            ])
380        }
381
382        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
383            format!(
384                "{}-{}-{}-0",
385                cloud_provider, region, self.spec.environment_id,
386            )
387        }
388
389        pub fn requested_reconciliation_id(&self) -> Uuid {
390            self.spec.request_rollout
391        }
392
393        pub fn rollout_requested(&self) -> bool {
394            self.requested_reconciliation_id()
395                != self
396                    .status
397                    .as_ref()
398                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
399        }
400
401        pub fn set_force_promote(&mut self) {
402            self.spec.force_promote = self.spec.request_rollout;
403        }
404
405        pub fn should_force_promote(&self) -> bool {
406            self.spec.force_promote == self.spec.request_rollout
407                || self.spec.rollout_strategy
408                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
409        }
410
411        pub fn conditions_need_update(&self) -> bool {
412            let Some(status) = self.status.as_ref() else {
413                return true;
414            };
415            if status.conditions.is_empty() {
416                return true;
417            }
418            for condition in &status.conditions {
419                if condition.observed_generation != self.meta().generation {
420                    return true;
421                }
422            }
423            false
424        }
425
426        pub fn is_promoting(&self) -> bool {
427            let Some(status) = self.status.as_ref() else {
428                return false;
429            };
430            if status.conditions.is_empty() {
431                return false;
432            }
433            status
434                .conditions
435                .iter()
436                .any(|condition| condition.reason == "Promoting")
437        }
438
439        pub fn update_in_progress(&self) -> bool {
440            let Some(status) = self.status.as_ref() else {
441                return false;
442            };
443            if status.conditions.is_empty() {
444                return false;
445            }
446            for condition in &status.conditions {
447                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
448                    return true;
449                }
450            }
451            false
452        }
453
454        /// Checks that the given version is greater than or equal
455        /// to the existing version, if the existing version
456        /// can be parsed.
457        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
458            let version = parse_image_ref(&self.spec.environmentd_image_ref);
459            match version {
460                Some(version) => &version >= minimum,
461                // In the rare case that we see an image reference
462                // that we can't parse, we assume that it satisfies all
463                // version checks. Usually these are custom images that have
464                // been by a developer on a branch forked from a recent copy
465                // of main, and so this works out reasonably well in practice.
466                None => {
467                    tracing::warn!(
468                        image_ref = %self.spec.environmentd_image_ref,
469                        "failed to parse image ref",
470                    );
471                    true
472                }
473            }
474        }
475
476        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
477            ObjectMeta {
478                namespace: Some(self.namespace()),
479                name: Some(name),
480                labels: Some(self.default_labels()),
481                owner_references: Some(vec![owner_reference(self)]),
482                ..Default::default()
483            }
484        }
485
486        pub fn status(&self) -> MaterializeStatus {
487            self.status.clone().unwrap_or_else(|| {
488                let mut status = MaterializeStatus::default();
489                // DNS-1035 names are supposed to be case insensitive,
490                // so we define our own character set, rather than use the
491                // built-in Alphanumeric distribution from rand, which
492                // includes both upper and lowercase letters.
493                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
494                status.resource_id = rand::thread_rng()
495                    .sample_iter(Uniform::new(0, CHARSET.len()))
496                    .take(10)
497                    .map(|i| char::from(CHARSET[i]))
498                    .collect();
499
500                // If we're creating the initial status on an un-soft-deleted
501                // Environment we need to ensure that the last active generation
502                // is restored, otherwise the env will crash loop indefinitely
503                // as its catalog would have durably recorded a greater generation
504                if let Some(last_active_generation) = self
505                    .annotations()
506                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
507                {
508                    status.active_generation = last_active_generation
509                        .parse()
510                        .expect("valid int generation");
511                }
512
513                status
514            })
515        }
516    }
517
    // Status of a `Materialize` object. Field names are serialized in camelCase.
    //
    // Field comments are deliberately plain `//` comments, matching the rest of
    // this file.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        // Identifier embedded in the names of all managed resources
        // (`mz<resource_id>-...`). `Materialize::status()` generates it as ten
        // random characters drawn from lowercase letters and digits.
        pub resource_id: String,
        // Currently active generation. `Materialize::status()` seeds it from the
        // last-known-active-generation annotation when that annotation exists.
        pub active_generation: u64,
        // Compared against `spec.request_rollout` by
        // `Materialize::rollout_requested()` to decide whether a rollout is pending.
        pub last_completed_rollout_request: Uuid,
        // NOTE(review): presumably a hash of the generated resources used to
        // detect changes; nothing in this file reads or writes it, so confirm
        // against the controller.
        pub resources_hash: String,
        // Status conditions. The helpers in this file look for the `UpToDate`
        // condition type and the `Promoting` reason.
        pub conditions: Vec<Condition>,
    }
527
528    impl MaterializeStatus {
529        pub fn needs_update(&self, other: &Self) -> bool {
530            let now = chrono::offset::Utc::now();
531            let mut a = self.clone();
532            for condition in &mut a.conditions {
533                condition.last_transition_time = Time(now);
534            }
535            let mut b = other.clone();
536            for condition in &mut b.conditions {
537                condition.last_transition_time = Time(now);
538            }
539            a != b
540        }
541    }
542}
543
544fn parse_image_ref(image_ref: &str) -> Option<Version> {
545    image_ref
546        .rsplit_once(':')
547        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
548        .and_then(|tag| {
549            // To work around Docker tag restrictions, build metadata in
550            // a Docker tag is delimited by `--` rather than the SemVer
551            // `+` delimiter. So we need to swap the delimiter back to
552            // `+` before parsing it as SemVer.
553            let tag = tag.replace("--", "+");
554            Version::parse(&tag).ok()
555        })
556}
557
558fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
559    OwnerReference {
560        api_version: T::api_version(&()).to_string(),
561        kind: T::kind(&()).to_string(),
562        name: t.name_unchecked(),
563        uid: t.uid().unwrap(),
564        block_owner_deletion: Some(true),
565        ..Default::default()
566    }
567}
568
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    #[mz_ore::test]
    fn meets_minimum_version() {
        // Each entry is (image reference, minimum version, expected result).
        let cases = [
            // true cases
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // false cases
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        for (image_ref, minimum, expected) in cases {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} checked against minimum version {minimum}",
            );
        }
    }
}