1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key holding the last known active generation.
///
/// `v1alpha1::Materialize::status` reads this annotation, when present, to seed
/// `active_generation` for a resource that has no status yet.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38 use super::*;
39
40 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44 #[serde(rename_all = "camelCase")]
45 pub struct MaterializeCertSpec {
46 pub dns_names: Option<Vec<String>>,
48 pub duration: Option<String>,
52 pub renew_before: Option<String>,
56 pub issuer_ref: Option<CertificateIssuerRef>,
58 pub secret_template: Option<CertificateSecretTemplate>,
60 }
61 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
62 pub enum MaterializeRolloutStrategy {
63 #[default]
67 WaitUntilReady,
68
69 ImmediatelyPromoteCausingDowntime,
80 }
81
82 #[derive(
83 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
84 )]
85 #[serde(rename_all = "camelCase")]
86 #[kube(
87 namespaced,
88 group = "materialize.cloud",
89 version = "v1alpha1",
90 kind = "Materialize",
91 singular = "materialize",
92 plural = "materializes",
93 shortname = "mzs",
94 status = "MaterializeStatus",
95 printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
96 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
97 )]
98 pub struct MaterializeSpec {
99 pub environmentd_image_ref: String,
101 pub environmentd_extra_args: Option<Vec<String>>,
103 pub environmentd_extra_env: Option<Vec<EnvVar>>,
105 pub environmentd_iam_role_arn: Option<String>,
111 pub environmentd_connection_role_arn: Option<String>,
114 pub environmentd_resource_requirements: Option<ResourceRequirements>,
116 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
118 pub balancerd_resource_requirements: Option<ResourceRequirements>,
120 pub console_resource_requirements: Option<ResourceRequirements>,
122 pub balancerd_replicas: Option<i32>,
124 pub console_replicas: Option<i32>,
126
127 pub service_account_name: Option<String>,
130 pub service_account_annotations: Option<BTreeMap<String, String>>,
137 pub service_account_labels: Option<BTreeMap<String, String>>,
139 pub pod_annotations: Option<BTreeMap<String, String>>,
141 pub pod_labels: Option<BTreeMap<String, String>>,
143
144 #[serde(default)]
156 pub request_rollout: Uuid,
157 #[serde(default)]
162 pub force_promote: Uuid,
163 #[serde(default)]
169 pub force_rollout: Uuid,
170 #[serde(default)]
172 pub in_place_rollout: bool,
173 #[serde(default)]
175 pub rollout_strategy: MaterializeRolloutStrategy,
176 pub backend_secret_name: String,
180 #[serde(default)]
183 pub authenticator_kind: AuthenticatorKind,
184 #[serde(default)]
186 pub enable_rbac: bool,
187
188 #[serde(default)]
195 pub environment_id: Uuid,
196
197 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
201 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
206 pub internal_certificate_spec: Option<MaterializeCertSpec>,
210 }
211
212 impl Materialize {
213 pub fn backend_secret_name(&self) -> String {
214 self.spec.backend_secret_name.clone()
215 }
216
217 pub fn namespace(&self) -> String {
218 self.meta().namespace.clone().unwrap()
219 }
220
221 pub fn create_service_account(&self) -> bool {
222 self.spec.service_account_name.is_none()
223 }
224
225 pub fn service_account_name(&self) -> String {
226 self.spec
227 .service_account_name
228 .clone()
229 .unwrap_or_else(|| self.name_unchecked())
230 }
231
232 pub fn role_name(&self) -> String {
233 self.name_unchecked()
234 }
235
236 pub fn role_binding_name(&self) -> String {
237 self.name_unchecked()
238 }
239
240 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
241 self.name_prefixed(&format!("environmentd-{generation}"))
242 }
243
244 pub fn environmentd_app_name(&self) -> String {
245 "environmentd".to_owned()
246 }
247
248 pub fn environmentd_service_name(&self) -> String {
249 self.name_prefixed("environmentd")
250 }
251
252 pub fn environmentd_service_internal_fqdn(&self) -> String {
253 format!(
254 "{}.{}.svc.cluster.local",
255 self.environmentd_service_name(),
256 self.meta().namespace.as_ref().unwrap()
257 )
258 }
259
260 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
261 self.name_prefixed(&format!("environmentd-{generation}"))
262 }
263
264 pub fn balancerd_app_name(&self) -> String {
265 "balancerd".to_owned()
266 }
267
268 pub fn environmentd_certificate_name(&self) -> String {
269 self.name_prefixed("environmentd-external")
270 }
271
272 pub fn environmentd_certificate_secret_name(&self) -> String {
273 self.name_prefixed("environmentd-tls")
274 }
275
276 pub fn balancerd_deployment_name(&self) -> String {
277 self.name_prefixed("balancerd")
278 }
279
280 pub fn balancerd_service_name(&self) -> String {
281 self.name_prefixed("balancerd")
282 }
283
284 pub fn console_app_name(&self) -> String {
285 "console".to_owned()
286 }
287
288 pub fn balancerd_external_certificate_name(&self) -> String {
289 self.name_prefixed("balancerd-external")
290 }
291
292 pub fn balancerd_external_certificate_secret_name(&self) -> String {
293 self.name_prefixed("balancerd-external-tls")
294 }
295
296 pub fn balancerd_replicas(&self) -> i32 {
297 self.spec.balancerd_replicas.unwrap_or(2)
298 }
299
300 pub fn console_replicas(&self) -> i32 {
301 self.spec.console_replicas.unwrap_or(2)
302 }
303
304 pub fn console_configmap_name(&self) -> String {
305 self.name_prefixed("console")
306 }
307
308 pub fn console_deployment_name(&self) -> String {
309 self.name_prefixed("console")
310 }
311
312 pub fn console_service_name(&self) -> String {
313 self.name_prefixed("console")
314 }
315
316 pub fn console_external_certificate_name(&self) -> String {
317 self.name_prefixed("console-external")
318 }
319
320 pub fn console_external_certificate_secret_name(&self) -> String {
321 self.name_prefixed("console-external-tls")
322 }
323
324 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
325 self.name_prefixed(&format!("persist-pubsub-{generation}"))
326 }
327
328 pub fn listeners_configmap_name(&self, generation: u64) -> String {
329 self.name_prefixed(&format!("listeners-{generation}"))
330 }
331
332 pub fn name_prefixed(&self, suffix: &str) -> String {
333 format!("mz{}-{}", self.resource_id(), suffix)
334 }
335
336 pub fn resource_id(&self) -> &str {
337 &self.status.as_ref().unwrap().resource_id
338 }
339
340 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
341 self.spec
342 .environmentd_scratch_volume_storage_requirement
343 .clone()
344 .unwrap_or_else(|| {
345 self.spec
346 .environmentd_resource_requirements
347 .as_ref()
348 .and_then(|requirements| {
349 requirements
350 .requests
351 .as_ref()
352 .or(requirements.limits.as_ref())
353 })
354 .and_then(|requirements| requirements.get("memory").cloned())
359 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
361 })
362 }
363
364 pub fn default_labels(&self) -> BTreeMap<String, String> {
365 BTreeMap::from_iter([
366 (
367 "materialize.cloud/organization-name".to_owned(),
368 self.name_unchecked(),
369 ),
370 (
371 "materialize.cloud/organization-namespace".to_owned(),
372 self.namespace(),
373 ),
374 (
375 "materialize.cloud/mz-resource-id".to_owned(),
376 self.resource_id().to_owned(),
377 ),
378 ])
379 }
380
381 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
382 format!(
383 "{}-{}-{}-0",
384 cloud_provider, region, self.spec.environment_id,
385 )
386 }
387
388 pub fn requested_reconciliation_id(&self) -> Uuid {
389 self.spec.request_rollout
390 }
391
392 pub fn rollout_requested(&self) -> bool {
393 self.requested_reconciliation_id()
394 != self
395 .status
396 .as_ref()
397 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
398 }
399
400 pub fn set_force_promote(&mut self) {
401 self.spec.force_promote = self.spec.request_rollout;
402 }
403
404 pub fn should_force_promote(&self) -> bool {
405 self.spec.force_promote == self.spec.request_rollout
406 || self.spec.rollout_strategy
407 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
408 }
409
410 pub fn conditions_need_update(&self) -> bool {
411 let Some(status) = self.status.as_ref() else {
412 return true;
413 };
414 if status.conditions.is_empty() {
415 return true;
416 }
417 for condition in &status.conditions {
418 if condition.observed_generation != self.meta().generation {
419 return true;
420 }
421 }
422 false
423 }
424
425 pub fn is_promoting(&self) -> bool {
426 let Some(status) = self.status.as_ref() else {
427 return false;
428 };
429 if status.conditions.is_empty() {
430 return false;
431 }
432 status
433 .conditions
434 .iter()
435 .any(|condition| condition.reason == "Promoting")
436 }
437
438 pub fn update_in_progress(&self) -> bool {
439 let Some(status) = self.status.as_ref() else {
440 return false;
441 };
442 if status.conditions.is_empty() {
443 return false;
444 }
445 for condition in &status.conditions {
446 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
447 return true;
448 }
449 }
450 false
451 }
452
453 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
457 let version = parse_image_ref(&self.spec.environmentd_image_ref);
458 match version {
459 Some(version) => &version >= minimum,
460 None => {
466 tracing::warn!(
467 image_ref = %self.spec.environmentd_image_ref,
468 "failed to parse image ref",
469 );
470 true
471 }
472 }
473 }
474
475 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
476 ObjectMeta {
477 namespace: Some(self.namespace()),
478 name: Some(name),
479 labels: Some(self.default_labels()),
480 owner_references: Some(vec![owner_reference(self)]),
481 ..Default::default()
482 }
483 }
484
485 pub fn status(&self) -> MaterializeStatus {
486 self.status.clone().unwrap_or_else(|| {
487 let mut status = MaterializeStatus::default();
488 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
493 status.resource_id = rand::thread_rng()
494 .sample_iter(Uniform::new(0, CHARSET.len()))
495 .take(10)
496 .map(|i| char::from(CHARSET[i]))
497 .collect();
498
499 if let Some(last_active_generation) = self
504 .annotations()
505 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
506 {
507 status.active_generation = last_active_generation
508 .parse()
509 .expect("valid int generation");
510 }
511
512 status
513 })
514 }
515 }
516
517 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
518 #[serde(rename_all = "camelCase")]
519 pub struct MaterializeStatus {
520 pub resource_id: String,
521 pub active_generation: u64,
522 pub last_completed_rollout_request: Uuid,
523 pub resources_hash: String,
524 pub conditions: Vec<Condition>,
525 }
526
527 impl MaterializeStatus {
528 pub fn needs_update(&self, other: &Self) -> bool {
529 let now = chrono::offset::Utc::now();
530 let mut a = self.clone();
531 for condition in &mut a.conditions {
532 condition.last_transition_time = Time(now);
533 }
534 let mut b = other.clone();
535 for condition in &mut b.conditions {
536 condition.last_transition_time = Time(now);
537 }
538 a != b
539 }
540 }
541}
542
543fn parse_image_ref(image_ref: &str) -> Option<Version> {
544 image_ref
545 .rsplit_once(':')
546 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
547 .and_then(|tag| {
548 let tag = tag.replace("--", "+");
553 Version::parse(&tag).ok()
554 })
555}
556
557fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
558 OwnerReference {
559 api_version: T::api_version(&()).to_string(),
560 kind: T::kind(&()).to_string(),
561 name: t.name_unchecked(),
562 uid: t.uid().unwrap(),
563 block_owner_deletion: Some(true),
564 ..Default::default()
565 }
566}
567
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Checks `Materialize::meets_minimum_version` against tables of
    /// `(image reference, minimum version)` pairs.
    #[mz_ore::test]
    fn meets_minimum_version() {
        // Pairs for which the check must pass, including references whose
        // version cannot be parsed.
        let accepted = [
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0"),
            ("materialize/environmentd:v0.35.0", "0.34.0"),
            ("materialize/environmentd:v0.34.3", "0.34.0"),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0"),
            ("materialize/environmentd:v0.asdf.0", "0.34.0"),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
            ),
        ];
        // Pairs for which the check must fail.
        let rejected = [
            ("materialize/environmentd:v0.34.0-dev", "0.34.0"),
            ("materialize/environmentd:v0.33.0", "0.34.0"),
            ("materialize/environmentd:v0.34.0", "1.0.0"),
            ("my.private.registry:5000:v0.33.3", "0.34.0"),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        for (image_ref, minimum) in accepted {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert!(
                mz.meets_minimum_version(&minimum),
                "{image_ref} should meet minimum version {minimum}",
            );
        }

        for (image_ref, minimum) in rejected {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert!(
                !mz.meets_minimum_version(&minimum),
                "{image_ref} should not meet minimum version {minimum}",
            );
        }
    }
}