mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key read when an initial status is built for a `Materialize`
/// object: if the annotation is present, its value is parsed as the starting
/// `active_generation` (see `v1alpha1::Materialize::status`).
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
    // Certificate options that can be set on a Materialize object.
    //
    // This is intentionally a subset of the fields of a Certificate.
    // We do not want customers to configure options that may conflict with
    // things we override or expand in our code.
    //
    // Every field is optional at the type level and is serialized in
    // camelCase (e.g. `dnsNames`, `renewBefore`).
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeCertSpec {
        // Additional DNS names the cert will be valid for.
        pub dns_names: Option<Vec<String>>,
        // Duration the certificate will be requested for.
        // Value must be in units accepted by Go time.ParseDuration
        // https://golang.org/pkg/time/#ParseDuration.
        pub duration: Option<String>,
        // Duration before expiration at which the certificate will be renewed.
        // Value must be in units accepted by Go time.ParseDuration
        // https://golang.org/pkg/time/#ParseDuration.
        pub renew_before: Option<String>,
        // Reference to an Issuer or ClusterIssuer that will generate the certificate.
        pub issuer_ref: Option<CertificateIssuerRef>,
        // Additional annotations and labels to include in the Certificate object.
        pub secret_template: Option<CertificateSecretTemplate>,
    }
61
    // Spec of the namespaced `Materialize` custom resource
    // (group `materialize.cloud`, version `v1alpha1`). The `kube` derive
    // below generates the `Materialize` type that wraps this spec together
    // with object metadata and an optional `MaterializeStatus`.
    // Field names are serialized in camelCase.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        // The environmentd image to run.
        // A tag of the form `v<semver>` is parsed by `meets_minimum_version`.
        pub environmentd_image_ref: String,
        // Extra args to pass to the environmentd binary
        pub environmentd_extra_args: Option<Vec<String>>,
        // Extra environment variables to pass to the environmentd binary
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        // DEPRECATED: use `service_account_annotations` to set
        // "eks.amazonaws.com/role-arn" instead.
        //
        // If running in AWS, override the IAM role to use to give
        // environmentd access to the persist S3 bucket.
        pub environmentd_iam_role_arn: Option<String>,
        // If running in AWS, override the IAM role to use to support
        // the CREATE CONNECTION feature
        pub environmentd_connection_role_arn: Option<String>,
        // Resource requirements for the environmentd pod.
        // When no scratch volume size is set explicitly, the "memory" entry
        // of these requirements is used as the scratch volume size.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        // Amount of disk to allocate, if a storage class is provided
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        // Resource requirements for the balancerd pod
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        // Resource requirements for the console pod
        pub console_resource_requirements: Option<ResourceRequirements>,
        // Number of balancerd pods to create (2 when unset)
        pub balancerd_replicas: Option<i32>,
        // Number of console pods to create (2 when unset)
        pub console_replicas: Option<i32>,

        // Name of the kubernetes service account to use.
        // If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        // Annotations to apply to the service account
        //
        // Annotations on service accounts are commonly used by cloud providers for IAM.
        // AWS uses "eks.amazonaws.com/role-arn".
        // Azure uses "azure.workload.identity/client-id", but
        // additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the service account
        pub service_account_labels: Option<BTreeMap<String, String>>,
        // Annotations to apply to the pods
        pub pod_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the pods
        pub pod_labels: Option<BTreeMap<String, String>>,

        // When changes are made to the environmentd resources (either via
        // modifying fields in the spec here or by deploying a new
        // orchestratord version which changes how resources are generated),
        // existing environmentd processes won't be automatically restarted.
        // In order to trigger a restart, the request_rollout field should be
        // set to a new (random) value. Once the rollout completes, the value
        // of status.last_completed_rollout_request will be set to this value
        // to indicate completion.
        //
        // Defaults to a random value in order to ensure that the first
        // generation rollout is automatically triggered.
        // NOTE(review): `#[serde(default)]` on a `Uuid` deserializes a missing
        // field as the nil UUID, so the random default described above is
        // presumably assigned somewhere outside this file -- confirm.
        #[serde(default)]
        pub request_rollout: Uuid,
        // If force_promote is set to the same value as request_rollout, the
        // current rollout will skip waiting for clusters in the new
        // generation to rehydrate before promoting the new environmentd to
        // leader.
        // NOTE(review): if both fields are omitted they both deserialize to
        // the nil UUID and therefore compare equal -- confirm this is handled.
        #[serde(default)]
        pub force_promote: Uuid,
        // This value will be written to an annotation in the generated
        // environmentd statefulset, in order to force the controller to
        // detect the generated resources as changed even if no other changes
        // happened. This can be used to force a rollout to a new generation
        // even without making any meaningful changes.
        #[serde(default)]
        pub force_rollout: Uuid,
        // If false (the default), orchestratord will use the leader
        // promotion codepath to minimize downtime during rollouts. If true,
        // it will just kill the environmentd pod directly.
        #[serde(default)]
        pub in_place_rollout: bool,
        // The name of a secret containing metadata_backend_url and persist_backend_url.
        // It may also contain external_login_password_mz_system, which will be used as
        // the password for the mz_system user if authenticator_kind is Password.
        pub backend_secret_name: String,
        // How to authenticate with Materialize. Valid options are Password and None.
        // If set to Password, the backend secret must contain external_login_password_mz_system.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        // Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        // The value used by environmentd (via the --environment-id flag) to
        // uniquely identify this instance. Must be globally unique, and
        // defaults to a random value.
        // NOTE: This value MUST NOT be changed in an existing instance,
        // since it affects things like the way data is stored in the persist
        // backend.
        // NOTE(review): as with request_rollout, `#[serde(default)]` itself
        // yields the nil UUID; the random default is presumably applied
        // elsewhere -- confirm.
        #[serde(default)]
        pub environment_id: Uuid,

        // The configuration for generating an x509 certificate using cert-manager for balancerd
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        // The configuration for generating an x509 certificate using cert-manager for the console
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        // Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        // The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        // The issuer_ref field is required.
        // This currently is only used for environmentd, but will eventually support clusterd.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
190
191    impl Materialize {
192        pub fn backend_secret_name(&self) -> String {
193            self.spec.backend_secret_name.clone()
194        }
195
196        pub fn namespace(&self) -> String {
197            self.meta().namespace.clone().unwrap()
198        }
199
200        pub fn create_service_account(&self) -> bool {
201            self.spec.service_account_name.is_none()
202        }
203
204        pub fn service_account_name(&self) -> String {
205            self.spec
206                .service_account_name
207                .clone()
208                .unwrap_or_else(|| self.name_unchecked())
209        }
210
211        pub fn role_name(&self) -> String {
212            self.name_unchecked()
213        }
214
215        pub fn role_binding_name(&self) -> String {
216            self.name_unchecked()
217        }
218
219        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
220            self.name_prefixed(&format!("environmentd-{generation}"))
221        }
222
223        pub fn environmentd_app_name(&self) -> String {
224            "environmentd".to_owned()
225        }
226
227        pub fn environmentd_service_name(&self) -> String {
228            self.name_prefixed("environmentd")
229        }
230
231        pub fn environmentd_service_internal_fqdn(&self) -> String {
232            format!(
233                "{}.{}.svc.cluster.local",
234                self.environmentd_service_name(),
235                self.meta().namespace.as_ref().unwrap()
236            )
237        }
238
239        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
240            self.name_prefixed(&format!("environmentd-{generation}"))
241        }
242
243        pub fn balancerd_app_name(&self) -> String {
244            "balancerd".to_owned()
245        }
246
247        pub fn environmentd_certificate_name(&self) -> String {
248            self.name_prefixed("environmentd-external")
249        }
250
251        pub fn environmentd_certificate_secret_name(&self) -> String {
252            self.name_prefixed("environmentd-tls")
253        }
254
255        pub fn balancerd_deployment_name(&self) -> String {
256            self.name_prefixed("balancerd")
257        }
258
259        pub fn balancerd_service_name(&self) -> String {
260            self.name_prefixed("balancerd")
261        }
262
263        pub fn console_app_name(&self) -> String {
264            "console".to_owned()
265        }
266
267        pub fn balancerd_external_certificate_name(&self) -> String {
268            self.name_prefixed("balancerd-external")
269        }
270
271        pub fn balancerd_external_certificate_secret_name(&self) -> String {
272            self.name_prefixed("balancerd-external-tls")
273        }
274
275        pub fn balancerd_replicas(&self) -> i32 {
276            self.spec.balancerd_replicas.unwrap_or(2)
277        }
278
279        pub fn console_replicas(&self) -> i32 {
280            self.spec.console_replicas.unwrap_or(2)
281        }
282
283        pub fn console_configmap_name(&self) -> String {
284            self.name_prefixed("console")
285        }
286
287        pub fn console_deployment_name(&self) -> String {
288            self.name_prefixed("console")
289        }
290
291        pub fn console_service_name(&self) -> String {
292            self.name_prefixed("console")
293        }
294
295        pub fn console_external_certificate_name(&self) -> String {
296            self.name_prefixed("console-external")
297        }
298
299        pub fn console_external_certificate_secret_name(&self) -> String {
300            self.name_prefixed("console-external-tls")
301        }
302
303        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
304            self.name_prefixed(&format!("persist-pubsub-{generation}"))
305        }
306
307        pub fn listeners_configmap_name(&self, generation: u64) -> String {
308            self.name_prefixed(&format!("listeners-{generation}"))
309        }
310
311        pub fn name_prefixed(&self, suffix: &str) -> String {
312            format!("mz{}-{}", self.resource_id(), suffix)
313        }
314
315        pub fn resource_id(&self) -> &str {
316            &self.status.as_ref().unwrap().resource_id
317        }
318
319        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
320            self.spec
321                .environmentd_scratch_volume_storage_requirement
322                .clone()
323                .unwrap_or_else(|| {
324                    self.spec
325                        .environmentd_resource_requirements
326                        .as_ref()
327                        .and_then(|requirements| {
328                            requirements
329                                .requests
330                                .as_ref()
331                                .or(requirements.limits.as_ref())
332                        })
333                        // TODO: in cloud, we've been defaulting to twice the
334                        // memory limit, but k8s-openapi doesn't seem to
335                        // provide any way to parse Quantity values, so there
336                        // isn't an easy way to do arithmetic on it
337                        .and_then(|requirements| requirements.get("memory").cloned())
338                        // TODO: is there a better default to use here?
339                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
340                })
341        }
342
343        pub fn default_labels(&self) -> BTreeMap<String, String> {
344            BTreeMap::from_iter([
345                (
346                    "materialize.cloud/organization-name".to_owned(),
347                    self.name_unchecked(),
348                ),
349                (
350                    "materialize.cloud/organization-namespace".to_owned(),
351                    self.namespace(),
352                ),
353                (
354                    "materialize.cloud/mz-resource-id".to_owned(),
355                    self.resource_id().to_owned(),
356                ),
357            ])
358        }
359
360        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
361            format!(
362                "{}-{}-{}-0",
363                cloud_provider, region, self.spec.environment_id,
364            )
365        }
366
367        pub fn requested_reconciliation_id(&self) -> Uuid {
368            self.spec.request_rollout
369        }
370
371        pub fn in_place_rollout(&self) -> bool {
372            self.spec.in_place_rollout
373        }
374
375        pub fn rollout_requested(&self) -> bool {
376            self.requested_reconciliation_id()
377                != self
378                    .status
379                    .as_ref()
380                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
381        }
382
383        pub fn set_force_promote(&mut self) {
384            self.spec.force_promote = self.spec.request_rollout;
385        }
386
387        pub fn should_force_promote(&self) -> bool {
388            self.spec.force_promote == self.spec.request_rollout
389        }
390
391        pub fn conditions_need_update(&self) -> bool {
392            let Some(status) = self.status.as_ref() else {
393                return true;
394            };
395            if status.conditions.is_empty() {
396                return true;
397            }
398            for condition in &status.conditions {
399                if condition.observed_generation != self.meta().generation {
400                    return true;
401                }
402            }
403            false
404        }
405
406        pub fn update_in_progress(&self) -> bool {
407            let Some(status) = self.status.as_ref() else {
408                return false;
409            };
410            if status.conditions.is_empty() {
411                return false;
412            }
413            for condition in &status.conditions {
414                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
415                    return true;
416                }
417            }
418            false
419        }
420
421        /// Checks that the given version is greater than or equal
422        /// to the existing version, if the existing version
423        /// can be parsed.
424        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
425            let version = parse_image_ref(&self.spec.environmentd_image_ref);
426            match version {
427                Some(version) => &version >= minimum,
428                // In the rare case that we see an image reference
429                // that we can't parse, we assume that it satisfies all
430                // version checks. Usually these are custom images that have
431                // been by a developer on a branch forked from a recent copy
432                // of main, and so this works out reasonably well in practice.
433                None => {
434                    tracing::warn!(
435                        image_ref = %self.spec.environmentd_image_ref,
436                        "failed to parse image ref",
437                    );
438                    true
439                }
440            }
441        }
442
443        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
444            ObjectMeta {
445                namespace: Some(self.namespace()),
446                name: Some(name),
447                labels: Some(self.default_labels()),
448                owner_references: Some(vec![owner_reference(self)]),
449                ..Default::default()
450            }
451        }
452
453        pub fn status(&self) -> MaterializeStatus {
454            self.status.clone().unwrap_or_else(|| {
455                let mut status = MaterializeStatus::default();
456                // DNS-1035 names are supposed to be case insensitive,
457                // so we define our own character set, rather than use the
458                // built-in Alphanumeric distribution from rand, which
459                // includes both upper and lowercase letters.
460                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
461                status.resource_id = rand::thread_rng()
462                    .sample_iter(Uniform::new(0, CHARSET.len()))
463                    .take(10)
464                    .map(|i| char::from(CHARSET[i]))
465                    .collect();
466
467                // If we're creating the initial status on an un-soft-deleted
468                // Environment we need to ensure that the last active generation
469                // is restored, otherwise the env will crash loop indefinitely
470                // as its catalog would have durably recorded a greater generation
471                if let Some(last_active_generation) = self
472                    .annotations()
473                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
474                {
475                    status.active_generation = last_active_generation
476                        .parse()
477                        .expect("valid int generation");
478                }
479
480                status
481            })
482        }
483    }
484
    // Status of a `Materialize` object, serialized in camelCase.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        // Identifier embedded in the names produced by
        // `Materialize::name_prefixed` (`mz<resource_id>-...`). When an
        // initial status is built it is 10 random characters drawn from
        // lowercase ASCII letters and digits.
        pub resource_id: String,
        // Initialized from the last-known-active-generation annotation when
        // an initial status is built and that annotation is present;
        // otherwise starts at the default of 0.
        pub active_generation: u64,
        // Compared against `spec.request_rollout` to decide whether a
        // rollout has been requested.
        pub last_completed_rollout_request: Uuid,
        // NOTE(review): not read or written anywhere in this file;
        // presumably a hash of the generated resources -- confirm.
        pub resources_hash: String,
        // Conditions for this object. This file inspects each condition's
        // `observed_generation` and looks for the `UpToDate` condition type.
        pub conditions: Vec<Condition>,
    }
494
495    impl MaterializeStatus {
496        pub fn needs_update(&self, other: &Self) -> bool {
497            let now = chrono::offset::Utc::now();
498            let mut a = self.clone();
499            for condition in &mut a.conditions {
500                condition.last_transition_time = Time(now);
501            }
502            let mut b = other.clone();
503            for condition in &mut b.conditions {
504                condition.last_transition_time = Time(now);
505            }
506            a != b
507        }
508    }
509}
510
511fn parse_image_ref(image_ref: &str) -> Option<Version> {
512    image_ref
513        .rsplit_once(':')
514        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
515        .and_then(|tag| {
516            // To work around Docker tag restrictions, build metadata in
517            // a Docker tag is delimited by `--` rather than the SemVer
518            // `+` delimiter. So we need to swap the delimiter back to
519            // `+` before parsing it as SemVer.
520            let tag = tag.replace("--", "+");
521            Version::parse(&tag).ok()
522        })
523}
524
525fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
526    OwnerReference {
527        api_version: T::api_version(&()).to_string(),
528        kind: T::kind(&()).to_string(),
529        name: t.name_unchecked(),
530        uid: t.uid().unwrap(),
531        block_owner_deletion: Some(true),
532        ..Default::default()
533    }
534}
535
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    #[mz_ore::test]
    fn meets_minimum_version() {
        // Each entry is (image reference, minimum version).
        // Entries in this table must satisfy the minimum; this includes
        // references whose tag cannot be parsed as a version at all.
        let satisfied = [
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0"),
            ("materialize/environmentd:v0.35.0", "0.34.0"),
            ("materialize/environmentd:v0.34.3", "0.34.0"),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0"),
            ("materialize/environmentd:v0.asdf.0", "0.34.0"),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
            ),
        ];
        // Entries in this table must fail the minimum.
        let unsatisfied = [
            ("materialize/environmentd:v0.34.0-dev", "0.34.0"),
            ("materialize/environmentd:v0.33.0", "0.34.0"),
            ("materialize/environmentd:v0.34.0", "1.0.0"),
            ("my.private.registry:5000:v0.33.3", "0.34.0"),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        // true cases
        for (image_ref, minimum) in satisfied {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            assert!(mz.meets_minimum_version(&Version::parse(minimum).unwrap()));
        }

        // false cases
        for (image_ref, minimum) in unsatisfied {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            assert!(!mz.meets_minimum_version(&Version::parse(minimum).unwrap()));
        }
    }
}