1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distr::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key consulted by `Materialize::status` when the resource has no status yet:
/// if the annotation is present, its value is parsed as the initial `active_generation`.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38 use super::*;
39
// Optional certificate settings; every field may be omitted. Field names are
// (de)serialized in camelCase.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as schema descriptions.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
    // DNS names for the certificate.
    pub dns_names: Option<Vec<String>>,
    // Certificate lifetime, kept as an unparsed string (format is not validated here).
    pub duration: Option<String>,
    // Renewal lead time, kept as an unparsed string (format is not validated here).
    pub renew_before: Option<String>,
    // Issuer reference, using the generated cert-manager `CertificateIssuerRef` type.
    pub issuer_ref: Option<CertificateIssuerRef>,
    // Secret template, using the generated cert-manager `CertificateSecretTemplate` type.
    pub secret_template: Option<CertificateSecretTemplate>,
}
// Strategy selected through `MaterializeSpec::rollout_strategy`. No serde rename is
// applied, so variants are (de)serialized under their Rust names.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as schema descriptions.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub enum MaterializeRolloutStrategy {
    // The default strategy.
    #[default]
    WaitUntilReady,

    // Selecting this variant makes `Materialize::should_force_promote` return `true`
    // regardless of the `force_promote` field.
    ImmediatelyPromoteCausingDowntime,
}
81
// Spec of the namespaced `Materialize` custom resource (`materialize.cloud/v1alpha1`).
// The `CustomResource` derive generates the `Materialize` type, whose status type is
// `MaterializeStatus`. Field names are (de)serialized in camelCase.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as schema descriptions in the generated CRD.
#[derive(
    CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
#[kube(
    namespaced,
    group = "materialize.cloud",
    version = "v1alpha1",
    kind = "Materialize",
    singular = "materialize",
    plural = "materializes",
    shortname = "mzs",
    status = "MaterializeStatus",
    printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
    printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
)]
pub struct MaterializeSpec {
    // Required. Image reference for environmentd; `meets_minimum_version` tries to read a
    // `v<semver>` tag from it.
    pub environmentd_image_ref: String,
    // Extra arguments for environmentd (how they are applied is not visible in this file).
    pub environmentd_extra_args: Option<Vec<String>>,
    // Extra environment variables for environmentd (application not visible in this file).
    pub environmentd_extra_env: Option<Vec<EnvVar>>,
    // Marked deprecated. NOTE(review): the replacement is not visible here — confirm.
    #[kube(deprecated)]
    pub environmentd_iam_role_arn: Option<String>,
    // Role ARN; presumably used for connections — TODO confirm against the consumer.
    pub environmentd_connection_role_arn: Option<String>,
    // Resource requests/limits for environmentd. Also consulted by
    // `environmentd_scratch_volume_storage_requirement()` when no explicit size is set.
    pub environmentd_resource_requirements: Option<ResourceRequirements>,
    // Explicit scratch volume size; the accessor of the same name supplies fallbacks.
    pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
    // Resource requests/limits for balancerd.
    pub balancerd_resource_requirements: Option<ResourceRequirements>,
    // Resource requests/limits for the console.
    pub console_resource_requirements: Option<ResourceRequirements>,
    // balancerd replica count; `balancerd_replicas()` returns 2 when unset.
    pub balancerd_replicas: Option<i32>,
    // Console replica count; `console_replicas()` returns 2 when unset.
    pub console_replicas: Option<i32>,

    // When unset, `create_service_account()` returns `true` and `service_account_name()`
    // falls back to the resource's own name.
    pub service_account_name: Option<String>,
    // Annotations for the service account (application not visible in this file).
    pub service_account_annotations: Option<BTreeMap<String, String>>,
    // Labels for the service account (application not visible in this file).
    pub service_account_labels: Option<BTreeMap<String, String>>,
    // Annotations for pods (application not visible in this file).
    pub pod_annotations: Option<BTreeMap<String, String>>,
    // Labels for pods (application not visible in this file).
    pub pod_labels: Option<BTreeMap<String, String>>,

    // Rollout request id. `rollout_requested()` compares it with
    // `status.last_completed_rollout_request`. Falls back to `Uuid::default()` when omitted.
    #[serde(default)]
    pub request_rollout: Uuid,
    // `should_force_promote()` is `true` when this equals `request_rollout`.
    // Falls back to `Uuid::default()` when omitted.
    #[serde(default)]
    pub force_promote: Uuid,
    // Not read anywhere in this file. Falls back to `Uuid::default()` when omitted.
    #[serde(default)]
    pub force_rollout: Uuid,
    // Marked deprecated; `rollout_strategy` is presumably the replacement — TODO confirm.
    #[kube(deprecated)]
    #[serde(default)]
    pub in_place_rollout: bool,
    // Rollout strategy; defaults to `MaterializeRolloutStrategy::WaitUntilReady`.
    #[serde(default)]
    pub rollout_strategy: MaterializeRolloutStrategy,
    // Required. Name of the backend secret, exposed through `backend_secret_name()`.
    pub backend_secret_name: String,
    // Authenticator kind; falls back to `AuthenticatorKind::default()` when omitted.
    #[serde(default)]
    pub authenticator_kind: AuthenticatorKind,
    // RBAC toggle; `false` when omitted.
    #[serde(default)]
    pub enable_rbac: bool,

    // UUID that `environment_id()` embeds into `{cloud_provider}-{region}-{uuid}-0`.
    // Falls back to `Uuid::default()` when omitted.
    #[serde(default)]
    pub environment_id: Uuid,

    // Certificate settings for balancerd's external certificate.
    pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
    // Certificate settings for the console's external certificate.
    pub console_external_certificate_spec: Option<MaterializeCertSpec>,
    // Certificate settings for the internal certificate.
    pub internal_certificate_spec: Option<MaterializeCertSpec>,
}
218
219 impl Materialize {
220 pub fn backend_secret_name(&self) -> String {
221 self.spec.backend_secret_name.clone()
222 }
223
224 pub fn namespace(&self) -> String {
225 self.meta().namespace.clone().unwrap()
226 }
227
228 pub fn create_service_account(&self) -> bool {
229 self.spec.service_account_name.is_none()
230 }
231
232 pub fn service_account_name(&self) -> String {
233 self.spec
234 .service_account_name
235 .clone()
236 .unwrap_or_else(|| self.name_unchecked())
237 }
238
239 pub fn role_name(&self) -> String {
240 self.name_unchecked()
241 }
242
243 pub fn role_binding_name(&self) -> String {
244 self.name_unchecked()
245 }
246
247 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
248 self.name_prefixed(&format!("environmentd-{generation}"))
249 }
250
251 pub fn environmentd_app_name(&self) -> String {
252 "environmentd".to_owned()
253 }
254
255 pub fn environmentd_service_name(&self) -> String {
256 self.name_prefixed("environmentd")
257 }
258
259 pub fn environmentd_service_internal_fqdn(&self) -> String {
260 format!(
261 "{}.{}.svc.cluster.local",
262 self.environmentd_service_name(),
263 self.meta().namespace.as_ref().unwrap()
264 )
265 }
266
267 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
268 self.name_prefixed(&format!("environmentd-{generation}"))
269 }
270
271 pub fn balancerd_app_name(&self) -> String {
272 "balancerd".to_owned()
273 }
274
275 pub fn environmentd_certificate_name(&self) -> String {
276 self.name_prefixed("environmentd-external")
277 }
278
279 pub fn environmentd_certificate_secret_name(&self) -> String {
280 self.name_prefixed("environmentd-tls")
281 }
282
283 pub fn balancerd_deployment_name(&self) -> String {
284 self.name_prefixed("balancerd")
285 }
286
287 pub fn balancerd_service_name(&self) -> String {
288 self.name_prefixed("balancerd")
289 }
290
291 pub fn console_app_name(&self) -> String {
292 "console".to_owned()
293 }
294
295 pub fn balancerd_external_certificate_name(&self) -> String {
296 self.name_prefixed("balancerd-external")
297 }
298
299 pub fn balancerd_external_certificate_secret_name(&self) -> String {
300 self.name_prefixed("balancerd-external-tls")
301 }
302
303 pub fn balancerd_replicas(&self) -> i32 {
304 self.spec.balancerd_replicas.unwrap_or(2)
305 }
306
307 pub fn console_replicas(&self) -> i32 {
308 self.spec.console_replicas.unwrap_or(2)
309 }
310
311 pub fn console_configmap_name(&self) -> String {
312 self.name_prefixed("console")
313 }
314
315 pub fn console_deployment_name(&self) -> String {
316 self.name_prefixed("console")
317 }
318
319 pub fn console_service_name(&self) -> String {
320 self.name_prefixed("console")
321 }
322
323 pub fn console_external_certificate_name(&self) -> String {
324 self.name_prefixed("console-external")
325 }
326
327 pub fn console_external_certificate_secret_name(&self) -> String {
328 self.name_prefixed("console-external-tls")
329 }
330
331 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
332 self.name_prefixed(&format!("persist-pubsub-{generation}"))
333 }
334
335 pub fn listeners_configmap_name(&self, generation: u64) -> String {
336 self.name_prefixed(&format!("listeners-{generation}"))
337 }
338
339 pub fn name_prefixed(&self, suffix: &str) -> String {
340 format!("mz{}-{}", self.resource_id(), suffix)
341 }
342
343 pub fn resource_id(&self) -> &str {
344 &self.status.as_ref().unwrap().resource_id
345 }
346
347 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
348 self.spec
349 .environmentd_scratch_volume_storage_requirement
350 .clone()
351 .unwrap_or_else(|| {
352 self.spec
353 .environmentd_resource_requirements
354 .as_ref()
355 .and_then(|requirements| {
356 requirements
357 .requests
358 .as_ref()
359 .or(requirements.limits.as_ref())
360 })
361 .and_then(|requirements| requirements.get("memory").cloned())
366 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
368 })
369 }
370
371 pub fn default_labels(&self) -> BTreeMap<String, String> {
372 BTreeMap::from_iter([
373 (
374 "materialize.cloud/organization-name".to_owned(),
375 self.name_unchecked(),
376 ),
377 (
378 "materialize.cloud/organization-namespace".to_owned(),
379 self.namespace(),
380 ),
381 (
382 "materialize.cloud/mz-resource-id".to_owned(),
383 self.resource_id().to_owned(),
384 ),
385 ])
386 }
387
388 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
389 format!(
390 "{}-{}-{}-0",
391 cloud_provider, region, self.spec.environment_id,
392 )
393 }
394
395 pub fn requested_reconciliation_id(&self) -> Uuid {
396 self.spec.request_rollout
397 }
398
399 pub fn rollout_requested(&self) -> bool {
400 self.requested_reconciliation_id()
401 != self
402 .status
403 .as_ref()
404 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
405 }
406
407 pub fn set_force_promote(&mut self) {
408 self.spec.force_promote = self.spec.request_rollout;
409 }
410
411 pub fn should_force_promote(&self) -> bool {
412 self.spec.force_promote == self.spec.request_rollout
413 || self.spec.rollout_strategy
414 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
415 }
416
417 pub fn conditions_need_update(&self) -> bool {
418 let Some(status) = self.status.as_ref() else {
419 return true;
420 };
421 if status.conditions.is_empty() {
422 return true;
423 }
424 for condition in &status.conditions {
425 if condition.observed_generation != self.meta().generation {
426 return true;
427 }
428 }
429 false
430 }
431
432 pub fn is_promoting(&self) -> bool {
433 let Some(status) = self.status.as_ref() else {
434 return false;
435 };
436 if status.conditions.is_empty() {
437 return false;
438 }
439 status
440 .conditions
441 .iter()
442 .any(|condition| condition.reason == "Promoting")
443 }
444
445 pub fn update_in_progress(&self) -> bool {
446 let Some(status) = self.status.as_ref() else {
447 return false;
448 };
449 if status.conditions.is_empty() {
450 return false;
451 }
452 for condition in &status.conditions {
453 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
454 return true;
455 }
456 }
457 false
458 }
459
460 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
464 let version = parse_image_ref(&self.spec.environmentd_image_ref);
465 match version {
466 Some(version) => &version >= minimum,
467 None => {
473 tracing::warn!(
474 image_ref = %self.spec.environmentd_image_ref,
475 "failed to parse image ref",
476 );
477 true
478 }
479 }
480 }
481
482 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
483 ObjectMeta {
484 namespace: Some(self.namespace()),
485 name: Some(name),
486 labels: Some(self.default_labels()),
487 owner_references: Some(vec![owner_reference(self)]),
488 ..Default::default()
489 }
490 }
491
492 pub fn status(&self) -> MaterializeStatus {
493 self.status.clone().unwrap_or_else(|| {
494 let mut status = MaterializeStatus::default();
495 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
500 status.resource_id = rand::rng()
501 .sample_iter(Uniform::new(0, CHARSET.len()).expect("valid range"))
502 .take(10)
503 .map(|i| char::from(CHARSET[i]))
504 .collect();
505
506 if let Some(last_active_generation) = self
511 .annotations()
512 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
513 {
514 status.active_generation = last_active_generation
515 .parse()
516 .expect("valid int generation");
517 }
518
519 status
520 })
521 }
522 }
523
// Status of a `Materialize` resource. Field names are (de)serialized in camelCase.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and `///` doc comments would be emitted as schema descriptions.
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeStatus {
    // Identifier embedded in generated object names (`mz<resource_id>-<suffix>`);
    // `Materialize::status` generates it randomly when no status exists yet.
    pub resource_id: String,
    // Active generation; `Materialize::status` seeds it from the
    // last-known-active-generation annotation when building an initial status.
    pub active_generation: u64,
    // Compared against `spec.request_rollout` by `Materialize::rollout_requested`.
    pub last_completed_rollout_request: Uuid,
    // Hash of resources. NOTE(review): what is hashed is not visible in this file.
    pub resources_hash: String,
    // Status conditions; the methods in this file inspect `type_`, `status`, `reason`,
    // and `observed_generation`.
    pub conditions: Vec<Condition>,
}
540
541 impl MaterializeStatus {
542 pub fn needs_update(&self, other: &Self) -> bool {
543 let now = chrono::offset::Utc::now();
544 let mut a = self.clone();
545 for condition in &mut a.conditions {
546 condition.last_transition_time = Time(now);
547 }
548 let mut b = other.clone();
549 for condition in &mut b.conditions {
550 condition.last_transition_time = Time(now);
551 }
552 a != b
553 }
554 }
555}
556
557fn parse_image_ref(image_ref: &str) -> Option<Version> {
558 image_ref
559 .rsplit_once(':')
560 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
561 .and_then(|tag| {
562 let tag = tag.replace("--", "+");
567 Version::parse(&tag).ok()
568 })
569}
570
571fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
572 OwnerReference {
573 api_version: T::api_version(&()).to_string(),
574 kind: T::kind(&()).to_string(),
575 name: t.name_unchecked(),
576 uid: t.uid().unwrap(),
577 block_owner_deletion: Some(true),
578 ..Default::default()
579 }
580}
581
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Checks `Materialize::meets_minimum_version` against a table of image references.
    #[mz_ore::test]
    fn meets_minimum_version() {
        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        // (image ref, minimum version, expected result)
        let cases = [
            // References without a parseable `v<semver>` tag are treated as meeting
            // any minimum.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // A pre-release sorts below the corresponding release.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        for (image_ref, minimum, expected) in cases {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} against minimum {minimum}",
            );
        }
    }
}