mz_cloud_resources/crd/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::crd::generated::cert_manager::certificates::{
28    CertificateIssuerRef, CertificateSecretTemplate,
29};
30
/// Annotation key holding the last known active environmentd generation.
///
/// When a fresh status is built for a `Materialize` resource, the value of
/// this annotation (if present) is parsed and used as the initial
/// `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
33
34pub mod v1alpha1 {
35
36    use super::*;
37
38    // This is intentionally a subset of the fields of a Certificate.
39    // We do not want customers to configure options that may conflict with
40    // things we override or expand in our code.
41    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
42    #[serde(rename_all = "camelCase")]
43    pub struct MaterializeCertSpec {
44        // Additional DNS names the cert will be valid for.
45        pub dns_names: Option<Vec<String>>,
46        // Duration the certificate will be requested for.
47        // Value must be in units accepted by Go time.ParseDuration
48        // https://golang.org/pkg/time/#ParseDuration.
49        pub duration: Option<String>,
50        // Duration before expiration the certificate will be renewed.
51        // Value must be in units accepted by Go time.ParseDuration
52        // https://golang.org/pkg/time/#ParseDuration.
53        pub renew_before: Option<String>,
54        // Reference to an Issuer or ClusterIssuer that will generate the certificate.
55        pub issuer_ref: Option<CertificateIssuerRef>,
56        // Additional annotations and labels to include in the Certificate object.
57        pub secret_template: Option<CertificateSecretTemplate>,
58    }
59
60    #[derive(
61        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
62    )]
63    #[serde(rename_all = "camelCase")]
64    #[kube(
65        namespaced,
66        group = "materialize.cloud",
67        version = "v1alpha1",
68        kind = "Materialize",
69        singular = "materialize",
70        plural = "materializes",
71        shortname = "mzs",
72        status = "MaterializeStatus",
73        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.imageRef", "priority": 1}"#,
74        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
75    )]
76    pub struct MaterializeSpec {
77        // The environmentd image to run
78        pub environmentd_image_ref: String,
79        // Extra args to pass to the environmentd binary
80        pub environmentd_extra_args: Option<Vec<String>>,
81        // Extra environment variables to pass to the environmentd binary
82        pub environmentd_extra_env: Option<Vec<EnvVar>>,
83        // If running in AWS, override the IAM role to use to give
84        // environmentd access to the persist S3 bucket
85        pub environmentd_iam_role_arn: Option<String>,
86        // If running in AWS, override the IAM role to use to support
87        // the CREATE CONNECTION feature
88        pub environmentd_connection_role_arn: Option<String>,
89        // Resource requirements for the environmentd pod
90        pub environmentd_resource_requirements: Option<ResourceRequirements>,
91        // Amount of disk to allocate, if a storage class is provided
92        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
93        // Resource requirements for the balancerd pod
94        pub balancerd_resource_requirements: Option<ResourceRequirements>,
95        // Resource requirements for the console pod
96        pub console_resource_requirements: Option<ResourceRequirements>,
97
98        // When changes are made to the environmentd resources (either via
99        // modifying fields in the spec here or by deploying a new
100        // orchestratord version which changes how resources are generated),
101        // existing environmentd processes won't be automatically restarted.
102        // In order to trigger a restart, the request_rollout field should be
103        // set to a new (random) value. Once the rollout completes, the value
104        // of status.last_completed_rollout_request will be set to this value
105        // to indicate completion.
106        //
107        // Defaults to a random value in order to ensure that the first
108        // generation rollout is automatically triggered.
109        #[serde(default = "Uuid::new_v4")]
110        pub request_rollout: Uuid,
111        // If force_promote is set to the same value as request_rollout, the
112        // current rollout will skip waiting for clusters in the new
113        // generation to rehydrate before promoting the new environmentd to
114        // leader.
115        #[serde(default)]
116        pub force_promote: Uuid,
117        // This value will be written to an annotation in the generated
118        // environmentd statefulset, in order to force the controller to
119        // detect the generated resources as changed even if no other changes
120        // happened. This can be used to force a rollout to a new generation
121        // even without making any meaningful changes.
122        #[serde(default)]
123        pub force_rollout: Uuid,
124        // If false (the default), orchestratord will use the leader
125        // promotion codepath to minimize downtime during rollouts. If true,
126        // it will just kill the environmentd pod directly.
127        #[serde(default)]
128        pub in_place_rollout: bool,
129        // The name of a secret containing metadata_backend_url and persist_backend_url.
130        pub backend_secret_name: String,
131
132        // The value used by environmentd (via the --environment-id flag) to
133        // uniquely identify this instance. Must be globally unique, and
134        // defaults to a random value.
135        // NOTE: This value MUST NOT be changed in an existing instance,
136        // since it affects things like the way data is stored in the persist
137        // backend.
138        // This is safe to be set via a default because the controller code
139        // runs an initial reconcile loop in order to set the finalizer on
140        // the resource before running any user code, and that initial loop
141        // will populate any defaults.
142        #[serde(default = "Uuid::new_v4")]
143        pub environment_id: Uuid,
144
145        // The configuration for generating an x509 certificate using cert-manager for balancerd
146        // to present to incoming connections.
147        // The dns_names and issuer_ref fields are required.
148        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
149        // The configuration for generating an x509 certificate using cert-manager for the console
150        // to present to incoming connections.
151        // The dns_names and issuer_ref fields are required.
152        // Not yet implemented.
153        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
154        // The cert-manager Issuer or ClusterIssuer to use for database internal communication.
155        // The issuer_ref field is required.
156        // This currently is only used for environmentd, but will eventually support clusterd.
157        pub internal_certificate_spec: Option<MaterializeCertSpec>,
158    }
159
160    impl Materialize {
161        pub fn backend_secret_name(&self) -> String {
162            self.spec.backend_secret_name.clone()
163        }
164
165        pub fn namespace(&self) -> String {
166            self.meta().namespace.clone().unwrap()
167        }
168
169        pub fn service_account_name(&self) -> String {
170            self.name_unchecked()
171        }
172
173        pub fn role_name(&self) -> String {
174            self.name_unchecked()
175        }
176
177        pub fn role_binding_name(&self) -> String {
178            self.name_unchecked()
179        }
180
181        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
182            self.name_prefixed(&format!("environmentd-{generation}"))
183        }
184
185        pub fn environmentd_app_name(&self) -> String {
186            "environmentd".to_owned()
187        }
188
189        pub fn environmentd_service_name(&self) -> String {
190            self.name_prefixed("environmentd")
191        }
192
193        pub fn environmentd_service_internal_fqdn(&self) -> String {
194            format!(
195                "{}.{}.svc.cluster.local",
196                self.environmentd_service_name(),
197                self.meta().namespace.as_ref().unwrap()
198            )
199        }
200
201        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
202            self.name_prefixed(&format!("environmentd-{generation}"))
203        }
204
205        pub fn balancerd_app_name(&self) -> String {
206            "balancerd".to_owned()
207        }
208
209        pub fn environmentd_certificate_name(&self) -> String {
210            self.name_prefixed("environmentd-external")
211        }
212
213        pub fn environmentd_certificate_secret_name(&self) -> String {
214            self.name_prefixed("environmentd-tls")
215        }
216
217        pub fn balancerd_deployment_name(&self) -> String {
218            self.name_prefixed("balancerd")
219        }
220
221        pub fn balancerd_service_name(&self) -> String {
222            self.name_prefixed("balancerd")
223        }
224
225        pub fn console_app_name(&self) -> String {
226            "console".to_owned()
227        }
228
229        pub fn balancerd_external_certificate_name(&self) -> String {
230            self.name_prefixed("balancerd-external")
231        }
232
233        pub fn balancerd_external_certificate_secret_name(&self) -> String {
234            self.name_prefixed("balancerd-external-tls")
235        }
236
237        pub fn console_deployment_name(&self) -> String {
238            self.name_prefixed("console")
239        }
240
241        pub fn console_service_name(&self) -> String {
242            self.name_prefixed("console")
243        }
244
245        pub fn console_external_certificate_name(&self) -> String {
246            self.name_prefixed("console-external")
247        }
248
249        pub fn console_external_certificate_secret_name(&self) -> String {
250            self.name_prefixed("console-external-tls")
251        }
252
253        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
254            self.name_prefixed(&format!("persist-pubsub-{generation}"))
255        }
256
257        pub fn name_prefixed(&self, suffix: &str) -> String {
258            format!("mz{}-{}", self.resource_id(), suffix)
259        }
260
261        pub fn resource_id(&self) -> &str {
262            &self.status.as_ref().unwrap().resource_id
263        }
264
265        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
266            self.spec
267                .environmentd_scratch_volume_storage_requirement
268                .clone()
269                .unwrap_or_else(|| {
270                    self.spec
271                        .environmentd_resource_requirements
272                        .as_ref()
273                        .and_then(|requirements| {
274                            requirements
275                                .requests
276                                .as_ref()
277                                .or(requirements.limits.as_ref())
278                        })
279                        // TODO: in cloud, we've been defaulting to twice the
280                        // memory limit, but k8s-openapi doesn't seem to
281                        // provide any way to parse Quantity values, so there
282                        // isn't an easy way to do arithmetic on it
283                        .and_then(|requirements| requirements.get("memory").cloned())
284                        // TODO: is there a better default to use here?
285                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
286                })
287        }
288
289        pub fn default_labels(&self) -> BTreeMap<String, String> {
290            BTreeMap::from_iter([
291                (
292                    "materialize.cloud/organization-name".to_owned(),
293                    self.name_unchecked(),
294                ),
295                (
296                    "materialize.cloud/organization-namespace".to_owned(),
297                    self.namespace(),
298                ),
299                (
300                    "materialize.cloud/mz-resource-id".to_owned(),
301                    self.resource_id().to_owned(),
302                ),
303            ])
304        }
305
306        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
307            format!(
308                "{}-{}-{}-0",
309                cloud_provider, region, self.spec.environment_id,
310            )
311        }
312
313        pub fn requested_reconciliation_id(&self) -> Uuid {
314            self.spec.request_rollout
315        }
316
317        pub fn in_place_rollout(&self) -> bool {
318            self.spec.in_place_rollout
319        }
320
321        pub fn rollout_requested(&self) -> bool {
322            self.requested_reconciliation_id()
323                != self
324                    .status
325                    .as_ref()
326                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
327        }
328
329        pub fn set_force_promote(&mut self) {
330            self.spec.force_promote = self.spec.request_rollout;
331        }
332
333        pub fn should_force_promote(&self) -> bool {
334            self.spec.force_promote == self.spec.request_rollout
335        }
336
337        pub fn conditions_need_update(&self) -> bool {
338            let Some(status) = self.status.as_ref() else {
339                return true;
340            };
341            if status.conditions.is_empty() {
342                return true;
343            }
344            for condition in &status.conditions {
345                if condition.observed_generation != self.meta().generation {
346                    return true;
347                }
348            }
349            false
350        }
351
352        pub fn update_in_progress(&self) -> bool {
353            let Some(status) = self.status.as_ref() else {
354                return false;
355            };
356            if status.conditions.is_empty() {
357                return false;
358            }
359            for condition in &status.conditions {
360                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
361                    return true;
362                }
363            }
364            false
365        }
366
367        /// Checks that the given version is greater than or equal
368        /// to the existing version, if the existing version
369        /// can be parsed.
370        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
371            let version = parse_image_ref(&self.spec.environmentd_image_ref);
372            match version {
373                Some(version) => &version >= minimum,
374                // In the rare case that we see an image reference
375                // that we can't parse, we assume that it satisfies all
376                // version checks. Usually these are custom images that have
377                // been by a developer on a branch forked from a recent copy
378                // of main, and so this works out reasonably well in practice.
379                None => true,
380            }
381        }
382
383        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
384            ObjectMeta {
385                namespace: Some(self.namespace()),
386                name: Some(name),
387                labels: Some(self.default_labels()),
388                owner_references: Some(vec![owner_reference(self)]),
389                ..Default::default()
390            }
391        }
392
393        pub fn status(&self) -> MaterializeStatus {
394            self.status.clone().unwrap_or_else(|| {
395                let mut status = MaterializeStatus::default();
396                // DNS-1035 names are supposed to be case insensitive,
397                // so we define our own character set, rather than use the
398                // built-in Alphanumeric distribution from rand, which
399                // includes both upper and lowercase letters.
400                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
401                status.resource_id = rand::thread_rng()
402                    .sample_iter(Uniform::new(0, CHARSET.len()))
403                    .take(10)
404                    .map(|i| char::from(CHARSET[i]))
405                    .collect();
406
407                // If we're creating the initial status on an un-soft-deleted
408                // Environment we need to ensure that the last active generation
409                // is restored, otherwise the env will crash loop indefinitely
410                // as its catalog would have durably recorded a greater generation
411                if let Some(last_active_generation) = self
412                    .annotations()
413                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
414                {
415                    status.active_generation = last_active_generation
416                        .parse()
417                        .expect("valid int generation");
418                }
419
420                status
421            })
422        }
423    }
424
    // Observed state of a `Materialize` resource.
    // (Plain `//` comments are used here so the generated CRD schema is unchanged.)
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        // Random lowercase alphanumeric id embedded in the names of managed
        // resources (`mz{resource_id}-{suffix}`).
        pub resource_id: String,
        // The environmentd generation currently considered active. It can be
        // seeded from the LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION annotation
        // when the status is first initialized.
        pub active_generation: u64,
        // The `spec.request_rollout` value of the most recently completed
        // rollout; a rollout is pending while the two differ.
        pub last_completed_rollout_request: Uuid,
        // NOTE(review): presumably a hash of the generated resources, used to
        // detect changes; it is not read in this file — confirm.
        pub resources_hash: String,
        // Status conditions. An `UpToDate` condition with status `Unknown`
        // marks an update as in progress.
        pub conditions: Vec<Condition>,
    }
434
435    impl MaterializeStatus {
436        pub fn needs_update(&self, other: &Self) -> bool {
437            let now = chrono::offset::Utc::now();
438            let mut a = self.clone();
439            for condition in &mut a.conditions {
440                condition.last_transition_time = Time(now);
441            }
442            let mut b = other.clone();
443            for condition in &mut b.conditions {
444                condition.last_transition_time = Time(now);
445            }
446            a != b
447        }
448    }
449}
450
451fn parse_image_ref(image_ref: &str) -> Option<Version> {
452    image_ref
453        .rsplit_once(':')
454        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
455        .and_then(|tag| {
456            // To work around Docker tag restrictions, build metadata in
457            // a Docker tag is delimited by `--` rather than the SemVer
458            // `+` delimiter. So we need to swap the delimiter back to
459            // `+` before parsing it as SemVer.
460            let tag = tag.replace("--", "+");
461            Version::parse(&tag).ok()
462        })
463}
464
465fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
466    OwnerReference {
467        api_version: T::api_version(&()).to_string(),
468        kind: T::kind(&()).to_string(),
469        name: t.name_unchecked(),
470        uid: t.uid().unwrap(),
471        block_owner_deletion: Some(true),
472        ..Default::default()
473    }
474}
475
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    #[mz_ore::test]
    fn meets_minimum_version() {
        // (image reference, minimum version, expected result)
        let cases = [
            // Satisfied: unparseable references are assumed to be recent.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            // Not satisfied.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };
        for (image_ref, minimum, expected) in cases {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image_ref={image_ref}, minimum={minimum}",
            );
        }
    }
}