1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use mz_server_core::listeners::AuthenticatorKind;
28
29use crate::crd::generated::cert_manager::certificates::{
30 CertificateIssuerRef, CertificateSecretTemplate,
31};
32
/// Annotation key whose value records the last known active generation.
///
/// `Materialize::status` reads this annotation to seed `active_generation`
/// when it has to synthesize a status for a resource that has none.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
35
36pub mod v1alpha1 {
37
38 use super::*;
39
40 #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
44 #[serde(rename_all = "camelCase")]
45 pub struct MaterializeCertSpec {
46 pub dns_names: Option<Vec<String>>,
48 pub duration: Option<String>,
52 pub renew_before: Option<String>,
56 pub issuer_ref: Option<CertificateIssuerRef>,
58 pub secret_template: Option<CertificateSecretTemplate>,
60 }
61
62 #[derive(
63 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
64 )]
65 #[serde(rename_all = "camelCase")]
66 #[kube(
67 namespaced,
68 group = "materialize.cloud",
69 version = "v1alpha1",
70 kind = "Materialize",
71 singular = "materialize",
72 plural = "materializes",
73 shortname = "mzs",
74 status = "MaterializeStatus",
75 printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.environmentdImageRef", "priority": 1}"#,
76 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
77 )]
78 pub struct MaterializeSpec {
79 pub environmentd_image_ref: String,
81 pub environmentd_extra_args: Option<Vec<String>>,
83 pub environmentd_extra_env: Option<Vec<EnvVar>>,
85 pub environmentd_iam_role_arn: Option<String>,
91 pub environmentd_connection_role_arn: Option<String>,
94 pub environmentd_resource_requirements: Option<ResourceRequirements>,
96 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
98 pub balancerd_resource_requirements: Option<ResourceRequirements>,
100 pub console_resource_requirements: Option<ResourceRequirements>,
102
103 pub service_account_name: Option<String>,
106 pub service_account_annotations: Option<BTreeMap<String, String>>,
113 pub service_account_labels: Option<BTreeMap<String, String>>,
115 pub pod_annotations: Option<BTreeMap<String, String>>,
117 pub pod_labels: Option<BTreeMap<String, String>>,
119
120 #[serde(default)]
132 pub request_rollout: Uuid,
133 #[serde(default)]
138 pub force_promote: Uuid,
139 #[serde(default)]
145 pub force_rollout: Uuid,
146 #[serde(default)]
150 pub in_place_rollout: bool,
151 pub backend_secret_name: String,
155 #[serde(default)]
158 pub authenticator_kind: AuthenticatorKind,
159 #[serde(default)]
161 pub enable_rbac: bool,
162
163 #[serde(default)]
170 pub environment_id: Uuid,
171
172 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
176 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
181 pub internal_certificate_spec: Option<MaterializeCertSpec>,
185 }
186
187 impl Materialize {
188 pub fn backend_secret_name(&self) -> String {
189 self.spec.backend_secret_name.clone()
190 }
191
192 pub fn namespace(&self) -> String {
193 self.meta().namespace.clone().unwrap()
194 }
195
196 pub fn create_service_account(&self) -> bool {
197 self.spec.service_account_name.is_none()
198 }
199
200 pub fn service_account_name(&self) -> String {
201 self.spec
202 .service_account_name
203 .clone()
204 .unwrap_or_else(|| self.name_unchecked())
205 }
206
207 pub fn role_name(&self) -> String {
208 self.name_unchecked()
209 }
210
211 pub fn role_binding_name(&self) -> String {
212 self.name_unchecked()
213 }
214
215 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
216 self.name_prefixed(&format!("environmentd-{generation}"))
217 }
218
219 pub fn environmentd_app_name(&self) -> String {
220 "environmentd".to_owned()
221 }
222
223 pub fn environmentd_service_name(&self) -> String {
224 self.name_prefixed("environmentd")
225 }
226
227 pub fn environmentd_service_internal_fqdn(&self) -> String {
228 format!(
229 "{}.{}.svc.cluster.local",
230 self.environmentd_service_name(),
231 self.meta().namespace.as_ref().unwrap()
232 )
233 }
234
235 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
236 self.name_prefixed(&format!("environmentd-{generation}"))
237 }
238
239 pub fn balancerd_app_name(&self) -> String {
240 "balancerd".to_owned()
241 }
242
243 pub fn environmentd_certificate_name(&self) -> String {
244 self.name_prefixed("environmentd-external")
245 }
246
247 pub fn environmentd_certificate_secret_name(&self) -> String {
248 self.name_prefixed("environmentd-tls")
249 }
250
251 pub fn balancerd_deployment_name(&self) -> String {
252 self.name_prefixed("balancerd")
253 }
254
255 pub fn balancerd_service_name(&self) -> String {
256 self.name_prefixed("balancerd")
257 }
258
259 pub fn console_app_name(&self) -> String {
260 "console".to_owned()
261 }
262
263 pub fn balancerd_external_certificate_name(&self) -> String {
264 self.name_prefixed("balancerd-external")
265 }
266
267 pub fn balancerd_external_certificate_secret_name(&self) -> String {
268 self.name_prefixed("balancerd-external-tls")
269 }
270
271 pub fn console_configmap_name(&self) -> String {
272 self.name_prefixed("console")
273 }
274
275 pub fn console_deployment_name(&self) -> String {
276 self.name_prefixed("console")
277 }
278
279 pub fn console_service_name(&self) -> String {
280 self.name_prefixed("console")
281 }
282
283 pub fn console_external_certificate_name(&self) -> String {
284 self.name_prefixed("console-external")
285 }
286
287 pub fn console_external_certificate_secret_name(&self) -> String {
288 self.name_prefixed("console-external-tls")
289 }
290
291 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
292 self.name_prefixed(&format!("persist-pubsub-{generation}"))
293 }
294
295 pub fn listeners_configmap_name(&self, generation: u64) -> String {
296 self.name_prefixed(&format!("listeners-{generation}"))
297 }
298
299 pub fn name_prefixed(&self, suffix: &str) -> String {
300 format!("mz{}-{}", self.resource_id(), suffix)
301 }
302
303 pub fn resource_id(&self) -> &str {
304 &self.status.as_ref().unwrap().resource_id
305 }
306
307 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
308 self.spec
309 .environmentd_scratch_volume_storage_requirement
310 .clone()
311 .unwrap_or_else(|| {
312 self.spec
313 .environmentd_resource_requirements
314 .as_ref()
315 .and_then(|requirements| {
316 requirements
317 .requests
318 .as_ref()
319 .or(requirements.limits.as_ref())
320 })
321 .and_then(|requirements| requirements.get("memory").cloned())
326 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
328 })
329 }
330
331 pub fn default_labels(&self) -> BTreeMap<String, String> {
332 BTreeMap::from_iter([
333 (
334 "materialize.cloud/organization-name".to_owned(),
335 self.name_unchecked(),
336 ),
337 (
338 "materialize.cloud/organization-namespace".to_owned(),
339 self.namespace(),
340 ),
341 (
342 "materialize.cloud/mz-resource-id".to_owned(),
343 self.resource_id().to_owned(),
344 ),
345 ])
346 }
347
348 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
349 format!(
350 "{}-{}-{}-0",
351 cloud_provider, region, self.spec.environment_id,
352 )
353 }
354
355 pub fn requested_reconciliation_id(&self) -> Uuid {
356 self.spec.request_rollout
357 }
358
359 pub fn in_place_rollout(&self) -> bool {
360 self.spec.in_place_rollout
361 }
362
363 pub fn rollout_requested(&self) -> bool {
364 self.requested_reconciliation_id()
365 != self
366 .status
367 .as_ref()
368 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
369 }
370
371 pub fn set_force_promote(&mut self) {
372 self.spec.force_promote = self.spec.request_rollout;
373 }
374
375 pub fn should_force_promote(&self) -> bool {
376 self.spec.force_promote == self.spec.request_rollout
377 }
378
379 pub fn conditions_need_update(&self) -> bool {
380 let Some(status) = self.status.as_ref() else {
381 return true;
382 };
383 if status.conditions.is_empty() {
384 return true;
385 }
386 for condition in &status.conditions {
387 if condition.observed_generation != self.meta().generation {
388 return true;
389 }
390 }
391 false
392 }
393
394 pub fn update_in_progress(&self) -> bool {
395 let Some(status) = self.status.as_ref() else {
396 return false;
397 };
398 if status.conditions.is_empty() {
399 return false;
400 }
401 for condition in &status.conditions {
402 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
403 return true;
404 }
405 }
406 false
407 }
408
409 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
413 let version = parse_image_ref(&self.spec.environmentd_image_ref);
414 match version {
415 Some(version) => &version >= minimum,
416 None => {
422 tracing::warn!(
423 image_ref = %self.spec.environmentd_image_ref,
424 "failed to parse image ref",
425 );
426 true
427 }
428 }
429 }
430
431 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
432 ObjectMeta {
433 namespace: Some(self.namespace()),
434 name: Some(name),
435 labels: Some(self.default_labels()),
436 owner_references: Some(vec![owner_reference(self)]),
437 ..Default::default()
438 }
439 }
440
441 pub fn status(&self) -> MaterializeStatus {
442 self.status.clone().unwrap_or_else(|| {
443 let mut status = MaterializeStatus::default();
444 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
449 status.resource_id = rand::thread_rng()
450 .sample_iter(Uniform::new(0, CHARSET.len()))
451 .take(10)
452 .map(|i| char::from(CHARSET[i]))
453 .collect();
454
455 if let Some(last_active_generation) = self
460 .annotations()
461 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
462 {
463 status.active_generation = last_active_generation
464 .parse()
465 .expect("valid int generation");
466 }
467
468 status
469 })
470 }
471 }
472
473 #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
474 #[serde(rename_all = "camelCase")]
475 pub struct MaterializeStatus {
476 pub resource_id: String,
477 pub active_generation: u64,
478 pub last_completed_rollout_request: Uuid,
479 pub resources_hash: String,
480 pub conditions: Vec<Condition>,
481 }
482
483 impl MaterializeStatus {
484 pub fn needs_update(&self, other: &Self) -> bool {
485 let now = chrono::offset::Utc::now();
486 let mut a = self.clone();
487 for condition in &mut a.conditions {
488 condition.last_transition_time = Time(now);
489 }
490 let mut b = other.clone();
491 for condition in &mut b.conditions {
492 condition.last_transition_time = Time(now);
493 }
494 a != b
495 }
496 }
497}
498
499fn parse_image_ref(image_ref: &str) -> Option<Version> {
500 image_ref
501 .rsplit_once(':')
502 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
503 .and_then(|tag| {
504 let tag = tag.replace("--", "+");
509 Version::parse(&tag).ok()
510 })
511}
512
513fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
514 OwnerReference {
515 api_version: T::api_version(&()).to_string(),
516 kind: T::kind(&()).to_string(),
517 name: t.name_unchecked(),
518 uid: t.uid().unwrap(),
519 block_owner_deletion: Some(true),
520 ..Default::default()
521 }
522}
523
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Table-driven check of `Materialize::meets_minimum_version`.
    #[mz_ore::test]
    fn meets_minimum_version() {
        // (image reference, minimum version, expected result)
        const CASES: &[(&str, &str, bool)] = &[
            // References without a parseable version are assumed to pass.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            (
                "materialize/environmentd:v0.146.0-dev.0--pr.g5a05a9e4ba873be8adaa528644aaae6e4c7cd29b",
                "0.146.0-dev.0",
                true,
            ),
            // Parseable versions below the minimum fail.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        for &(image_ref, minimum, expected) in CASES {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} against minimum {minimum}",
            );
        }
    }
}