1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::core::v1::{EnvVar, ResourceRequirements},
14    apimachinery::pkg::{
15        api::resource::Quantity,
16        apis::meta::v1::{Condition, OwnerReference, Time},
17    },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30    CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key holding a previously recorded active generation.
///
/// `Materialize::status` reads this annotation when the resource has no
/// status yet and uses its value (parsed as an integer) to seed
/// `MaterializeStatus::active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38    use super::*;
39
    /// Certificate settings that can be attached to a `MaterializeSpec`.
    ///
    /// Every field is optional and serialized with camelCase keys.
    // NOTE(review): `issuer_ref` and `secret_template` use the generated
    // cert-manager certificate types, so these values are presumably forwarded
    // to a cert-manager `Certificate`; the consumer is not in this file.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeCertSpec {
        /// DNS names for the certificate, if any.
        pub dns_names: Option<Vec<String>>,
        /// Certificate duration, given as a string.
        pub duration: Option<String>,
        /// Renew-before setting, given as a string.
        pub renew_before: Option<String>,
        /// Reference to the certificate issuer.
        pub issuer_ref: Option<CertificateIssuerRef>,
        /// Template for the secret associated with the certificate.
        pub secret_template: Option<CertificateSecretTemplate>,
    }
    /// Rollout strategy selected through `MaterializeSpec::rollout_strategy`.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        /// The default strategy, used when the spec does not name one.
        // NOTE(review): the name suggests promotion waits for the new
        // generation to become ready; that logic is not in this file.
        #[default]
        WaitUntilReady,

        /// Forces promotion: while this strategy is selected,
        /// `Materialize::should_force_promote` always returns `true`.
        ImmediatelyPromoteCausingDowntime,
    }
81
    /// Spec of the namespaced `Materialize` custom resource
    /// (`materialize.cloud/v1alpha1`, plural `materializes`, short name `mzs`).
    ///
    /// Keys are serialized in camelCase. Fields marked `#[serde(default)]` may
    /// be omitted and then take their type's default value.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        /// Reference to the environmentd container image. A tag of the form
        /// `v<semver>` lets the operator determine the version being run.
        pub environmentd_image_ref: String,
        /// Extra arguments for environmentd, if any.
        pub environmentd_extra_args: Option<Vec<String>>,
        /// Extra environment variables for environmentd, if any.
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        /// IAM role ARN for environmentd, if any.
        pub environmentd_iam_role_arn: Option<String>,
        /// Connection role ARN for environmentd, if any.
        pub environmentd_connection_role_arn: Option<String>,
        /// Resource requirements for environmentd.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        /// Size of the environmentd scratch volume. When unset, the memory
        /// entry of the environmentd resource requests (or of the limits when
        /// no requests are given) is used, falling back to `4096Mi`.
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        /// Resource requirements for balancerd.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        /// Resource requirements for the console.
        pub console_resource_requirements: Option<ResourceRequirements>,
        /// Number of balancerd replicas. Defaults to 2 when unset.
        pub balancerd_replicas: Option<i32>,
        /// Number of console replicas. Defaults to 2 when unset.
        pub console_replicas: Option<i32>,

        /// Name of the service account to use. When unset, the name of this
        /// resource is used and `Materialize::create_service_account` reports
        /// `true`.
        pub service_account_name: Option<String>,
        /// Annotations for the service account, if any.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        /// Labels for the service account, if any.
        pub service_account_labels: Option<BTreeMap<String, String>>,
        /// Annotations for pods, if any.
        pub pod_annotations: Option<BTreeMap<String, String>>,
        /// Labels for pods, if any.
        pub pod_labels: Option<BTreeMap<String, String>>,

        /// Identifier of the requested rollout. A rollout is considered
        /// requested while this differs from
        /// `status.lastCompletedRolloutRequest`.
        #[serde(default)]
        pub request_rollout: Uuid,
        /// Promotion is forced while this equals `requestRollout`.
        #[serde(default)]
        pub force_promote: Uuid,
        /// Force-rollout identifier.
        // NOTE(review): not read anywhere in this file; semantics live in the consumer.
        #[serde(default)]
        pub force_rollout: Uuid,
        /// In-place rollout flag.
        // NOTE(review): not read anywhere in this file; semantics live in the consumer.
        #[serde(default)]
        pub in_place_rollout: bool,
        /// Rollout strategy; see `MaterializeRolloutStrategy`.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        /// Name of the backend secret. Required.
        pub backend_secret_name: String,
        /// Authenticator kind.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        /// Whether RBAC is enabled.
        #[serde(default)]
        pub enable_rbac: bool,

        /// UUID embedded in the environment identifier, which has the form
        /// `<cloud provider>-<region>-<this uuid>-0`.
        #[serde(default)]
        pub environment_id: Uuid,

        /// Certificate settings for the external balancerd certificate.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        /// Certificate settings for the external console certificate.
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        /// Certificate settings for the internal certificate.
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
211
212    impl Materialize {
213        pub fn backend_secret_name(&self) -> String {
214            self.spec.backend_secret_name.clone()
215        }
216
217        pub fn namespace(&self) -> String {
218            self.meta().namespace.clone().unwrap()
219        }
220
221        pub fn create_service_account(&self) -> bool {
222            self.spec.service_account_name.is_none()
223        }
224
225        pub fn service_account_name(&self) -> String {
226            self.spec
227                .service_account_name
228                .clone()
229                .unwrap_or_else(|| self.name_unchecked())
230        }
231
232        pub fn role_name(&self) -> String {
233            self.name_unchecked()
234        }
235
236        pub fn role_binding_name(&self) -> String {
237            self.name_unchecked()
238        }
239
240        pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
241            self.name_prefixed(&format!("environmentd-{generation}"))
242        }
243
244        pub fn environmentd_app_name(&self) -> String {
245            "environmentd".to_owned()
246        }
247
248        pub fn environmentd_service_name(&self) -> String {
249            self.name_prefixed("environmentd")
250        }
251
252        pub fn environmentd_service_internal_fqdn(&self) -> String {
253            format!(
254                "{}.{}.svc.cluster.local",
255                self.environmentd_service_name(),
256                self.meta().namespace.as_ref().unwrap()
257            )
258        }
259
260        pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
261            self.name_prefixed(&format!("environmentd-{generation}"))
262        }
263
264        pub fn balancerd_app_name(&self) -> String {
265            "balancerd".to_owned()
266        }
267
268        pub fn environmentd_certificate_name(&self) -> String {
269            self.name_prefixed("environmentd-external")
270        }
271
272        pub fn environmentd_certificate_secret_name(&self) -> String {
273            self.name_prefixed("environmentd-tls")
274        }
275
276        pub fn balancerd_deployment_name(&self) -> String {
277            self.name_prefixed("balancerd")
278        }
279
280        pub fn balancerd_service_name(&self) -> String {
281            self.name_prefixed("balancerd")
282        }
283
284        pub fn console_app_name(&self) -> String {
285            "console".to_owned()
286        }
287
288        pub fn balancerd_external_certificate_name(&self) -> String {
289            self.name_prefixed("balancerd-external")
290        }
291
292        pub fn balancerd_external_certificate_secret_name(&self) -> String {
293            self.name_prefixed("balancerd-external-tls")
294        }
295
296        pub fn balancerd_replicas(&self) -> i32 {
297            self.spec.balancerd_replicas.unwrap_or(2)
298        }
299
300        pub fn console_replicas(&self) -> i32 {
301            self.spec.console_replicas.unwrap_or(2)
302        }
303
304        pub fn console_configmap_name(&self) -> String {
305            self.name_prefixed("console")
306        }
307
308        pub fn console_deployment_name(&self) -> String {
309            self.name_prefixed("console")
310        }
311
312        pub fn console_service_name(&self) -> String {
313            self.name_prefixed("console")
314        }
315
316        pub fn console_external_certificate_name(&self) -> String {
317            self.name_prefixed("console-external")
318        }
319
320        pub fn console_external_certificate_secret_name(&self) -> String {
321            self.name_prefixed("console-external-tls")
322        }
323
324        pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
325            self.name_prefixed(&format!("persist-pubsub-{generation}"))
326        }
327
328        pub fn listeners_configmap_name(&self, generation: u64) -> String {
329            self.name_prefixed(&format!("listeners-{generation}"))
330        }
331
332        pub fn name_prefixed(&self, suffix: &str) -> String {
333            format!("mz{}-{}", self.resource_id(), suffix)
334        }
335
336        pub fn resource_id(&self) -> &str {
337            &self.status.as_ref().unwrap().resource_id
338        }
339
340        pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
341            self.spec
342                .environmentd_scratch_volume_storage_requirement
343                .clone()
344                .unwrap_or_else(|| {
345                    self.spec
346                        .environmentd_resource_requirements
347                        .as_ref()
348                        .and_then(|requirements| {
349                            requirements
350                                .requests
351                                .as_ref()
352                                .or(requirements.limits.as_ref())
353                        })
354                        .and_then(|requirements| requirements.get("memory").cloned())
359                        .unwrap_or_else(|| Quantity("4096Mi".to_string()))
361                })
362        }
363
364        pub fn default_labels(&self) -> BTreeMap<String, String> {
365            BTreeMap::from_iter([
366                (
367                    "materialize.cloud/organization-name".to_owned(),
368                    self.name_unchecked(),
369                ),
370                (
371                    "materialize.cloud/organization-namespace".to_owned(),
372                    self.namespace(),
373                ),
374                (
375                    "materialize.cloud/mz-resource-id".to_owned(),
376                    self.resource_id().to_owned(),
377                ),
378            ])
379        }
380
381        pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
382            format!(
383                "{}-{}-{}-0",
384                cloud_provider, region, self.spec.environment_id,
385            )
386        }
387
388        pub fn requested_reconciliation_id(&self) -> Uuid {
389            self.spec.request_rollout
390        }
391
392        pub fn rollout_requested(&self) -> bool {
393            self.requested_reconciliation_id()
394                != self
395                    .status
396                    .as_ref()
397                    .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
398        }
399
400        pub fn set_force_promote(&mut self) {
401            self.spec.force_promote = self.spec.request_rollout;
402        }
403
404        pub fn should_force_promote(&self) -> bool {
405            self.spec.force_promote == self.spec.request_rollout
406                || self.spec.rollout_strategy
407                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
408        }
409
410        pub fn conditions_need_update(&self) -> bool {
411            let Some(status) = self.status.as_ref() else {
412                return true;
413            };
414            if status.conditions.is_empty() {
415                return true;
416            }
417            for condition in &status.conditions {
418                if condition.observed_generation != self.meta().generation {
419                    return true;
420                }
421            }
422            false
423        }
424
425        pub fn is_promoting(&self) -> bool {
426            let Some(status) = self.status.as_ref() else {
427                return false;
428            };
429            if status.conditions.is_empty() {
430                return false;
431            }
432            status
433                .conditions
434                .iter()
435                .any(|condition| condition.reason == "Promoting")
436        }
437
438        pub fn update_in_progress(&self) -> bool {
439            let Some(status) = self.status.as_ref() else {
440                return false;
441            };
442            if status.conditions.is_empty() {
443                return false;
444            }
445            for condition in &status.conditions {
446                if condition.type_ == "UpToDate" && condition.status == "Unknown" {
447                    return true;
448                }
449            }
450            false
451        }
452
453        pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
457            let version = parse_image_ref(&self.spec.environmentd_image_ref);
458            match version {
459                Some(version) => &version >= minimum,
460                None => {
466                    tracing::warn!(
467                        image_ref = %self.spec.environmentd_image_ref,
468                        "failed to parse image ref",
469                    );
470                    true
471                }
472            }
473        }
474
475        pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
476            ObjectMeta {
477                namespace: Some(self.namespace()),
478                name: Some(name),
479                labels: Some(self.default_labels()),
480                owner_references: Some(vec![owner_reference(self)]),
481                ..Default::default()
482            }
483        }
484
485        pub fn status(&self) -> MaterializeStatus {
486            self.status.clone().unwrap_or_else(|| {
487                let mut status = MaterializeStatus::default();
488                const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
493                status.resource_id = rand::thread_rng()
494                    .sample_iter(Uniform::new(0, CHARSET.len()))
495                    .take(10)
496                    .map(|i| char::from(CHARSET[i]))
497                    .collect();
498
499                if let Some(last_active_generation) = self
504                    .annotations()
505                    .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
506                {
507                    status.active_generation = last_active_generation
508                        .parse()
509                        .expect("valid int generation");
510                }
511
512                status
513            })
514        }
515    }
516
    /// Status sub-resource of the `Materialize` custom resource, serialized
    /// with camelCase keys.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        /// Identifier embedded in the names of managed resources
        /// (`mz<resource id>-<suffix>`).
        pub resource_id: String,
        /// The active generation.
        pub active_generation: u64,
        /// Identifier of the last completed rollout request; a rollout is
        /// considered requested while `spec.requestRollout` differs from it.
        pub last_completed_rollout_request: Uuid,
        /// Resources hash.
        // NOTE(review): not read anywhere in this file; what is hashed is
        // decided by the consumer.
        pub resources_hash: String,
        /// Status conditions.
        pub conditions: Vec<Condition>,
    }
526
527    impl MaterializeStatus {
528        pub fn needs_update(&self, other: &Self) -> bool {
529            let now = chrono::offset::Utc::now();
530            let mut a = self.clone();
531            for condition in &mut a.conditions {
532                condition.last_transition_time = Time(now);
533            }
534            let mut b = other.clone();
535            for condition in &mut b.conditions {
536                condition.last_transition_time = Time(now);
537            }
538            a != b
539        }
540    }
541}
542
543fn parse_image_ref(image_ref: &str) -> Option<Version> {
544    image_ref
545        .rsplit_once(':')
546        .and_then(|(_repo, tag)| tag.strip_prefix('v'))
547        .and_then(|tag| {
548            let tag = tag.replace("--", "+");
553            Version::parse(&tag).ok()
554        })
555}
556
557fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
558    OwnerReference {
559        api_version: T::api_version(&()).to_string(),
560        kind: T::kind(&()).to_string(),
561        name: t.name_unchecked(),
562        uid: t.uid().unwrap(),
563        block_owner_deletion: Some(true),
564        ..Default::default()
565    }
566}
567
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Table-driven check of `Materialize::meets_minimum_version`.
    #[mz_ore::test]
    fn meets_minimum_version() {
        /// Builds a status-less resource whose only non-default field is the
        /// environmentd image reference.
        fn with_image_ref(image_ref: &str) -> Materialize {
            Materialize {
                spec: MaterializeSpec {
                    environmentd_image_ref: image_ref.to_owned(),
                    ..Default::default()
                },
                metadata: ObjectMeta::default(),
                status: None,
            }
        }

        // (image reference, minimum version, expected result)
        let cases: &[(&str, &str, bool)] = &[
            // References without a parseable version are assumed new enough.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // A pre-release sorts below the corresponding release.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        for &(image_ref, minimum, expected) in cases {
            let mz = with_image_ref(image_ref);
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} against minimum {minimum}",
            );
        }
    }
}