1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32
33pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
34 "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38 use super::*;
39
40 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44 #[serde(rename_all = "camelCase")]
45 pub struct MaterializeCertSpec {
46 pub dns_names: Option<Vec<String>>,
48 pub duration: Option<String>,
52 pub renew_before: Option<String>,
56 pub issuer_ref: Option<CertificateIssuerRef>,
58 pub secret_template: Option<CertificateSecretTemplate>,
60 }
61
62 #[derive(
63 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
64 )]
65 #[serde(rename_all = "camelCase")]
66 #[kube(
67 namespaced,
68 group = "materialize.cloud",
69 version = "v1alpha1",
70 kind = "Materialize",
71 singular = "materialize",
72 plural = "materializes",
73 shortname = "mzs",
74 status = "MaterializeStatus",
75 printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
76 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
77 )]
78 pub struct MaterializeSpec {
79 pub environmentd_image_ref: String,
81 pub environmentd_extra_args: Option<Vec<String>>,
83 pub environmentd_extra_env: Option<Vec<EnvVar>>,
85 pub environmentd_iam_role_arn: Option<String>,
91 pub environmentd_connection_role_arn: Option<String>,
94 pub environmentd_resource_requirements: Option<ResourceRequirements>,
96 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
98 pub balancerd_resource_requirements: Option<ResourceRequirements>,
100 pub console_resource_requirements: Option<ResourceRequirements>,
102 pub balancerd_replicas: Option<i32>,
104 pub console_replicas: Option<i32>,
106
107 pub service_account_name: Option<String>,
110 pub service_account_annotations: Option<BTreeMap<String, String>>,
117 pub service_account_labels: Option<BTreeMap<String, String>>,
119 pub pod_annotations: Option<BTreeMap<String, String>>,
121 pub pod_labels: Option<BTreeMap<String, String>>,
123
124 #[serde(default)]
136 pub request_rollout: Uuid,
137 #[serde(default)]
142 pub force_promote: Uuid,
143 #[serde(default)]
149 pub force_rollout: Uuid,
150 #[serde(default)]
154 pub in_place_rollout: bool,
155 pub backend_secret_name: String,
159 #[serde(default)]
162 pub authenticator_kind: AuthenticatorKind,
163 #[serde(default)]
165 pub enable_rbac: bool,
166
167 #[serde(default)]
174 pub environment_id: Uuid,
175
176 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
180 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
185 pub internal_certificate_spec: Option<MaterializeCertSpec>,
189 }
190
191 impl Materialize {
192 pub fn backend_secret_name(&self) -> String {
193 self.spec.backend_secret_name.clone()
194 }
195
196 pub fn namespace(&self) -> String {
197 self.meta().namespace.clone().unwrap()
198 }
199
200 pub fn create_service_account(&self) -> bool {
201 self.spec.service_account_name.is_none()
202 }
203
204 pub fn service_account_name(&self) -> String {
205 self.spec
206 .service_account_name
207 .clone()
208 .unwrap_or_else(|| self.name_unchecked())
209 }
210
211 pub fn role_name(&self) -> String {
212 self.name_unchecked()
213 }
214
215 pub fn role_binding_name(&self) -> String {
216 self.name_unchecked()
217 }
218
219 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
220 self.name_prefixed(&format!("environmentd-{generation}"))
221 }
222
223 pub fn environmentd_app_name(&self) -> String {
224 "environmentd".to_owned()
225 }
226
227 pub fn environmentd_service_name(&self) -> String {
228 self.name_prefixed("environmentd")
229 }
230
231 pub fn environmentd_service_internal_fqdn(&self) -> String {
232 format!(
233 "{}.{}.svc.cluster.local",
234 self.environmentd_service_name(),
235 self.meta().namespace.as_ref().unwrap()
236 )
237 }
238
239 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
240 self.name_prefixed(&format!("environmentd-{generation}"))
241 }
242
243 pub fn balancerd_app_name(&self) -> String {
244 "balancerd".to_owned()
245 }
246
247 pub fn environmentd_certificate_name(&self) -> String {
248 self.name_prefixed("environmentd-external")
249 }
250
251 pub fn environmentd_certificate_secret_name(&self) -> String {
252 self.name_prefixed("environmentd-tls")
253 }
254
255 pub fn balancerd_deployment_name(&self) -> String {
256 self.name_prefixed("balancerd")
257 }
258
259 pub fn balancerd_service_name(&self) -> String {
260 self.name_prefixed("balancerd")
261 }
262
263 pub fn console_app_name(&self) -> String {
264 "console".to_owned()
265 }
266
267 pub fn balancerd_external_certificate_name(&self) -> String {
268 self.name_prefixed("balancerd-external")
269 }
270
271 pub fn balancerd_external_certificate_secret_name(&self) -> String {
272 self.name_prefixed("balancerd-external-tls")
273 }
274
275 pub fn balancerd_replicas(&self) -> i32 {
276 self.spec.balancerd_replicas.unwrap_or(2)
277 }
278
279 pub fn console_replicas(&self) -> i32 {
280 self.spec.console_replicas.unwrap_or(2)
281 }
282
283 pub fn console_configmap_name(&self) -> String {
284 self.name_prefixed("console")
285 }
286
287 pub fn console_deployment_name(&self) -> String {
288 self.name_prefixed("console")
289 }
290
291 pub fn console_service_name(&self) -> String {
292 self.name_prefixed("console")
293 }
294
295 pub fn console_external_certificate_name(&self) -> String {
296 self.name_prefixed("console-external")
297 }
298
299 pub fn console_external_certificate_secret_name(&self) -> String {
300 self.name_prefixed("console-external-tls")
301 }
302
303 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
304 self.name_prefixed(&format!("persist-pubsub-{generation}"))
305 }
306
307 pub fn listeners_configmap_name(&self, generation: u64) -> String {
308 self.name_prefixed(&format!("listeners-{generation}"))
309 }
310
311 pub fn name_prefixed(&self, suffix: &str) -> String {
312 format!("mz{}-{}", self.resource_id(), suffix)
313 }
314
315 pub fn resource_id(&self) -> &str {
316 &self.status.as_ref().unwrap().resource_id
317 }
318
319 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
320 self.spec
321 .environmentd_scratch_volume_storage_requirement
322 .clone()
323 .unwrap_or_else(|| {
324 self.spec
325 .environmentd_resource_requirements
326 .as_ref()
327 .and_then(|requirements| {
328 requirements
329 .requests
330 .as_ref()
331 .or(requirements.limits.as_ref())
332 })
333 .and_then(|requirements| requirements.get("memory").cloned())
338 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
340 })
341 }
342
343 pub fn default_labels(&self) -> BTreeMap<String, String> {
344 BTreeMap::from_iter([
345 (
346 "materialize.cloud/organization-name".to_owned(),
347 self.name_unchecked(),
348 ),
349 (
350 "materialize.cloud/organization-namespace".to_owned(),
351 self.namespace(),
352 ),
353 (
354 "materialize.cloud/mz-resource-id".to_owned(),
355 self.resource_id().to_owned(),
356 ),
357 ])
358 }
359
360 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
361 format!(
362 "{}-{}-{}-0",
363 cloud_provider, region, self.spec.environment_id,
364 )
365 }
366
367 pub fn requested_reconciliation_id(&self) -> Uuid {
368 self.spec.request_rollout
369 }
370
371 pub fn in_place_rollout(&self) -> bool {
372 self.spec.in_place_rollout
373 }
374
375 pub fn rollout_requested(&self) -> bool {
376 self.requested_reconciliation_id()
377 != self
378 .status
379 .as_ref()
380 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
381 }
382
383 pub fn set_force_promote(&mut self) {
384 self.spec.force_promote = self.spec.request_rollout;
385 }
386
387 pub fn should_force_promote(&self) -> bool {
388 self.spec.force_promote == self.spec.request_rollout
389 }
390
391 pub fn conditions_need_update(&self) -> bool {
392 let Some(status) = self.status.as_ref() else {
393 return true;
394 };
395 if status.conditions.is_empty() {
396 return true;
397 }
398 for condition in &status.conditions {
399 if condition.observed_generation != self.meta().generation {
400 return true;
401 }
402 }
403 false
404 }
405
406 pub fn update_in_progress(&self) -> bool {
407 let Some(status) = self.status.as_ref() else {
408 return false;
409 };
410 if status.conditions.is_empty() {
411 return false;
412 }
413 for condition in &status.conditions {
414 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
415 return true;
416 }
417 }
418 false
419 }
420
421 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
425 let version = parse_image_ref(&self.spec.environmentd_image_ref);
426 match version {
427 Some(version) => &version >= minimum,
428 None => {
434 tracing::warn!(
435 image_ref = %self.spec.environmentd_image_ref,
436 "failed to parse image ref",
437 );
438 true
439 }
440 }
441 }
442
443 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
444 ObjectMeta {
445 namespace: Some(self.namespace()),
446 name: Some(name),
447 labels: Some(self.default_labels()),
448 owner_references: Some(vec![owner_reference(self)]),
449 ..Default::default()
450 }
451 }
452
453 pub fn status(&self) -> MaterializeStatus {
454 self.status.clone().unwrap_or_else(|| {
455 let mut status = MaterializeStatus::default();
456 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
461 status.resource_id = rand::thread_rng()
462 .sample_iter(Uniform::new(0, CHARSET.len()))
463 .take(10)
464 .map(|i| char::from(CHARSET[i]))
465 .collect();
466
467 if let Some(last_active_generation) = self
472 .annotations()
473 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
474 {
475 status.active_generation = last_active_generation
476 .parse()
477 .expect("valid int generation");
478 }
479
480 status
481 })
482 }
483 }
484
485 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
486 #[serde(rename_all = "camelCase")]
487 pub struct MaterializeStatus {
488 pub resource_id: String,
489 pub active_generation: u64,
490 pub last_completed_rollout_request: Uuid,
491 pub resources_hash: String,
492 pub conditions: Vec<Condition>,
493 }
494
495 impl MaterializeStatus {
496 pub fn needs_update(&self, other: &Self) -> bool {
497 let now = chrono::offset::Utc::now();
498 let mut a = self.clone();
499 for condition in &mut a.conditions {
500 condition.last_transition_time = Time(now);
501 }
502 let mut b = other.clone();
503 for condition in &mut b.conditions {
504 condition.last_transition_time = Time(now);
505 }
506 a != b
507 }
508 }
509}
510
511fn parse_image_ref(image_ref: &str) -> Option<Version> {
512 image_ref
513 .rsplit_once(':')
514 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
515 .and_then(|tag| {
516 let tag = tag.replace("--", "+");
521 Version::parse(&tag).ok()
522 })
523}
524
525fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
526 OwnerReference {
527 api_version: T::api_version(&()).to_string(),
528 kind: T::kind(&()).to_string(),
529 name: t.name_unchecked(),
530 uid: t.uid().unwrap(),
531 block_owner_deletion: Some(true),
532 ..Default::default()
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use kube::core::ObjectMeta;
539 use semver::Version;
540
541 use super::v1alpha1::{Materialize, MaterializeSpec};
542
543 #[mz_ore::test]
544 fn meets_minimum_version() {
545 let mut mz = Materialize {
546 spec: MaterializeSpec {
547 environmentd_image_ref:
548 "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953"
549 .to_owned(),
550 ..Default::default()
551 },
552 metadata: ObjectMeta {
553 ..Default::default()
554 },
555 status: None,
556 };
557
558 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
560 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
561 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
562 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.35.0".to_owned();
563 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
564 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.3".to_owned();
565 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
566 mz.spec.environmentd_image_ref = "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3".to_owned();
567 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
568 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.34.3".to_owned();
569 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
570 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.asdf.0".to_owned();
571 assert!(mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
572 mz.spec.environmentd_image_ref =
573 "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b"
574 .to_owned();
575 assert!(mz.meets_minimum_version(&Version::parse("0.146.0-dev.0").unwrap()));
576
577 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0-dev".to_owned();
579 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
580 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.33.0".to_owned();
581 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
582 mz.spec.environmentd_image_ref = "materialize/environmentd:v0.34.0".to_owned();
583 assert!(!mz.meets_minimum_version(&Version::parse("1.0.0").unwrap()));
584 mz.spec.environmentd_image_ref = "my.private.registry:5000:v0.33.3".to_owned();
585 assert!(!mz.meets_minimum_version(&Version::parse("0.34.0").unwrap()));
586 }
587}