mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation on a `Materialize` object recording the last generation known to
/// have been active.
///
/// When a status has to be initialized from scratch (see `Materialize::status`),
/// the value of this annotation is parsed and restored as the active generation.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
40    // This is intentionally a subset of the fields of a Certificate.
41    // We do not want customers to configure options that may conflict with
42    // things we override or expand in our code.
43    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44    #[serde(rename_all = "camelCase")]
45    pub struct MaterializeCertSpec {
46        // Additional DNS names the cert will be valid for.
47        pub dns_names: Option<Vec<String>>,
48        // Duration the certificate will be requested for.
49        // Value must be in units accepted by Go time.ParseDuration
50        // https://golang.org/pkg/time/#ParseDuration.
51        pub duration: Option<String>,
52        // Duration before expiration the certificate will be renewed.
53        // Value must be in units accepted by Go time.ParseDuration
54        // https://golang.org/pkg/time/#ParseDuration.
55        pub renew_before: Option<String>,
56        // Reference to an Issuer or ClusterIssuer that will generate the certificate.
57        pub issuer_ref: Option<CertificateIssuerRef>,
58        // Additional annotations and labels to include in the Certificate object.
59        pub secret_template: Option<CertificateSecretTemplate>,
60    }
61    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
62    pub enum MaterializeRolloutStrategy {
63        // Default. Create a new generation of pods, leaving the old generation around until the
64        // new ones are ready to take over.
65        // This minimizes downtime, and is what almost everyone should use.
66        #[default]
67        WaitUntilReady,
68
69        // WARNING!!!
70        // THIS WILL CAUSE YOUR MATERIALIZE INSTANCE TO BE UNAVAILABLE FOR SOME TIME!!!
71        // WARNING!!!
72        //
73        // Tear down the old generation of pods and promote the new generation of pods immediately,
74        // without waiting for the new generation of pods to be ready.
75        //
76        // This strategy should ONLY be used by customers with physical hardware who do not have
77        // enough hardware for the WaitUntilReady strategy. If you think you want this, please
78        // consult with Materialize engineering to discuss your situation.
79        ImmediatelyPromoteCausingDowntime,
80    }
81
82    #[derive(
83        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
84    )]
85    #[serde(rename_all = "camelCase")]
86    #[kube(
87        namespaced,
88        group = "materialize.cloud",
89        version = "v1alpha1",
90        kind = "Materialize",
91        singular = "materialize",
92        plural = "materializes",
93        shortname = "mzs",
94        status = "MaterializeStatus",
95        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
96        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
97    )]
98    pub struct MaterializeSpec {
99        // The environmentd image to run
100        pub environmentd_image_ref: String,
101        // Extra args to pass to the environmentd binary
102        pub environmentd_extra_args: Option<Vec<String>>,
103        // Extra environment variables to pass to the environmentd binary
104        pub environmentd_extra_env: Option<Vec<EnvVar>>,
105        // DEPRECATED
106        // If running in AWS, override the IAM role to use to give
107        // environmentd access to the persist S3 bucket.
108        // DEPRECATED
109        // Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
110        pub environmentd_iam_role_arn: Option<String>,
111        // If running in AWS, override the IAM role to use to support
112        // the CREATE CONNECTION feature
113        pub environmentd_connection_role_arn: Option<String>,
114        // Resource requirements for the environmentd pod
115        pub environmentd_resource_requirements: Option<ResourceRequirements>,
116        // Amount of disk to allocate, if a storage class is provided
117        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
118        // Resource requirements for the balancerd pod
119        pub balancerd_resource_requirements: Option<ResourceRequirements>,
120        // Resource requirements for the console pod
121        pub console_resource_requirements: Option<ResourceRequirements>,
122        // Number of balancerd pods to create
123        pub balancerd_replicas: Option<i32>,
124        // Number of console pods to create
125        pub console_replicas: Option<i32>,
126
127        // Name of the kubernetes service account to use.
128        // If not set, we will create one with the same name as this Materialize object.
129        pub service_account_name: Option<String>,
130        // Annotations to apply to the service account
131        //
132        // Annotations on service accounts are commonly used by cloud providers for IAM.
133        // AWS uses "eks.amazonaws.com/role-arn".
134        // Azure uses "azure.workload.identity/client-id", but
135        // additionally requires "azure.workload.identity/use": "true" on the pods.
136        pub service_account_annotations: Option<BTreeMap<String, String>>,
137        // Labels to apply to the service account
138        pub service_account_labels: Option<BTreeMap<String, String>>,
139        // Annotations to apply to the pods
140        pub pod_annotations: Option<BTreeMap<String, String>>,
141        // Labels to apply to the pods
142        pub pod_labels: Option<BTreeMap<String, String>>,
143
144        // When changes are made to the environmentd resources (either via
145        // modifying fields in the spec here or by deploying a new
146        // orchestratord version which changes how resources are generated),
147        // existing environmentd processes won't be automatically restarted.
148        // In order to trigger a restart, the request_rollout field should be
149        // set to a new (random) value. Once the rollout completes, the value
150        // of status.last_completed_rollout_request will be set to this value
151        // to indicate completion.
152        //
153        // Defaults to a random value in order to ensure that the first
154        // generation rollout is automatically triggered.
155        #[serde(default)]
156        pub request_rollout: Uuid,
157        // If force_promote is set to the same value as request_rollout, the
158        // current rollout will skip waiting for clusters in the new
159        // generation to rehydrate before promoting the new environmentd to
160        // leader.
161        #[serde(default)]
162        pub force_promote: Uuid,
163        // This value will be written to an annotation in the generated
164        // environmentd statefulset, in order to force the controller to
165        // detect the generated resources as changed even if no other changes
166        // happened. This can be used to force a rollout to a new generation
167        // even without making any meaningful changes.
168        #[serde(default)]
169        pub force_rollout: Uuid,
170        // Deprecated and ignored. Use rollout_strategy instead.
171        #[serde(default)]
172        pub in_place_rollout: bool,
173        // Rollout strategy to use when upgrading this Materialize instance.
174        #[serde(default)]
175        pub rollout_strategy: MaterializeRolloutStrategy,
176        // The name of a secret containing metadata_backend_url and persist_backend_url.
177        // It may also contain external_login_password_mz_system, which will be used as
178        // the password for the mz_system user if authenticator_kind is Password.
179        pub backend_secret_name: String,
180        // How to authenticate with Materialize. Valid options are Password and None.
181        // If set to Password, the backend secret must contain external_login_password_mz_system.
182        #[serde(default)]
183        pub authenticator_kind: AuthenticatorKind,
184        // Whether to enable role based access control. Defaults to false.
185        #[serde(default)]
186        pub enable_rbac: bool,
187
188        // The value used by environmentd (via the --environment-id flag) to
189        // uniquely identify this instance. Must be globally unique, and
190        // is required if a license key is not provided.
191        // NOTE: This value MUST NOT be changed in an existing instance,
192        // since it affects things like the way data is stored in the persist
193        // backend.
194        #[serde(default)]
195        pub environment_id: Uuid,
196
197        // The configuration for generating an x509 certificate using cert-manager for balancerd
198        // to present to incoming connections.
199        // The dns_names and issuer_ref fields are required.
200        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
201        // The configuration for generating an x509 certificate using cert-manager for the console
202        // to present to incoming connections.
203        // The dns_names and issuer_ref fields are required.
204        // Not yet implemented.
205        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
206        // The cert-manager Issuer or ClusterIssuer to use for database internal communication.
207        // The issuer_ref field is required.
208        // This currently is only used for environmentd, but will eventually support clusterd.
209        pub internal_certificate_spec: Option<MaterializeCertSpec>,
210    }
211
212    impl Materialize {
213        pub fn backend_secret_name(&self) -> String {
214            self.spec.backend_secret_name.clone()
215        }
216
217        pub fn namespace(&self) -> String {
218            self.meta().namespace.clone().unwrap()
219        }
220
221        pub fn create_service_account(&self) -> bool {
222            self.spec.service_account_name.is_none()
223        }
224
225        pub fn service_account_name(&self) -> String {
226            self.spec
227                .service_account_name
228                .clone()
229                .unwrap_or_else(|| self.name_unchecked())
230        }
231
232        pub fn role_name(&self) -> String {
233            self.name_unchecked()
234        }
235
236        pub fn role_binding_name(&self) -> String {
237            self.name_unchecked()
238        }
239
240        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
241            self.name_prefixed(&format!("environmentd-{generation}"))
242        }
243
244        pub fn environmentd_app_name(&self) -> String {
245            "environmentd".to_owned()
246        }
247
248        pub fn environmentd_service_name(&self) -> String {
249            self.name_prefixed("environmentd")
250        }
251
252        pub fn environmentd_service_internal_fqdn(&self) -> String {
253            format!(
254                "{}.{}.svc.cluster.local",
255                self.environmentd_service_name(),
256                self.meta().namespace.as_ref().unwrap()
257            )
258        }
259
260        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
261            self.name_prefixed(&format!("environmentd-{generation}"))
262        }
263
264        pub fn balancerd_app_name(&self) -> String {
265            "balancerd".to_owned()
266        }
267
268        pub fn environmentd_certificate_name(&self) -> String {
269            self.name_prefixed("environmentd-external")
270        }
271
272        pub fn environmentd_certificate_secret_name(&self) -> String {
273            self.name_prefixed("environmentd-tls")
274        }
275
276        pub fn balancerd_deployment_name(&self) -> String {
277            self.name_prefixed("balancerd")
278        }
279
280        pub fn balancerd_service_name(&self) -> String {
281            self.name_prefixed("balancerd")
282        }
283
284        pub fn console_app_name(&self) -> String {
285            "console".to_owned()
286        }
287
288        pub fn balancerd_external_certificate_name(&self) -> String {
289            self.name_prefixed("balancerd-external")
290        }
291
292        pub fn balancerd_external_certificate_secret_name(&self) -> String {
293            self.name_prefixed("balancerd-external-tls")
294        }
295
296        pub fn balancerd_replicas(&self) -> i32 {
297            self.spec.balancerd_replicas.unwrap_or(2)
298        }
299
300        pub fn console_replicas(&self) -> i32 {
301            self.spec.console_replicas.unwrap_or(2)
302        }
303
304        pub fn console_configmap_name(&self) -> String {
305            self.name_prefixed("console")
306        }
307
308        pub fn console_deployment_name(&self) -> String {
309            self.name_prefixed("console")
310        }
311
312        pub fn console_service_name(&self) -> String {
313            self.name_prefixed("console")
314        }
315
316        pub fn console_external_certificate_name(&self) -> String {
317            self.name_prefixed("console-external")
318        }
319
320        pub fn console_external_certificate_secret_name(&self) -> String {
321            self.name_prefixed("console-external-tls")
322        }
323
324        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
325            self.name_prefixed(&format!("persist-pubsub-{generation}"))
326        }
327
328        pub fn listeners_configmap_name(&self, generation: u64) -> String {
329            self.name_prefixed(&format!("listeners-{generation}"))
330        }
331
332        pub fn name_prefixed(&self, suffix: &str) -> String {
333            format!("mz{}-{}", self.resource_id(), suffix)
334        }
335
336        pub fn resource_id(&self) -> &str {
337            &self.status.as_ref().unwrap().resource_id
338        }
339
340        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
341            self.spec
342                .environmentd_scratch_volume_storage_requirement
343                .clone()
344                .unwrap_or_else(|| {
345                    self.spec
346                        .environmentd_resource_requirements
347                        .as_ref()
348                        .and_then(|requirements| {
349                            requirements
350                                .requests
351                                .as_ref()
352                                .or(requirements.limits.as_ref())
353                        })
354                        // TODO: in cloud, we've been defaulting to twice the
355                        // memory limit, but k8s-openapi doesn't seem to
356                        // provide any way to parse Quantity values, so there
357                        // isn't an easy way to do arithmetic on it
358                        .and_then(|requirements| requirements.get("memory").cloned())
359                        // TODO: is there a better default to use here?
360                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
361                })
362        }
363
364        pub fn default_labels(&self) -> BTreeMap<String, String> {
365            BTreeMap::from_iter([
366                (
367                    "materialize.cloud/organization-name".to_owned(),
368                    self.name_unchecked(),
369                ),
370                (
371                    "materialize.cloud/organization-namespace".to_owned(),
372                    self.namespace(),
373                ),
374                (
375                    "materialize.cloud/mz-resource-id".to_owned(),
376                    self.resource_id().to_owned(),
377                ),
378            ])
379        }
380
381        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
382            format!(
383                "{}-{}-{}-0",
384                cloud_provider, region, self.spec.environment_id,
385            )
386        }
387
388        pub fn requested_reconciliation_id(&self) -> Uuid {
389            self.spec.request_rollout
390        }
391
392        pub fn rollout_requested(&self) -> bool {
393            self.requested_reconciliation_id()
394                != self
395                    .status
396                    .as_ref()
397                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
398        }
399
400        pub fn set_force_promote(&mut self) {
401            self.spec.force_promote = self.spec.request_rollout;
402        }
403
404        pub fn should_force_promote(&self) -> bool {
405            self.spec.force_promote == self.spec.request_rollout
406                || self.spec.rollout_strategy
407                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
408        }
409
410        pub fn conditions_need_update(&self) -> bool {
411            let Some(status) = self.status.as_ref() else {
412                return true;
413            };
414            if status.conditions.is_empty() {
415                return true;
416            }
417            for condition in &status.conditions {
418                if condition.observed_generation != self.meta().generation {
419                    return true;
420                }
421            }
422            false
423        }
424
425        pub fn is_promoting(&self) -> bool {
426            let Some(status) = self.status.as_ref() else {
427                return false;
428            };
429            if status.conditions.is_empty() {
430                return false;
431            }
432            status
433                .conditions
434                .iter()
435                .any(|condition| condition.reason == "Promoting")
436        }
437
438        pub fn update_in_progress(&self) -> bool {
439            let Some(status) = self.status.as_ref() else {
440                return false;
441            };
442            if status.conditions.is_empty() {
443                return false;
444            }
445            for condition in &status.conditions {
446                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
447                    return true;
448                }
449            }
450            false
451        }
452
453        /// Checks that the given version is greater than or equal
454        /// to the existing version, if the existing version
455        /// can be parsed.
456        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
457            let version = parse_image_ref(&self.spec.environmentd_image_ref);
458            match version {
459                Some(version) => &version >= minimum,
460                // In the rare case that we see an image reference
461                // that we can't parse, we assume that it satisfies all
462                // version checks. Usually these are custom images that have
463                // been by a developer on a branch forked from a recent copy
464                // of main, and so this works out reasonably well in practice.
465                None => {
466                    tracing::warn!(
467                        image_ref = %self.spec.environmentd_image_ref,
468                        "failed to parse image ref",
469                    );
470                    true
471                }
472            }
473        }
474
475        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
476            ObjectMeta {
477                namespace: Some(self.namespace()),
478                name: Some(name),
479                labels: Some(self.default_labels()),
480                owner_references: Some(vec![owner_reference(self)]),
481                ..Default::default()
482            }
483        }
484
485        pub fn status(&self) -> MaterializeStatus {
486            self.status.clone().unwrap_or_else(|| {
487                let mut status = MaterializeStatus::default();
488                // DNS-1035 names are supposed to be case insensitive,
489                // so we define our own character set, rather than use the
490                // built-in Alphanumeric distribution from rand, which
491                // includes both upper and lowercase letters.
492                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
493                status.resource_id = rand::thread_rng()
494                    .sample_iter(Uniform::new(0, CHARSET.len()))
495                    .take(10)
496                    .map(|i| char::from(CHARSET[i]))
497                    .collect();
498
499                // If we're creating the initial status on an un-soft-deleted
500                // Environment we need to ensure that the last active generation
501                // is restored, otherwise the env will crash loop indefinitely
502                // as its catalog would have durably recorded a greater generation
503                if let Some(last_active_generation) = self
504                    .annotations()
505                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
506                {
507                    status.active_generation = last_active_generation
508                        .parse()
509                        .expect("valid int generation");
510                }
511
512                status
513            })
514        }
515    }
516
517    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
518    #[serde(rename_all = "camelCase")]
519    pub struct MaterializeStatus {
520        pub resource_id: String,
521        pub active_generation: u64,
522        pub last_completed_rollout_request: Uuid,
523        pub resources_hash: String,
524        pub conditions: Vec<Condition>,
525    }
526
527    impl MaterializeStatus {
528        pub fn needs_update(&self, other: &Self) -> bool {
529            let now = chrono::offset::Utc::now();
530            let mut a = self.clone();
531            for condition in &mut a.conditions {
532                condition.last_transition_time = Time(now);
533            }
534            let mut b = other.clone();
535            for condition in &mut b.conditions {
536                condition.last_transition_time = Time(now);
537            }
538            a != b
539        }
540    }
541}
542
543fn parse_image_ref(image_ref: &str) -> Option<Version> {
544    image_ref
545        .rsplit_once(':')
546        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
547        .and_then(|tag| {
548            // To work around Docker tag restrictions, build metadata in
549            // a Docker tag is delimited by `--` rather than the SemVer
550            // `+` delimiter. So we need to swap the delimiter back to
551            // `+` before parsing it as SemVer.
552            let tag = tag.replace("--", "+");
553            Version::parse(&tag).ok()
554        })
555}
556
557fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
558    OwnerReference {
559        api_version: T::api_version(&()).to_string(),
560        kind: T::kind(&()).to_string(),
561        name: t.name_unchecked(),
562        uid: t.uid().unwrap(),
563        block_owner_deletion: Some(true),
564        ..Default::default()
565    }
566}
567
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    #[mz_ore::test]
    fn meets_minimum_version() {
        // Each case is (image ref, minimum version, expected result).
        let cases = [
            // Unparseable refs are assumed to satisfy every minimum.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // Versions older than the minimum, including pre-releases of it.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };
        for (image_ref, minimum, expected) in cases {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} checked against minimum {minimum}",
            );
        }
    }
}