mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key whose value, when present on a `Materialize` object, is
/// parsed as an integer and used as the `active_generation` of a freshly
/// created status (see `Materialize::status`).
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
    // Certificate settings a user may supply for a cert-manager Certificate.
    //
    // This is intentionally a subset of the fields of a Certificate.
    // We do not want customers to configure options that may conflict with
    // things we override or expand in our code.
    //
    // Field names are serialized in camelCase (e.g. `dnsNames`, `issuerRef`).
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeCertSpec {
        // Additional DNS names the cert will be valid for.
        pub dns_names: Option<Vec<String>>,
        // Duration the certificate will be requested for.
        // Value must be in units accepted by Go time.ParseDuration
        // https://golang.org/pkg/time/#ParseDuration.
        pub duration: Option<String>,
        // Duration before expiration at which the certificate will be renewed.
        // Value must be in units accepted by Go time.ParseDuration
        // https://golang.org/pkg/time/#ParseDuration.
        pub renew_before: Option<String>,
        // Reference to an Issuer or ClusterIssuer that will generate the certificate.
        pub issuer_ref: Option<CertificateIssuerRef>,
        // Additional annotations and labels to include in the Certificate object.
        pub secret_template: Option<CertificateSecretTemplate>,
    }
61
    // Desired state of a `Materialize` custom resource (group
    // `materialize.cloud`, version `v1alpha1`). The `CustomResource` derive
    // below generates the namespaced `Materialize` kind, which pairs this spec
    // with a `MaterializeStatus`.
    //
    // Field names are serialized in camelCase (e.g. `environmentdImageRef`).
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        // The environmentd image to run.
        pub environmentd_image_ref: String,
        // Extra args to pass to the environmentd binary.
        pub environmentd_extra_args: Option<Vec<String>>,
        // Extra environment variables to pass to the environmentd binary.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        // DEPRECATED: use `service_account_annotations` to set
        // "eks.amazonaws.com/role-arn" instead.
        //
        // If running in AWS, override the IAM role to use to give
        // environmentd access to the persist S3 bucket.
        pub environmentd_iam_role_arn: Option<String>,
        // If running in AWS, override the IAM role to use to support
        // the CREATE CONNECTION feature.
        pub environmentd_connection_role_arn: Option<String>,
        // Resource requirements for the environmentd pod.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        // Amount of disk to allocate, if a storage class is provided.
        // When unset, `Materialize::environmentd_scratch_volume_storage_requirement`
        // derives a value from `environmentd_resource_requirements`.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        // Resource requirements for the balancerd pod.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        // Resource requirements for the console pod.
        pub console_resource_requirements: Option<ResourceRequirements>,

        // Name of the kubernetes service account to use.
        // If not set, we will create one with the same name as this Materialize object.
        pub service_account_name: Option<String>,
        // Annotations to apply to the service account.
        //
        // Annotations on service accounts are commonly used by cloud providers for IAM.
        // AWS uses "eks.amazonaws.com/role-arn".
        // Azure uses "azure.workload.identity/client-id", but
        // additionally requires "azure.workload.identity/use": "true" on the pods.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the service account.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        // Annotations to apply to the pods.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        // Labels to apply to the pods.
        pub pod_labels: Option<BTreeMap<String, String>>,

        // When changes are made to the environmentd resources (either via
        // modifying fields in the spec here or by deploying a new
        // orchestratord version which changes how resources are generated),
        // existing environmentd processes won't be automatically restarted.
        // In order to trigger a restart, the request_rollout field should be
        // set to a new (random) value. Once the rollout completes, the value
        // of status.last_completed_rollout_request will be set to this value
        // to indicate completion.
        //
        // Defaults to a random value in order to ensure that the first
        // generation rollout is automatically triggered.
        // NOTE(review): `#[serde(default)]` on a `Uuid` yields
        // `Uuid::default()`, i.e. the nil UUID, not a random one. Presumably
        // the random value is filled in elsewhere -- confirm.
        #[serde(default)]
        pub request_rollout: Uuid,
        // If force_promote is set to the same value as request_rollout, the
        // current rollout will skip waiting for clusters in the new
        // generation to rehydrate before promoting the new environmentd to
        // leader.
        #[serde(default)]
        pub force_promote: Uuid,
        // This value will be written to an annotation in the generated
        // environmentd statefulset, in order to force the controller to
        // detect the generated resources as changed even if no other changes
        // happened. This can be used to force a rollout to a new generation
        // even without making any meaningful changes.
        #[serde(default)]
        pub force_rollout: Uuid,
        // If false (the default), orchestratord will use the leader
        // promotion codepath to minimize downtime during rollouts. If true,
        // it will just kill the environmentd pod directly.
        #[serde(default)]
        pub in_place_rollout: bool,
        // The name of a secret containing metadata_backend_url and persist_backend_url.
        // It may also contain external_login_password_mz_system, which will be used as
        // the password for the mz_system user if authenticator_kind is Password.
        pub backend_secret_name: String,
        // How to authenticate with Materialize. Valid options are Password and None.
        // If set to Password, the backend secret must contain external_login_password_mz_system.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        // Whether to enable role based access control. Defaults to false.
        #[serde(default)]
        pub enable_rbac: bool,

        // The value used by environmentd (via the --environment-id flag) to
        // uniquely identify this instance. Must be globally unique, and
        // defaults to a random value.
        // NOTE(review): as with request_rollout, `#[serde(default)]` alone
        // produces the nil UUID; confirm where the random value is assigned.
        // NOTE: This value MUST NOT be changed in an existing instance,
        // since it affects things like the way data is stored in the persist
        // backend.
        #[serde(default)]
        pub environment_id: Uuid,

        // The configuration for generating an x509 certificate using cert-manager for balancerd
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        // The configuration for generating an x509 certificate using cert-manager for the console
        // to present to incoming connections.
        // The dns_names and issuer_ref fields are required.
        // Not yet implemented.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        // The cert-manager Issuer or ClusterIssuer to use for database internal communication.
        // The issuer_ref field is required.
        // This currently is only used for environmentd, but will eventually support clusterd.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
186
187    impl Materialize {
188        pub fn backend_secret_name(&self) -> String {
189            self.spec.backend_secret_name.clone()
190        }
191
192        pub fn namespace(&self) -> String {
193            self.meta().namespace.clone().unwrap()
194        }
195
196        pub fn create_service_account(&self) -> bool {
197            self.spec.service_account_name.is_none()
198        }
199
200        pub fn service_account_name(&self) -> String {
201            self.spec
202                .service_account_name
203                .clone()
204                .unwrap_or_else(|| self.name_unchecked())
205        }
206
207        pub fn role_name(&self) -> String {
208            self.name_unchecked()
209        }
210
211        pub fn role_binding_name(&self) -> String {
212            self.name_unchecked()
213        }
214
215        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
216            self.name_prefixed(&format!("environmentd-{generation}"))
217        }
218
219        pub fn environmentd_app_name(&self) -> String {
220            "environmentd".to_owned()
221        }
222
223        pub fn environmentd_service_name(&self) -> String {
224            self.name_prefixed("environmentd")
225        }
226
227        pub fn environmentd_service_internal_fqdn(&self) -> String {
228            format!(
229                "{}.{}.svc.cluster.local",
230                self.environmentd_service_name(),
231                self.meta().namespace.as_ref().unwrap()
232            )
233        }
234
235        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
236            self.name_prefixed(&format!("environmentd-{generation}"))
237        }
238
239        pub fn balancerd_app_name(&self) -> String {
240            "balancerd".to_owned()
241        }
242
243        pub fn environmentd_certificate_name(&self) -> String {
244            self.name_prefixed("environmentd-external")
245        }
246
247        pub fn environmentd_certificate_secret_name(&self) -> String {
248            self.name_prefixed("environmentd-tls")
249        }
250
251        pub fn balancerd_deployment_name(&self) -> String {
252            self.name_prefixed("balancerd")
253        }
254
255        pub fn balancerd_service_name(&self) -> String {
256            self.name_prefixed("balancerd")
257        }
258
259        pub fn console_app_name(&self) -> String {
260            "console".to_owned()
261        }
262
263        pub fn balancerd_external_certificate_name(&self) -> String {
264            self.name_prefixed("balancerd-external")
265        }
266
267        pub fn balancerd_external_certificate_secret_name(&self) -> String {
268            self.name_prefixed("balancerd-external-tls")
269        }
270
271        pub fn console_configmap_name(&self) -> String {
272            self.name_prefixed("console")
273        }
274
275        pub fn console_deployment_name(&self) -> String {
276            self.name_prefixed("console")
277        }
278
279        pub fn console_service_name(&self) -> String {
280            self.name_prefixed("console")
281        }
282
283        pub fn console_external_certificate_name(&self) -> String {
284            self.name_prefixed("console-external")
285        }
286
287        pub fn console_external_certificate_secret_name(&self) -> String {
288            self.name_prefixed("console-external-tls")
289        }
290
291        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
292            self.name_prefixed(&format!("persist-pubsub-{generation}"))
293        }
294
295        pub fn listeners_configmap_name(&self, generation: u64) -> String {
296            self.name_prefixed(&format!("listeners-{generation}"))
297        }
298
299        pub fn name_prefixed(&self, suffix: &str) -> String {
300            format!("mz{}-{}", self.resource_id(), suffix)
301        }
302
303        pub fn resource_id(&self) -> &str {
304            &self.status.as_ref().unwrap().resource_id
305        }
306
307        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
308            self.spec
309                .environmentd_scratch_volume_storage_requirement
310                .clone()
311                .unwrap_or_else(|| {
312                    self.spec
313                        .environmentd_resource_requirements
314                        .as_ref()
315                        .and_then(|requirements| {
316                            requirements
317                                .requests
318                                .as_ref()
319                                .or(requirements.limits.as_ref())
320                        })
321                        // TODO: in cloud, we've been defaulting to twice the
322                        // memory limit, but k8s-openapi doesn't seem to
323                        // provide any way to parse Quantity values, so there
324                        // isn't an easy way to do arithmetic on it
325                        .and_then(|requirements| requirements.get("memory").cloned())
326                        // TODO: is there a better default to use here?
327                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
328                })
329        }
330
331        pub fn default_labels(&self) -> BTreeMap<String, String> {
332            BTreeMap::from_iter([
333                (
334                    "materialize.cloud/organization-name".to_owned(),
335                    self.name_unchecked(),
336                ),
337                (
338                    "materialize.cloud/organization-namespace".to_owned(),
339                    self.namespace(),
340                ),
341                (
342                    "materialize.cloud/mz-resource-id".to_owned(),
343                    self.resource_id().to_owned(),
344                ),
345            ])
346        }
347
348        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
349            format!(
350                "{}-{}-{}-0",
351                cloud_provider, region, self.spec.environment_id,
352            )
353        }
354
355        pub fn requested_reconciliation_id(&self) -> Uuid {
356            self.spec.request_rollout
357        }
358
359        pub fn in_place_rollout(&self) -> bool {
360            self.spec.in_place_rollout
361        }
362
363        pub fn rollout_requested(&self) -> bool {
364            self.requested_reconciliation_id()
365                != self
366                    .status
367                    .as_ref()
368                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
369        }
370
371        pub fn set_force_promote(&mut self) {
372            self.spec.force_promote = self.spec.request_rollout;
373        }
374
375        pub fn should_force_promote(&self) -> bool {
376            self.spec.force_promote == self.spec.request_rollout
377        }
378
379        pub fn conditions_need_update(&self) -> bool {
380            let Some(status) = self.status.as_ref() else {
381                return true;
382            };
383            if status.conditions.is_empty() {
384                return true;
385            }
386            for condition in &status.conditions {
387                if condition.observed_generation != self.meta().generation {
388                    return true;
389                }
390            }
391            false
392        }
393
394        pub fn update_in_progress(&self) -> bool {
395            let Some(status) = self.status.as_ref() else {
396                return false;
397            };
398            if status.conditions.is_empty() {
399                return false;
400            }
401            for condition in &status.conditions {
402                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
403                    return true;
404                }
405            }
406            false
407        }
408
409        /// Checks that the given version is greater than or equal
410        /// to the existing version, if the existing version
411        /// can be parsed.
412        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
413            let version = parse_image_ref(&self.spec.environmentd_image_ref);
414            match version {
415                Some(version) => &version >= minimum,
416                // In the rare case that we see an image reference
417                // that we can't parse, we assume that it satisfies all
418                // version checks. Usually these are custom images that have
419                // been by a developer on a branch forked from a recent copy
420                // of main, and so this works out reasonably well in practice.
421                None => {
422                    tracing::warn!(
423                        image_ref = %self.spec.environmentd_image_ref,
424                        "failed to parse image ref",
425                    );
426                    true
427                }
428            }
429        }
430
431        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
432            ObjectMeta {
433                namespace: Some(self.namespace()),
434                name: Some(name),
435                labels: Some(self.default_labels()),
436                owner_references: Some(vec![owner_reference(self)]),
437                ..Default::default()
438            }
439        }
440
441        pub fn status(&self) -> MaterializeStatus {
442            self.status.clone().unwrap_or_else(|| {
443                let mut status = MaterializeStatus::default();
444                // DNS-1035 names are supposed to be case insensitive,
445                // so we define our own character set, rather than use the
446                // built-in Alphanumeric distribution from rand, which
447                // includes both upper and lowercase letters.
448                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
449                status.resource_id = rand::thread_rng()
450                    .sample_iter(Uniform::new(0, CHARSET.len()))
451                    .take(10)
452                    .map(|i| char::from(CHARSET[i]))
453                    .collect();
454
455                // If we're creating the initial status on an un-soft-deleted
456                // Environment we need to ensure that the last active generation
457                // is restored, otherwise the env will crash loop indefinitely
458                // as its catalog would have durably recorded a greater generation
459                if let Some(last_active_generation) = self
460                    .annotations()
461                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
462                {
463                    status.active_generation = last_active_generation
464                        .parse()
465                        .expect("valid int generation");
466                }
467
468                status
469            })
470        }
471    }
472
    // Observed state of a `Materialize` object, stored in its status
    // subresource. Field names are serialized in camelCase.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        // Identifier embedded in the names and labels of managed resources
        // (`mz<resource_id>-...`). `Materialize::status` generates it as ten
        // random lowercase alphanumeric characters when no status exists yet.
        pub resource_id: String,
        // Generation recorded as active. When an initial status is created it
        // is restored from the last-known-active-generation annotation, if
        // that annotation is present.
        pub active_generation: u64,
        // Compared against `spec.request_rollout` to decide whether a rollout
        // is still pending (see `Materialize::rollout_requested`).
        pub last_completed_rollout_request: Uuid,
        // NOTE(review): presumably a hash of the generated resources; it is
        // not computed anywhere in this file -- confirm against the writer.
        pub resources_hash: String,
        // Status conditions. This file inspects the `UpToDate` condition and
        // each condition's `observed_generation`.
        pub conditions: Vec<Condition>,
    }
482
483    impl MaterializeStatus {
484        pub fn needs_update(&self, other: &Self) -> bool {
485            let now = chrono::offset::Utc::now();
486            let mut a = self.clone();
487            for condition in &mut a.conditions {
488                condition.last_transition_time = Time(now);
489            }
490            let mut b = other.clone();
491            for condition in &mut b.conditions {
492                condition.last_transition_time = Time(now);
493            }
494            a != b
495        }
496    }
497}
498
499fn parse_image_ref(image_ref: &str) -> Option<Version> {
500    image_ref
501        .rsplit_once(':')
502        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
503        .and_then(|tag| {
504            // To work around Docker tag restrictions, build metadata in
505            // a Docker tag is delimited by `--` rather than the SemVer
506            // `+` delimiter. So we need to swap the delimiter back to
507            // `+` before parsing it as SemVer.
508            let tag = tag.replace("--", "+");
509            Version::parse(&tag).ok()
510        })
511}
512
513fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
514    OwnerReference {
515        api_version: T::api_version(&()).to_string(),
516        kind: T::kind(&()).to_string(),
517        name: t.name_unchecked(),
518        uid: t.uid().unwrap(),
519        block_owner_deletion: Some(true),
520        ..Default::default()
521    }
522}
523
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Each case is `(image reference, minimum version, expected result)`.
    /// Image references that cannot be parsed are expected to pass the check.
    const CASES: &[(&str, &str, bool)] = &[
        // true cases
        (
            "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
            "0.34.0",
            true,
        ),
        ("materialize/environmentd:v0.34.0", "0.34.0", true),
        ("materialize/environmentd:v0.35.0", "0.34.0", true),
        ("materialize/environmentd:v0.34.3", "0.34.0", true),
        (
            "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
            "0.34.0",
            true,
        ),
        ("my.private.registry:5000:v0.34.3", "0.34.0", true),
        ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
        (
            "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
            "0.146.0-dev.0",
            true,
        ),
        // false cases
        ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
        ("materialize/environmentd:v0.33.0", "0.34.0", false),
        ("materialize/environmentd:v0.34.0", "1.0.0", false),
        ("my.private.registry:5000:v0.33.3", "0.34.0", false),
    ];

    #[mz_ore::test]
    fn meets_minimum_version() {
        let mut mz = Materialize {
            spec: MaterializeSpec {
                ..Default::default()
            },
            metadata: ObjectMeta {
                ..Default::default()
            },
            status: None,
        };

        for &(image_ref, minimum, expected) in CASES {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} checked against minimum {minimum}",
            );
        }
    }
}