1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key under which the last known active generation of a
/// `Materialize` resource is recorded.
///
/// When a resource has no status yet, `v1alpha1::Materialize::status` looks up
/// this annotation and parses its value into the `active_generation` of the
/// status it builds.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38 use super::*;
39
    // Certificate settings that can be attached to a `Materialize` resource.
    //
    // Every field is optional and is (de)serialized under its camelCase name.
    // `issuer_ref` and `secret_template` reuse the generated cert-manager
    // `Certificate` types imported at the top of this file.
    //
    // NOTE: these are regular comments on purpose; `///` doc comments on a
    // `JsonSchema`-derived type are picked up as schema descriptions.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeCertSpec {
        // DNS names for the certificate.
        // NOTE(review): presumably additional to names the controller adds — confirm.
        pub dns_names: Option<Vec<String>>,
        // Certificate lifetime, kept as an unparsed string.
        // NOTE(review): presumably a cert-manager duration such as "2160h" — confirm.
        pub duration: Option<String>,
        // How long before expiry to renew, kept as an unparsed string.
        pub renew_before: Option<String>,
        // Reference to the cert-manager issuer to use.
        pub issuer_ref: Option<CertificateIssuerRef>,
        // Template for the secret that holds the issued certificate.
        pub secret_template: Option<CertificateSecretTemplate>,
    }
    // Strategy used when rolling out a new generation of a `Materialize`
    // resource.
    //
    // There is no `rename_all` attribute here, so the variants are
    // (de)serialized under their Rust names.
    //
    // NOTE: these are regular comments on purpose; `///` doc comments on a
    // `JsonSchema`-derived type are picked up as schema descriptions.
    #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
    pub enum MaterializeRolloutStrategy {
        // Default strategy.
        // NOTE(review): presumably waits for the new generation to become
        // ready before promoting it — confirm against the reconciler.
        #[default]
        WaitUntilReady,

        // Selecting this variant makes `Materialize::should_force_promote`
        // return `true` unconditionally; as the name says, this is expected
        // to cause downtime.
        ImmediatelyPromoteCausingDowntime,
    }
81
    // Desired state (`spec`) of a `Materialize` custom resource.
    //
    // The `CustomResource` derive plus the `#[kube(...)]` attribute below
    // declare the namespaced `Materialize` kind (group `materialize.cloud`,
    // version `v1alpha1`, short name `mzs`) with `MaterializeStatus` as its
    // status type, and two print columns: the image reference and the status
    // of the `UpToDate` condition.
    //
    // Fields are (de)serialized under their camelCase names. Fields marked
    // `#[serde(default)]` may be omitted and then take their type's default
    // (the nil UUID for `Uuid` fields, `false` for `bool` fields).
    //
    // NOTE: these are regular comments on purpose; `///` doc comments on a
    // `JsonSchema`-derived type are picked up as schema descriptions.
    #[derive(
        CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
    )]
    #[serde(rename_all = "camelCase")]
    #[kube(
        namespaced,
        group = "materialize.cloud",
        version = "v1alpha1",
        kind = "Materialize",
        singular = "materialize",
        plural = "materializes",
        shortname = "mzs",
        status = "MaterializeStatus",
        printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
        printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
    )]
    pub struct MaterializeSpec {
        // Image reference for environmentd (required).
        // `Materialize::meets_minimum_version` tries to read a `v`-prefixed
        // semver tag from it.
        pub environmentd_image_ref: String,
        // NOTE(review): the fields down to `environmentd_resource_requirements`
        // are not read anywhere in this section; their purpose is inferred
        // from their names only — confirm against the reconciler.
        pub environmentd_extra_args: Option<Vec<String>>,
        pub environmentd_extra_env: Option<Vec<EnvVar>>,
        pub environmentd_iam_role_arn: Option<String>,
        pub environmentd_connection_role_arn: Option<String>,
        // Resource requests/limits for environmentd. Its `memory` entry
        // (requests if present, otherwise limits) is the fallback size for
        // the scratch volume; see
        // `Materialize::environmentd_scratch_volume_storage_requirement`.
        pub environmentd_resource_requirements: Option<ResourceRequirements>,
        // Explicit size of the environmentd scratch volume. When unset, the
        // accessor falls back to the memory requirement above, then "4096Mi".
        pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
        // Resource requests/limits for balancerd and the console.
        pub balancerd_resource_requirements: Option<ResourceRequirements>,
        pub console_resource_requirements: Option<ResourceRequirements>,
        // Replica counts; the accessors default each to 2 when unset.
        pub balancerd_replicas: Option<i32>,
        pub console_replicas: Option<i32>,

        // Name of a service account to use. When unset,
        // `Materialize::create_service_account` returns `true` and the
        // service account name defaults to the resource's own name.
        pub service_account_name: Option<String>,
        // NOTE(review): the annotation/label maps below are not read in this
        // section; presumably applied to the service account and pods — confirm.
        pub service_account_annotations: Option<BTreeMap<String, String>>,
        pub service_account_labels: Option<BTreeMap<String, String>>,
        pub pod_annotations: Option<BTreeMap<String, String>>,
        pub pod_labels: Option<BTreeMap<String, String>>,

        // Identifier of the requested rollout. A rollout counts as requested
        // while this differs from `status.last_completed_rollout_request`
        // (see `Materialize::rollout_requested`).
        #[serde(default)]
        pub request_rollout: Uuid,
        // When equal to `request_rollout`, `Materialize::should_force_promote`
        // returns `true`.
        #[serde(default)]
        pub force_promote: Uuid,
        // NOTE(review): `force_rollout` and `in_place_rollout` are not read in
        // this section — confirm their semantics against the reconciler.
        #[serde(default)]
        pub force_rollout: Uuid,
        #[serde(default)]
        pub in_place_rollout: bool,
        // Rollout strategy; defaults to `WaitUntilReady` when omitted.
        #[serde(default)]
        pub rollout_strategy: MaterializeRolloutStrategy,
        // Name of the backend secret (required); returned verbatim by
        // `Materialize::backend_secret_name`.
        pub backend_secret_name: String,
        // Authenticator kind; defaults to `AuthenticatorKind::default()`.
        #[serde(default)]
        pub authenticator_kind: AuthenticatorKind,
        // NOTE(review): not read in this section; presumably toggles RBAC in
        // the environment — confirm.
        #[serde(default)]
        pub enable_rbac: bool,

        // UUID embedded in the string built by `Materialize::environment_id`.
        #[serde(default)]
        pub environment_id: Uuid,

        // Optional certificate settings. The names suggest one each for the
        // balancerd external, console external, and internal certificates.
        pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
        pub console_external_certificate_spec: Option<MaterializeCertSpec>,
        pub internal_certificate_spec: Option<MaterializeCertSpec>,
    }
212
213 impl Materialize {
214 pub fn backend_secret_name(&self) -> String {
215 self.spec.backend_secret_name.clone()
216 }
217
218 pub fn namespace(&self) -> String {
219 self.meta().namespace.clone().unwrap()
220 }
221
222 pub fn create_service_account(&self) -> bool {
223 self.spec.service_account_name.is_none()
224 }
225
226 pub fn service_account_name(&self) -> String {
227 self.spec
228 .service_account_name
229 .clone()
230 .unwrap_or_else(|| self.name_unchecked())
231 }
232
233 pub fn role_name(&self) -> String {
234 self.name_unchecked()
235 }
236
237 pub fn role_binding_name(&self) -> String {
238 self.name_unchecked()
239 }
240
241 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
242 self.name_prefixed(&format!("environmentd-{generation}"))
243 }
244
245 pub fn environmentd_app_name(&self) -> String {
246 "environmentd".to_owned()
247 }
248
249 pub fn environmentd_service_name(&self) -> String {
250 self.name_prefixed("environmentd")
251 }
252
253 pub fn environmentd_service_internal_fqdn(&self) -> String {
254 format!(
255 "{}.{}.svc.cluster.local",
256 self.environmentd_service_name(),
257 self.meta().namespace.as_ref().unwrap()
258 )
259 }
260
261 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
262 self.name_prefixed(&format!("environmentd-{generation}"))
263 }
264
265 pub fn balancerd_app_name(&self) -> String {
266 "balancerd".to_owned()
267 }
268
269 pub fn environmentd_certificate_name(&self) -> String {
270 self.name_prefixed("environmentd-external")
271 }
272
273 pub fn environmentd_certificate_secret_name(&self) -> String {
274 self.name_prefixed("environmentd-tls")
275 }
276
277 pub fn balancerd_deployment_name(&self) -> String {
278 self.name_prefixed("balancerd")
279 }
280
281 pub fn balancerd_service_name(&self) -> String {
282 self.name_prefixed("balancerd")
283 }
284
285 pub fn console_app_name(&self) -> String {
286 "console".to_owned()
287 }
288
289 pub fn balancerd_external_certificate_name(&self) -> String {
290 self.name_prefixed("balancerd-external")
291 }
292
293 pub fn balancerd_external_certificate_secret_name(&self) -> String {
294 self.name_prefixed("balancerd-external-tls")
295 }
296
297 pub fn balancerd_replicas(&self) -> i32 {
298 self.spec.balancerd_replicas.unwrap_or(2)
299 }
300
301 pub fn console_replicas(&self) -> i32 {
302 self.spec.console_replicas.unwrap_or(2)
303 }
304
305 pub fn console_configmap_name(&self) -> String {
306 self.name_prefixed("console")
307 }
308
309 pub fn console_deployment_name(&self) -> String {
310 self.name_prefixed("console")
311 }
312
313 pub fn console_service_name(&self) -> String {
314 self.name_prefixed("console")
315 }
316
317 pub fn console_external_certificate_name(&self) -> String {
318 self.name_prefixed("console-external")
319 }
320
321 pub fn console_external_certificate_secret_name(&self) -> String {
322 self.name_prefixed("console-external-tls")
323 }
324
325 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
326 self.name_prefixed(&format!("persist-pubsub-{generation}"))
327 }
328
329 pub fn listeners_configmap_name(&self, generation: u64) -> String {
330 self.name_prefixed(&format!("listeners-{generation}"))
331 }
332
333 pub fn name_prefixed(&self, suffix: &str) -> String {
334 format!("mz{}-{}", self.resource_id(), suffix)
335 }
336
337 pub fn resource_id(&self) -> &str {
338 &self.status.as_ref().unwrap().resource_id
339 }
340
341 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
342 self.spec
343 .environmentd_scratch_volume_storage_requirement
344 .clone()
345 .unwrap_or_else(|| {
346 self.spec
347 .environmentd_resource_requirements
348 .as_ref()
349 .and_then(|requirements| {
350 requirements
351 .requests
352 .as_ref()
353 .or(requirements.limits.as_ref())
354 })
355 .and_then(|requirements| requirements.get("memory").cloned())
360 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
362 })
363 }
364
365 pub fn default_labels(&self) -> BTreeMap<String, String> {
366 BTreeMap::from_iter([
367 (
368 "materialize.cloud/organization-name".to_owned(),
369 self.name_unchecked(),
370 ),
371 (
372 "materialize.cloud/organization-namespace".to_owned(),
373 self.namespace(),
374 ),
375 (
376 "materialize.cloud/mz-resource-id".to_owned(),
377 self.resource_id().to_owned(),
378 ),
379 ])
380 }
381
382 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
383 format!(
384 "{}-{}-{}-0",
385 cloud_provider, region, self.spec.environment_id,
386 )
387 }
388
389 pub fn requested_reconciliation_id(&self) -> Uuid {
390 self.spec.request_rollout
391 }
392
393 pub fn rollout_requested(&self) -> bool {
394 self.requested_reconciliation_id()
395 != self
396 .status
397 .as_ref()
398 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
399 }
400
401 pub fn set_force_promote(&mut self) {
402 self.spec.force_promote = self.spec.request_rollout;
403 }
404
405 pub fn should_force_promote(&self) -> bool {
406 self.spec.force_promote == self.spec.request_rollout
407 || self.spec.rollout_strategy
408 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
409 }
410
411 pub fn conditions_need_update(&self) -> bool {
412 let Some(status) = self.status.as_ref() else {
413 return true;
414 };
415 if status.conditions.is_empty() {
416 return true;
417 }
418 for condition in &status.conditions {
419 if condition.observed_generation != self.meta().generation {
420 return true;
421 }
422 }
423 false
424 }
425
426 pub fn is_promoting(&self) -> bool {
427 let Some(status) = self.status.as_ref() else {
428 return false;
429 };
430 if status.conditions.is_empty() {
431 return false;
432 }
433 status
434 .conditions
435 .iter()
436 .any(|condition| condition.reason == "Promoting")
437 }
438
439 pub fn update_in_progress(&self) -> bool {
440 let Some(status) = self.status.as_ref() else {
441 return false;
442 };
443 if status.conditions.is_empty() {
444 return false;
445 }
446 for condition in &status.conditions {
447 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
448 return true;
449 }
450 }
451 false
452 }
453
454 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
458 let version = parse_image_ref(&self.spec.environmentd_image_ref);
459 match version {
460 Some(version) => &version >= minimum,
461 None => {
467 tracing::warn!(
468 image_ref = %self.spec.environmentd_image_ref,
469 "failed to parse image ref",
470 );
471 true
472 }
473 }
474 }
475
476 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
477 ObjectMeta {
478 namespace: Some(self.namespace()),
479 name: Some(name),
480 labels: Some(self.default_labels()),
481 owner_references: Some(vec![owner_reference(self)]),
482 ..Default::default()
483 }
484 }
485
486 pub fn status(&self) -> MaterializeStatus {
487 self.status.clone().unwrap_or_else(|| {
488 let mut status = MaterializeStatus::default();
489 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
494 status.resource_id = rand::thread_rng()
495 .sample_iter(Uniform::new(0, CHARSET.len()))
496 .take(10)
497 .map(|i| char::from(CHARSET[i]))
498 .collect();
499
500 if let Some(last_active_generation) = self
505 .annotations()
506 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
507 {
508 status.active_generation = last_active_generation
509 .parse()
510 .expect("valid int generation");
511 }
512
513 status
514 })
515 }
516 }
517
    // Observed state (`status`) of a `Materialize` resource.
    //
    // Fields are (de)serialized under their camelCase names.
    //
    // NOTE: these are regular comments on purpose; `///` doc comments on a
    // `JsonSchema`-derived type are picked up as schema descriptions.
    #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
    #[serde(rename_all = "camelCase")]
    pub struct MaterializeStatus {
        // Identifier used by `Materialize::name_prefixed` to build the names
        // of managed resources (`mz{resource_id}-...`).
        pub resource_id: String,
        // Generation currently considered active.
        pub active_generation: u64,
        // Rollout request id that was last completed; compared against
        // `spec.request_rollout` by `Materialize::rollout_requested`.
        pub last_completed_rollout_request: Uuid,
        // NOTE(review): not read in this section; presumably a hash of the
        // generated resources — confirm against the reconciler.
        pub resources_hash: String,
        // Status conditions, e.g. the `UpToDate` condition inspected by
        // `Materialize::update_in_progress`.
        pub conditions: Vec<Condition>,
    }
527
528 impl MaterializeStatus {
529 pub fn needs_update(&self, other: &Self) -> bool {
530 let now = chrono::offset::Utc::now();
531 let mut a = self.clone();
532 for condition in &mut a.conditions {
533 condition.last_transition_time = Time(now);
534 }
535 let mut b = other.clone();
536 for condition in &mut b.conditions {
537 condition.last_transition_time = Time(now);
538 }
539 a != b
540 }
541 }
542}
543
544fn parse_image_ref(image_ref: &str) -> Option<Version> {
545 image_ref
546 .rsplit_once(':')
547 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
548 .and_then(|tag| {
549 let tag = tag.replace("--", "+");
554 Version::parse(&tag).ok()
555 })
556}
557
558fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
559 OwnerReference {
560 api_version: T::api_version(&()).to_string(),
561 kind: T::kind(&()).to_string(),
562 name: t.name_unchecked(),
563 uid: t.uid().unwrap(),
564 block_owner_deletion: Some(true),
565 ..Default::default()
566 }
567}
568
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Exercises `Materialize::meets_minimum_version` against a table of
    /// image references.
    #[mz_ore::test]
    fn meets_minimum_version() {
        // (image reference, minimum version, expected result)
        const CASES: &[(&str, &str, bool)] = &[
            // References without a parseable version are assumed to satisfy
            // the minimum.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // Versions below the minimum, including pre-releases of it.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        for &(image_ref, minimum, expected) in CASES {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} checked against minimum {minimum}",
            );
        }
    }
}