1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::core::v1::{EnvVar, ResourceRequirements},
14 apimachinery::pkg::{
15 api::resource::Quantity,
16 apis::meta::v1::{Condition, OwnerReference, Time},
17 },
18};
19use kube::{CustomResource, Resource, ResourceExt, api::ObjectMeta};
20use rand::Rng;
21use rand::distributions::Uniform;
22use schemars::JsonSchema;
23use semver::Version;
24use serde::{Deserialize, Serialize};
25use uuid::Uuid;
26
27use crate::crd::generated::cert_manager::certificates::{
28 CertificateIssuerRef, CertificateSecretTemplate,
29};
30
/// Annotation key holding the last known active generation of a `Materialize`
/// resource.
///
/// `v1alpha1::Materialize::status` reads this annotation to seed
/// `active_generation` when it has to build a status from scratch.
pub const LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION: &str =
    "materialize.cloud/last-known-active-generation";
33
34pub mod v1alpha1 {
35
36 use super::*;
37
// Certificate settings embedded in `MaterializeSpec` through its
// `*_certificate_spec` fields. Every field is optional and is (de)serialized
// under its camelCase name.
//
// NOTE(review): the notes here are plain `//` comments on purpose. schemars
// copies `///` doc comments into the generated JSON schema as descriptions, so
// promote them to `///` only once the wording has been confirmed.
#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeCertSpec {
    // DNS names for the certificate; presumably forwarded to cert-manager's
    // `dnsNames`. TODO confirm against the code that builds the Certificate.
    pub dns_names: Option<Vec<String>>,
    // Presumably the requested certificate lifetime. Kept as an unparsed
    // string; its format is not validated in this file.
    pub duration: Option<String>,
    // Presumably how long before expiry the certificate is renewed. Kept as an
    // unparsed string; its format is not validated in this file.
    pub renew_before: Option<String>,
    // Issuer reference, using the generated cert-manager `CertificateIssuerRef`
    // type.
    pub issuer_ref: Option<CertificateIssuerRef>,
    // Secret template, using the generated cert-manager
    // `CertificateSecretTemplate` type.
    pub secret_template: Option<CertificateSecretTemplate>,
}
59
60 #[derive(
61 CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
62 )]
63 #[serde(rename_all = "camelCase")]
64 #[kube(
65 namespaced,
66 group = "materialize.cloud",
67 version = "v1alpha1",
68 kind = "Materialize",
69 singular = "materialize",
70 plural = "materializes",
71 shortname = "mzs",
72 status = "MaterializeStatus",
73 printcolumn = r#"{"name": "ImageRef", "type": "string", "description": "Reference to the Docker image.", "jsonPath": ".spec.imageRef", "priority": 1}"#,
74 printcolumn = r#"{"name": "UpToDate", "type": "string", "description": "Whether the spec has been applied", "jsonPath": ".status.conditions[?(@.type==\"UpToDate\")].status", "priority": 1}"#
75 )]
76 pub struct MaterializeSpec {
77 pub environmentd_image_ref: String,
79 pub environmentd_extra_args: Option<Vec<String>>,
81 pub environmentd_extra_env: Option<Vec<EnvVar>>,
83 pub environmentd_iam_role_arn: Option<String>,
86 pub environmentd_connection_role_arn: Option<String>,
89 pub environmentd_resource_requirements: Option<ResourceRequirements>,
91 pub environmentd_scratch_volume_storage_requirement: Option<Quantity>,
93 pub balancerd_resource_requirements: Option<ResourceRequirements>,
95 pub console_resource_requirements: Option<ResourceRequirements>,
97
98 #[serde(default = "Uuid::new_v4")]
110 pub request_rollout: Uuid,
111 #[serde(default)]
116 pub force_promote: Uuid,
117 #[serde(default)]
123 pub force_rollout: Uuid,
124 #[serde(default)]
128 pub in_place_rollout: bool,
129 pub backend_secret_name: String,
131
132 #[serde(default = "Uuid::new_v4")]
143 pub environment_id: Uuid,
144
145 pub balancerd_external_certificate_spec: Option<MaterializeCertSpec>,
149 pub console_external_certificate_spec: Option<MaterializeCertSpec>,
154 pub internal_certificate_spec: Option<MaterializeCertSpec>,
158 }
159
160 impl Materialize {
161 pub fn backend_secret_name(&self) -> String {
162 self.spec.backend_secret_name.clone()
163 }
164
165 pub fn namespace(&self) -> String {
166 self.meta().namespace.clone().unwrap()
167 }
168
169 pub fn service_account_name(&self) -> String {
170 self.name_unchecked()
171 }
172
173 pub fn role_name(&self) -> String {
174 self.name_unchecked()
175 }
176
177 pub fn role_binding_name(&self) -> String {
178 self.name_unchecked()
179 }
180
181 pub fn environmentd_statefulset_name(&self, generation: u64) -> String {
182 self.name_prefixed(&format!("environmentd-{generation}"))
183 }
184
185 pub fn environmentd_app_name(&self) -> String {
186 "environmentd".to_owned()
187 }
188
189 pub fn environmentd_service_name(&self) -> String {
190 self.name_prefixed("environmentd")
191 }
192
193 pub fn environmentd_service_internal_fqdn(&self) -> String {
194 format!(
195 "{}.{}.svc.cluster.local",
196 self.environmentd_service_name(),
197 self.meta().namespace.as_ref().unwrap()
198 )
199 }
200
201 pub fn environmentd_generation_service_name(&self, generation: u64) -> String {
202 self.name_prefixed(&format!("environmentd-{generation}"))
203 }
204
205 pub fn balancerd_app_name(&self) -> String {
206 "balancerd".to_owned()
207 }
208
209 pub fn environmentd_certificate_name(&self) -> String {
210 self.name_prefixed("environmentd-external")
211 }
212
213 pub fn environmentd_certificate_secret_name(&self) -> String {
214 self.name_prefixed("environmentd-tls")
215 }
216
217 pub fn balancerd_deployment_name(&self) -> String {
218 self.name_prefixed("balancerd")
219 }
220
221 pub fn balancerd_service_name(&self) -> String {
222 self.name_prefixed("balancerd")
223 }
224
225 pub fn console_app_name(&self) -> String {
226 "console".to_owned()
227 }
228
229 pub fn balancerd_external_certificate_name(&self) -> String {
230 self.name_prefixed("balancerd-external")
231 }
232
233 pub fn balancerd_external_certificate_secret_name(&self) -> String {
234 self.name_prefixed("balancerd-external-tls")
235 }
236
237 pub fn console_deployment_name(&self) -> String {
238 self.name_prefixed("console")
239 }
240
241 pub fn console_service_name(&self) -> String {
242 self.name_prefixed("console")
243 }
244
245 pub fn console_external_certificate_name(&self) -> String {
246 self.name_prefixed("console-external")
247 }
248
249 pub fn console_external_certificate_secret_name(&self) -> String {
250 self.name_prefixed("console-external-tls")
251 }
252
253 pub fn persist_pubsub_service_name(&self, generation: u64) -> String {
254 self.name_prefixed(&format!("persist-pubsub-{generation}"))
255 }
256
257 pub fn name_prefixed(&self, suffix: &str) -> String {
258 format!("mz{}-{}", self.resource_id(), suffix)
259 }
260
261 pub fn resource_id(&self) -> &str {
262 &self.status.as_ref().unwrap().resource_id
263 }
264
265 pub fn environmentd_scratch_volume_storage_requirement(&self) -> Quantity {
266 self.spec
267 .environmentd_scratch_volume_storage_requirement
268 .clone()
269 .unwrap_or_else(|| {
270 self.spec
271 .environmentd_resource_requirements
272 .as_ref()
273 .and_then(|requirements| {
274 requirements
275 .requests
276 .as_ref()
277 .or(requirements.limits.as_ref())
278 })
279 .and_then(|requirements| requirements.get("memory").cloned())
284 .unwrap_or_else(|| Quantity("4096Mi".to_string()))
286 })
287 }
288
289 pub fn default_labels(&self) -> BTreeMap<String, String> {
290 BTreeMap::from_iter([
291 (
292 "materialize.cloud/organization-name".to_owned(),
293 self.name_unchecked(),
294 ),
295 (
296 "materialize.cloud/organization-namespace".to_owned(),
297 self.namespace(),
298 ),
299 (
300 "materialize.cloud/mz-resource-id".to_owned(),
301 self.resource_id().to_owned(),
302 ),
303 ])
304 }
305
306 pub fn environment_id(&self, cloud_provider: &str, region: &str) -> String {
307 format!(
308 "{}-{}-{}-0",
309 cloud_provider, region, self.spec.environment_id,
310 )
311 }
312
313 pub fn requested_reconciliation_id(&self) -> Uuid {
314 self.spec.request_rollout
315 }
316
317 pub fn in_place_rollout(&self) -> bool {
318 self.spec.in_place_rollout
319 }
320
321 pub fn rollout_requested(&self) -> bool {
322 self.requested_reconciliation_id()
323 != self
324 .status
325 .as_ref()
326 .map_or_else(Uuid::nil, |status| status.last_completed_rollout_request)
327 }
328
329 pub fn set_force_promote(&mut self) {
330 self.spec.force_promote = self.spec.request_rollout;
331 }
332
333 pub fn should_force_promote(&self) -> bool {
334 self.spec.force_promote == self.spec.request_rollout
335 }
336
337 pub fn conditions_need_update(&self) -> bool {
338 let Some(status) = self.status.as_ref() else {
339 return true;
340 };
341 if status.conditions.is_empty() {
342 return true;
343 }
344 for condition in &status.conditions {
345 if condition.observed_generation != self.meta().generation {
346 return true;
347 }
348 }
349 false
350 }
351
352 pub fn update_in_progress(&self) -> bool {
353 let Some(status) = self.status.as_ref() else {
354 return false;
355 };
356 if status.conditions.is_empty() {
357 return false;
358 }
359 for condition in &status.conditions {
360 if condition.type_ == "UpToDate" && condition.status == "Unknown" {
361 return true;
362 }
363 }
364 false
365 }
366
367 pub fn meets_minimum_version(&self, minimum: &Version) -> bool {
371 let version = parse_image_ref(&self.spec.environmentd_image_ref);
372 match version {
373 Some(version) => &version >= minimum,
374 None => true,
380 }
381 }
382
383 pub fn managed_resource_meta(&self, name: String) -> ObjectMeta {
384 ObjectMeta {
385 namespace: Some(self.namespace()),
386 name: Some(name),
387 labels: Some(self.default_labels()),
388 owner_references: Some(vec![owner_reference(self)]),
389 ..Default::default()
390 }
391 }
392
393 pub fn status(&self) -> MaterializeStatus {
394 self.status.clone().unwrap_or_else(|| {
395 let mut status = MaterializeStatus::default();
396 const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789";
401 status.resource_id = rand::thread_rng()
402 .sample_iter(Uniform::new(0, CHARSET.len()))
403 .take(10)
404 .map(|i| char::from(CHARSET[i]))
405 .collect();
406
407 if let Some(last_active_generation) = self
412 .annotations()
413 .get(LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION)
414 {
415 status.active_generation = last_active_generation
416 .parse()
417 .expect("valid int generation");
418 }
419
420 status
421 })
422 }
423 }
424
// Status of the `Materialize` custom resource, wired in through
// `status = "MaterializeStatus"` on the spec's `#[kube(...)]` attribute.
// Fields are (de)serialized under their camelCase names.
//
// NOTE(review): the notes here are plain `//` comments on purpose. schemars
// copies `///` doc comments into the generated JSON schema as descriptions, so
// promote them to `///` only once the wording has been confirmed.
#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MaterializeStatus {
    // Identifier embedded in generated resource names (`name_prefixed`) and in
    // the `materialize.cloud/mz-resource-id` label. `Materialize::status`
    // generates it as 10 random characters from `[a-z0-9]` when no status exists.
    pub resource_id: String,
    // Active generation. Seeded from the
    // `LAST_KNOWN_ACTIVE_GENERATION_ANNOTATION` annotation when a fresh status
    // is built; otherwise 0 from `Default`.
    pub active_generation: u64,
    // Compared against `spec.request_rollout` by `rollout_requested`, which
    // reports `true` while the two differ.
    pub last_completed_rollout_request: Uuid,
    // Not read or written in this file. Presumably a hash of the generated
    // resources; TODO confirm with the code that sets it.
    pub resources_hash: String,
    // Conditions. `conditions_need_update` and `update_in_progress` inspect
    // their `observed_generation`, `type_` and `status`.
    pub conditions: Vec<Condition>,
}
434
435 impl MaterializeStatus {
436 pub fn needs_update(&self, other: &Self) -> bool {
437 let now = chrono::offset::Utc::now();
438 let mut a = self.clone();
439 for condition in &mut a.conditions {
440 condition.last_transition_time = Time(now);
441 }
442 let mut b = other.clone();
443 for condition in &mut b.conditions {
444 condition.last_transition_time = Time(now);
445 }
446 a != b
447 }
448 }
449}
450
451fn parse_image_ref(image_ref: &str) -> Option<Version> {
452 image_ref
453 .rsplit_once(':')
454 .and_then(|(_repo, tag)| tag.strip_prefix('v'))
455 .and_then(|tag| {
456 let tag = tag.replace("--", "+");
461 Version::parse(&tag).ok()
462 })
463}
464
465fn owner_reference<T: Resource<DynamicType = ()>>(t: &T) -> OwnerReference {
466 OwnerReference {
467 api_version: T::api_version(&()).to_string(),
468 kind: T::kind(&()).to_string(),
469 name: t.name_unchecked(),
470 uid: t.uid().unwrap(),
471 block_owner_deletion: Some(true),
472 ..Default::default()
473 }
474}
475
#[cfg(test)]
mod tests {
    use kube::core::ObjectMeta;
    use semver::Version;

    use super::v1alpha1::{Materialize, MaterializeSpec};

    /// Table-driven check of `Materialize::meets_minimum_version`. Each row is
    /// `(image ref, minimum version, expected result)`.
    #[mz_ore::test]
    fn meets_minimum_version() {
        const CASES: &[(&str, &str, bool)] = &[
            // Requirement met, including references without a parseable version.
            (
                "materialize/environmentd:devel-47116c24b8d0df33d3f60a9ee476aa8d7bce5953",
                "0.34.0",
                true,
            ),
            ("materialize/environmentd:v0.34.0", "0.34.0", true),
            ("materialize/environmentd:v0.35.0", "0.34.0", true),
            ("materialize/environmentd:v0.34.3", "0.34.0", true),
            (
                "materialize/environmentd@41af286dc0b172ed2f1ca934fd2278de4a1192302ffa07087cea2682e7d372e3",
                "0.34.0",
                true,
            ),
            ("my.private.registry:5000:v0.34.3", "0.34.0", true),
            ("materialize/environmentd:v0.asdf.0", "0.34.0", true),
            // Requirement not met.
            ("materialize/environmentd:v0.34.0-dev", "0.34.0", false),
            ("materialize/environmentd:v0.33.0", "0.34.0", false),
            ("materialize/environmentd:v0.34.0", "1.0.0", false),
            ("my.private.registry:5000:v0.33.3", "0.34.0", false),
        ];

        let mut mz = Materialize {
            spec: MaterializeSpec::default(),
            metadata: ObjectMeta::default(),
            status: None,
        };

        for &(image_ref, minimum, expected) in CASES {
            mz.spec.environmentd_image_ref = image_ref.to_owned();
            let minimum = Version::parse(minimum).unwrap();
            assert_eq!(
                mz.meets_minimum_version(&minimum),
                expected,
                "image ref {image_ref:?} checked against minimum {minimum}",
            );
        }
    }
}