1use std::{
11 collections::BTreeSet,
12 sync::{Arc, Mutex},
13};
14
15use anyhow::Context as _;
16use http::HeaderValue;
17use k8s_openapi::{
18 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
19 apimachinery::pkg::apis::meta::v1::{Condition, Time},
20};
21use kube::{
22 Api, Client, Resource, ResourceExt,
23 api::PostParams,
24 runtime::{controller::Action, reflector},
25};
26use tracing::{debug, trace};
27use uuid::Uuid;
28
29use crate::{
30 Error, controller::materialize::environmentd::V161, k8s::make_reflector,
31 matching_image_from_environmentd_image_ref, metrics::Metrics, tls::DefaultCertificateSpecs,
32};
33use mz_cloud_provider::CloudProvider;
34use mz_cloud_resources::crd::materialize::v1alpha1::{
35 Materialize, MaterializeRolloutStrategy, MaterializeStatus,
36};
37use mz_license_keys::validate;
38use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
39use mz_orchestrator_tracing::TracingCliArgs;
40use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
41
42pub mod balancer;
43pub mod console;
44pub mod environmentd;
45
46pub struct Config {
47 pub cloud_provider: CloudProvider,
48 pub region: String,
49 pub create_balancers: bool,
50 pub create_console: bool,
51 pub helm_chart_version: Option<String>,
52 pub secrets_controller: String,
53 pub collect_pod_metrics: bool,
54 pub enable_prometheus_scrape_annotations: bool,
55
56 pub segment_api_key: Option<String>,
57 pub segment_client_side: bool,
58
59 pub console_image_tag_default: String,
60 pub console_image_tag_map: Vec<KeyValueArg<String, String>>,
61
62 pub aws_account_id: Option<String>,
63 pub environmentd_iam_role_arn: Option<String>,
64 pub environmentd_connection_role_arn: Option<String>,
65 pub aws_secrets_controller_tags: Vec<String>,
66 pub environmentd_availability_zones: Option<Vec<String>>,
67
68 pub ephemeral_volume_class: Option<String>,
69 pub scheduler_name: Option<String>,
70 pub enable_security_context: bool,
71 pub enable_internal_statement_logging: bool,
72 pub disable_statement_logging: bool,
73
74 pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
75 pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
76 pub environmentd_affinity: Option<Affinity>,
77 pub environmentd_tolerations: Option<Vec<Toleration>>,
78 pub environmentd_default_resources: Option<ResourceRequirements>,
79 pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
80 pub clusterd_affinity: Option<Affinity>,
81 pub clusterd_tolerations: Option<Vec<Toleration>>,
82 pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
83 pub balancerd_affinity: Option<Affinity>,
84 pub balancerd_tolerations: Option<Vec<Toleration>>,
85 pub balancerd_default_resources: Option<ResourceRequirements>,
86 pub console_node_selector: Vec<KeyValueArg<String, String>>,
87 pub console_affinity: Option<Affinity>,
88 pub console_tolerations: Option<Vec<Toleration>>,
89 pub console_default_resources: Option<ResourceRequirements>,
90 pub image_pull_policy: KubernetesImagePullPolicy,
91 pub network_policies_internal_enabled: bool,
92 pub network_policies_ingress_enabled: bool,
93 pub network_policies_ingress_cidrs: Vec<String>,
94 pub network_policies_egress_enabled: bool,
95 pub network_policies_egress_cidrs: Vec<String>,
96
97 pub environmentd_cluster_replica_sizes: Option<String>,
98 pub bootstrap_default_cluster_replica_size: Option<String>,
99 pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
100 pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
101 pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
102 pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
103 pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
104 pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
105 pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
106 pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
107 pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
108
109 pub environmentd_allowed_origins: Vec<HeaderValue>,
110 pub internal_console_proxy_url: String,
111
112 pub environmentd_sql_port: u16,
113 pub environmentd_http_port: u16,
114 pub environmentd_internal_sql_port: u16,
115 pub environmentd_internal_http_port: u16,
116 pub environmentd_internal_persist_pubsub_port: u16,
117
118 pub balancerd_sql_port: u16,
119 pub balancerd_http_port: u16,
120 pub balancerd_internal_http_port: u16,
121
122 pub console_http_port: u16,
123
124 pub default_certificate_specs: DefaultCertificateSpecs,
125
126 pub disable_license_key_checks: bool,
127
128 pub tracing: TracingCliArgs,
129 pub orchestratord_namespace: String,
130}
131
132pub struct Context {
133 config: Config,
134 metrics: Arc<Metrics>,
135 materializes: reflector::Store<Materialize>,
136 needs_update: Arc<Mutex<BTreeSet<String>>>,
137}
138
139impl Context {
140 pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
141 if config.cloud_provider == CloudProvider::Aws {
142 assert!(
143 config.aws_account_id.is_some(),
144 "--aws-account-id is required when using --cloud-provider=aws"
145 );
146 }
147
148 Self {
149 config,
150 metrics,
151 materializes: make_reflector(client.clone()).await,
152 needs_update: Default::default(),
153 }
154 }
155
156 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
157 let mut needs_update_set = self.needs_update.lock().unwrap();
158 if needs_update {
159 needs_update_set.insert(mz.name_unchecked());
160 } else {
161 needs_update_set.remove(&mz.name_unchecked());
162 }
163 self.metrics
164 .environmentd_needs_update
165 .set(u64::cast_from(needs_update_set.len()));
166 }
167
168 async fn update_status(
169 &self,
170 mz_api: &Api<Materialize>,
171 mz: &Materialize,
172 status: MaterializeStatus,
173 needs_update: bool,
174 ) -> Result<Materialize, kube::Error> {
175 self.set_needs_update(mz, needs_update);
176
177 let mut new_mz = mz.clone();
178 if !mz
179 .status
180 .as_ref()
181 .map_or(true, |mz_status| mz_status.needs_update(&status))
182 {
183 return Ok(new_mz);
184 }
185
186 new_mz.status = Some(status);
187 mz_api
188 .replace_status(
189 &mz.name_unchecked(),
190 &PostParams::default(),
191 serde_json::to_vec(&new_mz).unwrap(),
192 )
193 .await
194 }
195
196 async fn promote(
197 &self,
198 client: &Client,
199 mz: &Materialize,
200 resources: environmentd::Resources,
201 active_generation: u64,
202 desired_generation: u64,
203 resources_hash: String,
204 ) -> Result<Option<Action>, Error> {
205 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
206 return Ok(Some(action));
207 }
208 resources
209 .teardown_generation(client, mz, active_generation)
210 .await?;
211 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
212 self.update_status(
213 &mz_api,
214 mz,
215 MaterializeStatus {
216 active_generation: desired_generation,
217 last_completed_rollout_request: mz.requested_reconciliation_id(),
218 last_completed_rollout_environmentd_image_ref: Some(
219 mz.spec.environmentd_image_ref.clone(),
220 ),
221 resource_id: mz.status().resource_id,
222 resources_hash,
223 conditions: vec![Condition {
224 type_: "UpToDate".into(),
225 status: "True".into(),
226 last_transition_time: Time(chrono::offset::Utc::now()),
227 message: format!(
228 "Successfully applied changes for generation {desired_generation}"
229 ),
230 observed_generation: mz.meta().generation,
231 reason: "Applied".into(),
232 }],
233 },
234 false,
235 )
236 .await?;
237 Ok(None)
238 }
239
240 fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
241 if mz.spec.environment_id.is_nil() {
242 return Err(Error::Anyhow(anyhow::anyhow!(
246 "trying to reconcile a materialize resource with no environment id - this is a bug!"
247 )));
248 }
249
250 for existing_mz in self.materializes.state() {
251 if existing_mz.spec.environment_id == mz.spec.environment_id
252 && existing_mz.metadata.uid != mz.metadata.uid
253 {
254 return Err(Error::Anyhow(anyhow::anyhow!(
255 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
256 mz.namespace(),
257 mz.name_unchecked(),
258 existing_mz.namespace(),
259 existing_mz.name_unchecked(),
260 )));
261 }
262 }
263
264 Ok(())
265 }
266}
267
268#[async_trait::async_trait]
269impl k8s_controller::Context for Context {
270 type Resource = Materialize;
271 type Error = Error;
272
273 const FINALIZER_NAME: Option<&'static str> =
274 Some("orchestratord.materialize.cloud/materialize");
275
276 #[instrument(fields(organization_name=mz.name_unchecked()))]
277 async fn apply(
278 &self,
279 client: Client,
280 mz: &Self::Resource,
281 ) -> Result<Option<Action>, Self::Error> {
282 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
283 let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
284
285 let status = mz.status();
286 if mz.status.is_none() {
287 self.update_status(&mz_api, mz, status, true).await?;
288 return Ok(None);
291 }
292
293 let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
294 let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
295 .data
296 .as_ref()
297 .and_then(|data| data.get("license_key"))
298 {
299 let license_key = validate(
300 str::from_utf8(&license_key.0)
301 .context("invalid utf8")?
302 .trim(),
303 )?;
304 let environment_id = license_key
305 .environment_id
306 .parse()
307 .context("invalid environment id in license key")?;
308 Some(environment_id)
309 } else {
310 if mz.meets_minimum_version(&V161) {
311 return Err(Error::Anyhow(anyhow::anyhow!(
312 "license_key is required when running in kubernetes",
313 )));
314 } else {
315 None
316 }
317 };
318
319 if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
320 let mut mz = mz.clone();
321 if mz.spec.request_rollout.is_nil() {
322 mz.spec.request_rollout = Uuid::new_v4();
323 }
324 if mz.spec.environment_id.is_nil() {
325 if let Some(environment_id) = license_key_environment_id {
326 if environment_id.is_nil() {
327 mz.spec.environment_id = Uuid::new_v4();
330 } else {
331 mz.spec.environment_id = environment_id;
332 }
333 } else {
334 if mz.meets_minimum_version(&V161) {
335 return Err(Error::Anyhow(anyhow::anyhow!(
336 "environmentId is not set in materialize resource {}/{} but no license key was given",
337 mz.namespace(),
338 mz.name_unchecked()
339 )));
340 } else {
341 mz.spec.environment_id = Uuid::new_v4();
342 }
343 }
344 }
345 mz_api
346 .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
347 .await?;
348 return Ok(None);
352 }
353
354 if let Some(environment_id) = license_key_environment_id {
355 if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
358 return Err(Error::Anyhow(anyhow::anyhow!(
359 "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
360 mz.namespace(),
361 mz.name_unchecked(),
362 environment_id,
363 )));
364 }
365 }
366
367 self.check_environment_id_conflicts(mz)?;
368
369 let active_resources =
375 environmentd::Resources::new(&self.config, mz, status.active_generation);
376 let has_current_changes = status.resources_hash != active_resources.generate_hash();
377 let active_generation = status.active_generation;
378 let next_generation = active_generation + 1;
379 let desired_generation = if has_current_changes {
380 next_generation
381 } else {
382 active_generation
383 };
384
385 let resources = environmentd::Resources::new(&self.config, mz, desired_generation);
388 let resources_hash = resources.generate_hash();
389
390 let mut result = match (
391 mz.is_promoting(),
392 has_current_changes,
393 mz.rollout_requested(),
394 ) {
395 (true, _, _) => {
398 self.promote(
399 &client,
400 mz,
401 resources,
402 active_generation,
403 desired_generation,
404 resources_hash,
405 )
406 .await
407 }
408 (false, true, true) => {
410 let mz = self
422 .update_status(
423 &mz_api,
424 mz,
425 MaterializeStatus {
426 active_generation,
427 last_completed_rollout_request: status.last_completed_rollout_request,
432 last_completed_rollout_environmentd_image_ref: status
433 .last_completed_rollout_environmentd_image_ref,
434 resource_id: status.resource_id,
435 resources_hash: String::new(),
436 conditions: vec![Condition {
437 type_: "UpToDate".into(),
438 status: "Unknown".into(),
439 last_transition_time: Time(chrono::offset::Utc::now()),
440 message: format!(
441 "Applying changes for generation {desired_generation}"
442 ),
443 observed_generation: mz.meta().generation,
444 reason: "Applying".into(),
445 }],
446 },
447 active_generation != desired_generation,
448 )
449 .await?;
450 let mz = &mz;
451 let status = mz.status();
452
453 if !mz.within_upgrade_window() {
454 let last_completed_rollout_environmentd_image_ref =
455 status.last_completed_rollout_environmentd_image_ref;
456
457 self.update_status(
458 &mz_api,
459 mz,
460 MaterializeStatus {
461 active_generation,
462 last_completed_rollout_request: status.last_completed_rollout_request,
463 last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
464 resource_id: status.resource_id,
465 resources_hash: status.resources_hash,
466 conditions: vec![Condition {
467 type_: "UpToDate".into(),
468 status: "False".into(),
469 last_transition_time: Time(chrono::offset::Utc::now()),
470 message: format!(
471 "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
472 last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
473 &mz.spec.environmentd_image_ref,
474 ),
475 observed_generation: mz.meta().generation,
476 reason: "FailedDeploy".into(),
477 }],
478 },
479 active_generation != desired_generation,
480 )
481 .await?;
482 return Ok(None);
483 }
484
485 if mz.spec.rollout_strategy
486 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
487 {
488 resources
492 .teardown_generation(&client, mz, active_generation)
493 .await?;
494 }
495
496 trace!("applying environment resources");
497 match resources
498 .apply(&client, mz.should_force_promote(), &mz.namespace())
499 .await
500 {
501 Ok(Some(action)) => {
502 trace!("new environment is not yet ready");
503 Ok(Some(action))
504 }
505 Ok(None) => {
506 self.update_status(
514 &mz_api,
515 mz,
516 MaterializeStatus {
517 active_generation,
518 last_completed_rollout_request: status
523 .last_completed_rollout_request,
524 last_completed_rollout_environmentd_image_ref: status
525 .last_completed_rollout_environmentd_image_ref,
526 resource_id: status.resource_id,
527 resources_hash: resources_hash.clone(),
528 conditions: vec![Condition {
529 type_: "UpToDate".into(),
530 status: "Unknown".into(),
531 last_transition_time: Time(chrono::offset::Utc::now()),
532 message: format!(
533 "Attempting to promote generation {desired_generation}"
534 ),
535 observed_generation: mz.meta().generation,
536 reason: "Promoting".into(),
537 }],
538 },
539 active_generation != desired_generation,
540 )
541 .await?;
542 self.promote(
543 &client,
544 mz,
545 resources,
546 active_generation,
547 desired_generation,
548 resources_hash,
549 )
550 .await
551 }
552 Err(e) => {
553 self.update_status(
554 &mz_api,
555 mz,
556 MaterializeStatus {
557 active_generation,
558 last_completed_rollout_request: status.last_completed_rollout_request,
563 last_completed_rollout_environmentd_image_ref: status
564 .last_completed_rollout_environmentd_image_ref,
565 resource_id: status.resource_id,
566 resources_hash: status.resources_hash,
567 conditions: vec![Condition {
568 type_: "UpToDate".into(),
569 status: "False".into(),
570 last_transition_time: Time(chrono::offset::Utc::now()),
571 message: format!(
572 "Failed to apply changes for generation {desired_generation}: {e}"
573 ),
574 observed_generation: mz.meta().generation,
575 reason: "FailedDeploy".into(),
576 }],
577 },
578 active_generation != desired_generation,
579 )
580 .await?;
581 Err(e)
582 }
583 }
584 }
585 (false, true, false) => {
587 let mut needs_update = mz.conditions_need_update();
588 if mz.update_in_progress() {
589 resources
590 .teardown_generation(&client, mz, next_generation)
591 .await?;
592 needs_update = true;
593 }
594 if needs_update {
595 self.update_status(
596 &mz_api,
597 mz,
598 MaterializeStatus {
599 active_generation,
600 last_completed_rollout_request: mz.requested_reconciliation_id(),
601 last_completed_rollout_environmentd_image_ref: status
602 .last_completed_rollout_environmentd_image_ref,
603 resource_id: status.resource_id,
604 resources_hash: status.resources_hash,
605 conditions: vec![Condition {
606 type_: "UpToDate".into(),
607 status: "False".into(),
608 last_transition_time: Time(chrono::offset::Utc::now()),
609 message: format!(
610 "Changes detected, waiting for approval for generation {desired_generation}"
611 ),
612 observed_generation: mz.meta().generation,
613 reason: "WaitingForApproval".into(),
614 }],
615 },
616 active_generation != desired_generation,
617 )
618 .await?;
619 }
620 debug!("changes detected, waiting for approval");
621 Ok(None)
622 }
623 (false, false, _) => {
625 let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
630 if mz.update_in_progress() {
631 resources
632 .teardown_generation(&client, mz, next_generation)
633 .await?;
634 needs_update = true;
635 }
636 if needs_update {
637 self.update_status(
638 &mz_api,
639 mz,
640 MaterializeStatus {
641 active_generation,
642 last_completed_rollout_request: mz.requested_reconciliation_id(),
643 last_completed_rollout_environmentd_image_ref: status
644 .last_completed_rollout_environmentd_image_ref,
645 resource_id: status.resource_id,
646 resources_hash: status.resources_hash,
647 conditions: vec![Condition {
648 type_: "UpToDate".into(),
649 status: "True".into(),
650 last_transition_time: Time(chrono::offset::Utc::now()),
651 message: format!(
652 "No changes found from generation {active_generation}"
653 ),
654 observed_generation: mz.meta().generation,
655 reason: "Applied".into(),
656 }],
657 },
658 active_generation != desired_generation,
659 )
660 .await?;
661 }
662 debug!("no changes");
663 Ok(None)
664 }
665 }?;
666
667 if let Some(action) = result {
668 return Ok(Some(action));
669 }
670
671 let balancer = balancer::Resources::new(&self.config, mz);
676 if self.config.create_balancers {
677 result = balancer.apply(&client, &mz.namespace()).await?;
678 } else {
679 result = balancer.cleanup(&client, &mz.namespace()).await?;
680 }
681
682 if let Some(action) = result {
683 return Ok(Some(action));
684 }
685
686 let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
691 else {
692 return Err(Error::Anyhow(anyhow::anyhow!(
693 "failed to parse environmentd image ref: {}",
694 mz.spec.environmentd_image_ref
695 )));
696 };
697 let console_image_tag = self
698 .config
699 .console_image_tag_map
700 .iter()
701 .find(|kv| kv.key == environmentd_image_tag)
702 .map(|kv| kv.value.clone())
703 .unwrap_or_else(|| self.config.console_image_tag_default.clone());
704 let console = console::Resources::new(
705 &self.config,
706 mz,
707 &matching_image_from_environmentd_image_ref(
708 &mz.spec.environmentd_image_ref,
709 "console",
710 Some(&console_image_tag),
711 ),
712 );
713 if self.config.create_console {
714 console.apply(&client, &mz.namespace()).await?;
715 } else {
716 console.cleanup(&client, &mz.namespace()).await?;
717 }
718
719 Ok(result)
720 }
721
722 #[instrument(fields(organization_name=mz.name_unchecked()))]
723 async fn cleanup(
724 &self,
725 _client: Client,
726 mz: &Self::Resource,
727 ) -> Result<Option<Action>, Self::Error> {
728 self.set_needs_update(mz, false);
729
730 Ok(None)
731 }
732}