1use std::{
11 collections::BTreeSet,
12 sync::{Arc, Mutex},
13 time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_controller::TraceMetadata;
19use k8s_openapi::{
20 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21 apimachinery::pkg::apis::meta::v1::{Condition, Time},
22 jiff::{SignedDuration, Timestamp},
23};
24use kube::{
25 Api, Client, Resource, ResourceExt,
26 api::{ListParams, PostParams},
27 runtime::controller::Action,
28};
29use tracing::{debug, trace, warn};
30use uuid::Uuid;
31
32use crate::{
33 Error,
34 controller::materialize::generation::V161,
35 k8s::{apply_resource, delete_resource},
36 matching_image_from_environmentd_image_ref,
37 metrics::Metrics,
38 parse_image_tag,
39 tls::{DefaultCertificateSpecs, issuer_ref_defined, resolved_dns_names},
40};
41use mz_cloud_provider::CloudProvider;
42use mz_cloud_resources::crd::{
43 ManagedResource,
44 balancer::v1alpha1::{Balancer, BalancerSpec},
45 console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
46 materialize::MaterializeRolloutStrategy,
47 materialize::v1alpha1::{Materialize, MaterializeStatus},
48};
49use mz_license_keys::validate;
50use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
51use mz_orchestrator_tracing::TracingCliArgs;
52use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
53
54pub mod generation;
55pub mod global;
56
/// Static configuration for the Materialize controller [`Context`].
///
/// The whole value is handed by reference to `global::Resources::new` and
/// `generation::Resources::new` during reconciliation. Only a handful of
/// fields are read directly in this module; those are documented below.
/// NOTE(review): the remaining fields are presumably consumed by the resource
/// builders in the `generation` and `global` submodules - confirm there.
pub struct Config {
    /// Cloud provider in use. When this is `CloudProvider::Aws`,
    /// [`Context::new`] requires `aws_account_id` to be set.
    pub cloud_provider: CloudProvider,
    pub region: String,
    /// When `true`, reconciliation applies a `Balancer` resource for each
    /// Materialize resource; when `false`, it deletes the same-named one.
    pub create_balancers: bool,
    /// When `true`, reconciliation applies a `Console` resource for each
    /// Materialize resource; when `false`, it deletes the same-named one.
    pub create_console: bool,
    pub helm_chart_version: Option<String>,
    pub secrets_controller: String,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,

    pub segment_api_key: Option<String>,
    pub segment_client_side: bool,

    /// Console image tag used when `console_image_tag_map` has no entry whose
    /// key equals the active environmentd image tag.
    pub console_image_tag_default: String,
    /// Key/value pairs searched by environmentd image tag (the key) to pick
    /// the console image tag (the value).
    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,

    /// Must be `Some` when `cloud_provider` is `CloudProvider::Aws`
    /// (asserted in [`Context::new`]).
    pub aws_account_id: Option<String>,
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    pub aws_secrets_controller_tags: Vec<String>,
    pub environmentd_availability_zones: Option<Vec<String>>,

    pub ephemeral_volume_class: Option<String>,
    pub scheduler_name: Option<String>,
    pub enable_security_context: bool,
    pub enable_internal_statement_logging: bool,
    pub disable_statement_logging: bool,

    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    pub environmentd_affinity: Option<Affinity>,
    pub environmentd_tolerations: Option<Vec<Toleration>>,
    pub environmentd_default_resources: Option<ResourceRequirements>,
    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    pub clusterd_affinity: Option<Affinity>,
    pub clusterd_tolerations: Option<Vec<Toleration>>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub network_policies_internal_enabled: bool,
    pub network_policies_ingress_enabled: bool,
    pub network_policies_ingress_cidrs: Vec<String>,
    pub network_policies_egress_enabled: bool,
    pub network_policies_egress_cidrs: Vec<String>,

    pub environmentd_cluster_replica_sizes: Option<String>,
    pub bootstrap_default_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    pub environmentd_allowed_origins: Vec<HeaderValue>,
    pub internal_console_proxy_url: String,

    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub environmentd_internal_sql_port: u16,
    pub environmentd_internal_http_port: u16,
    pub environmentd_internal_persist_pubsub_port: u16,

    /// Default certificate specs. Its `balancerd_external` member is combined
    /// with the resource's own balancerd external certificate spec to choose
    /// the console-to-balancerd connection scheme and DNS names.
    pub default_certificate_specs: DefaultCertificateSpecs,

    pub disable_license_key_checks: bool,

    pub tracing: TracingCliArgs,
    pub orchestratord_namespace: String,
}
128
/// Shared state of the Materialize controller; implements
/// `k8s_controller::Context` for `Materialize` resources.
pub struct Context {
    /// Controller configuration supplied to [`Context::new`].
    config: Config,
    /// Metrics handle; `set_needs_update` publishes through it.
    metrics: Arc<Metrics>,
    /// Identifiers of the Materialize resources currently flagged as needing
    /// an update. The set's size is exported as the
    /// `environmentd_needs_update` metric by `set_needs_update`.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
134
135impl Context {
136 pub fn new(config: Config, metrics: Arc<Metrics>) -> Self {
137 if config.cloud_provider == CloudProvider::Aws {
138 assert!(
139 config.aws_account_id.is_some(),
140 "--aws-account-id is required when using --cloud-provider=aws"
141 );
142 }
143
144 Self {
145 config,
146 metrics,
147 needs_update: Default::default(),
148 }
149 }
150
151 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
152 let mut needs_update_set = self.needs_update.lock().unwrap();
153 if needs_update {
154 needs_update_set.insert(mz.name_unchecked());
155 } else {
156 needs_update_set.remove(&mz.name_unchecked());
157 }
158 self.metrics
159 .environmentd_needs_update
160 .set(u64::cast_from(needs_update_set.len()));
161 }
162
163 async fn update_status(
164 &self,
165 mz_api: &Api<Materialize>,
166 mz: &Materialize,
167 status: MaterializeStatus,
168 needs_update: bool,
169 ) -> Result<Materialize, kube::Error> {
170 self.set_needs_update(mz, needs_update);
171
172 let mut new_mz = mz.clone();
173 if !mz
174 .status
175 .as_ref()
176 .map_or(true, |mz_status| mz_status.needs_update(&status))
177 {
178 return Ok(new_mz);
179 }
180
181 new_mz.status = Some(status);
182 mz_api
183 .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
184 .await
185 }
186
187 async fn promote(
188 &self,
189 client: &Client,
190 mz: &Materialize,
191 resources: generation::Resources,
192 active_generation: u64,
193 desired_generation: u64,
194 resources_hash: String,
195 ) -> Result<Option<Action>, Error> {
196 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
197 return Ok(Some(action));
198 }
199 resources
200 .teardown_generation(client, mz, active_generation)
201 .await?;
202 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
203 self.update_status(
204 &mz_api,
205 mz,
206 MaterializeStatus {
207 active_generation: desired_generation,
208 last_completed_rollout_request: mz.requested_reconciliation_id(),
209 last_completed_rollout_environmentd_image_ref: Some(
210 mz.spec.environmentd_image_ref.clone(),
211 ),
212 resource_id: mz.status().resource_id,
213 resources_hash,
214 last_completed_rollout_hash: None,
215 conditions: vec![Condition {
216 type_: "UpToDate".into(),
217 status: "True".into(),
218 last_transition_time: Time(Timestamp::now()),
219 message: format!(
220 "Successfully applied changes for generation {desired_generation}"
221 ),
222 observed_generation: mz.meta().generation,
223 reason: "Applied".into(),
224 }],
225 },
226 false,
227 )
228 .await?;
229 Ok(None)
230 }
231
232 async fn check_environment_id_conflicts(
233 &self,
234 client: &Client,
235 mz: &Materialize,
236 ) -> Result<(), Error> {
237 if mz.spec.environment_id.is_nil() {
238 return Err(Error::Anyhow(anyhow::anyhow!(
242 "trying to reconcile a materialize resource with no environment id - this is a bug!"
243 )));
244 }
245
246 let mz_api: Api<Materialize> = Api::all(client.clone());
247 let all_mz = mz_api.list(&ListParams::default()).await?;
248 for existing_mz in &all_mz.items {
249 if existing_mz.spec.environment_id == mz.spec.environment_id
250 && existing_mz.metadata.uid != mz.metadata.uid
251 {
252 return Err(Error::Anyhow(anyhow::anyhow!(
253 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
254 mz.namespace(),
255 mz.name_unchecked(),
256 existing_mz.namespace(),
257 existing_mz.name_unchecked(),
258 )));
259 }
260 }
261
262 Ok(())
263 }
264}
265
/// Reconciliation entry points for `Materialize` resources.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    // NOTE(review): presumably the finalizer the `k8s_controller` framework
    // attaches so that `cleanup` runs before deletion - confirm in that crate.
    const FINALIZER_NAME: Option<&'static str> =
        Some("orchestratord.materialize.cloud/materialize");

    /// Reconciles one `Materialize` resource.
    ///
    /// In order, this:
    /// 1. writes an initial status if the resource has none, and returns;
    /// 2. reads the backend secret and validates the license key, if present;
    /// 3. fills in nil `request_rollout` / `environment_id` spec fields,
    ///    writes the resource back, and returns;
    /// 4. checks the environment id against the license key and against all
    ///    other Materialize resources;
    /// 5. applies the global resources;
    /// 6. drives the generation state machine (promote / roll out / wait for
    ///    approval / nothing to do), updating the status as it goes;
    /// 7. applies or deletes the `Balancer` and `Console` resources according
    ///    to the controller configuration.
    ///
    /// Returns `Ok(Some(action))` when a step asks for a follow-up action and
    /// `Ok(None)` otherwise.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
        _metadata: &mut TraceMetadata,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
        let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());

        // No status stored yet: persist the value `mz.status()` produced and
        // stop; the rest of the reconcile expects a stored status.
        // NOTE(review): presumably the status write triggers another
        // reconcile - confirm.
        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // Extract the environment id embedded in the license key, if the
        // backend secret carries a `license_key` entry. A missing key is only
        // tolerated for resources below the V161 minimum version.
        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
            .data
            .as_ref()
            .and_then(|data| data.get("license_key"))
        {
            let license_key = validate(
                str::from_utf8(&license_key.0)
                    .context("invalid utf8")?
                    .trim(),
            )?;
            let environment_id = license_key
                .environment_id
                .parse()
                .context("invalid environment id in license key")?;
            Some(environment_id)
        } else {
            if mz.meets_minimum_version(&V161) {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "license_key is required when running in kubernetes",
                )));
            } else {
                None
            }
        };

        // Fill in nil spec ids on a clone, write the resource back and return
        // without doing any further work in this pass.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                if let Some(environment_id) = license_key_environment_id {
                    if environment_id.is_nil() {
                        // A license key with a nil environment id does not pin
                        // the id: generate a fresh one.
                        mz.spec.environment_id = Uuid::new_v4();
                    } else {
                        mz.spec.environment_id = environment_id;
                    }
                } else {
                    if mz.meets_minimum_version(&V161) {
                        return Err(Error::Anyhow(anyhow::anyhow!(
                            "environmentId is not set in materialize resource {}/{} but no license key was given",
                            mz.namespace(),
                            mz.name_unchecked()
                        )));
                    } else {
                        mz.spec.environment_id = Uuid::new_v4();
                    }
                }
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            return Ok(None);
        }

        // A non-nil environment id in the license key must match the spec; a
        // nil one in the license key is accepted with any spec value.
        if let Some(environment_id) = license_key_environment_id {
            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
                    mz.namespace(),
                    mz.name_unchecked(),
                    environment_id,
                )));
            }
        }

        self.check_environment_id_conflicts(&client, mz).await?;

        global::Resources::new(&self.config, mz)?
            .apply(&client, &mz.namespace())
            .await?;

        // Detect pending changes by hashing the resources that the current
        // spec would produce for the *active* generation and comparing with
        // the hash stored in the status.
        let active_resources =
            generation::Resources::new(&self.config, mz, status.active_generation);
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        let desired_generation = if has_current_changes {
            next_generation
        } else {
            active_generation
        };

        // Resources (and their hash) for the generation we want to end up on.
        let resources = generation::Resources::new(&self.config, mz, desired_generation);
        let resources_hash = resources.generate_hash();

        // State machine over (is_promoting, has_current_changes,
        // rollout_requested). The trailing `?` propagates any arm's error.
        let mut result = match (
            mz.is_promoting(),
            has_current_changes,
            mz.rollout_requested(),
        ) {
            // A promotion is already underway: continue it regardless of the
            // other two flags.
            (true, _, _) => {
                self.promote(
                    &client,
                    mz,
                    resources,
                    active_generation,
                    desired_generation,
                    resources_hash,
                )
                .await
            }
            // Changes are pending and a rollout has been requested.
            (false, true, true) => {
                // Unless promotion is being forced, cancel a rollout that has
                // been in progress for at least the configured timeout.
                if !mz.should_force_promote() {
                    if let Some(started) = mz.rollout_in_progress_since() {
                        let timeout = mz.rollout_request_timeout();
                        let elapsed = Timestamp::now().duration_since(started);
                        // A timeout that cannot be converted to a
                        // `SignedDuration` is treated as "not timed out".
                        let timed_out = SignedDuration::try_from(timeout)
                            .is_ok_and(|timeout| elapsed >= timeout);
                        if timed_out {
                            warn!(
                                "rollout to generation {desired_generation} exceeded timeout, cancelling"
                            );
                            // Remove whatever was created for the generation
                            // being rolled out.
                            resources
                                .teardown_generation(&client, mz, next_generation)
                                .await?;
                            // Record the current request as the last completed
                            // one while leaving the active generation, image
                            // ref and resources hash untouched.
                            self.update_status(
                                &mz_api,
                                mz,
                                MaterializeStatus {
                                    active_generation,
                                    last_completed_rollout_request: mz
                                        .requested_reconciliation_id(),
                                    last_completed_rollout_environmentd_image_ref: status
                                        .last_completed_rollout_environmentd_image_ref
                                        .clone(),
                                    resource_id: status.resource_id.clone(),
                                    resources_hash: status.resources_hash.clone(),
                                    last_completed_rollout_hash: None,
                                    conditions: vec![Condition {
                                        type_: "UpToDate".into(),
                                        status: "False".into(),
                                        last_transition_time: Time(Timestamp::now()),
                                        message: format!(
                                            "Cancelled rollout to generation \
                                             {desired_generation} after it \
                                             exceeded the rollout timeout of {}",
                                            humantime::format_duration(timeout),
                                        ),
                                        observed_generation: mz.meta().generation,
                                        reason: "RolloutTimeout".into(),
                                    }],
                                },
                                active_generation != desired_generation,
                            )
                            .await?;
                            return Ok(None);
                        }
                    }
                }

                // Refuse the rollout when the target image is outside the
                // upgrade window; report it on the status and stop.
                if !mz.within_upgrade_window() {
                    let last_completed_rollout_environmentd_image_ref =
                        status.last_completed_rollout_environmentd_image_ref;

                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            last_completed_rollout_environmentd_image_ref:
                                last_completed_rollout_environmentd_image_ref.clone(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            last_completed_rollout_hash: None,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(Timestamp::now()),
                                message: format!(
                                    "Refusing to upgrade from {} to {}. \
                                     More than one major version from \
                                     last successful rollout. If coming \
                                     from Self Managed 25.2, upgrade to \
                                     materialize/environmentd:v0.147.20 \
                                     first.",
                                    last_completed_rollout_environmentd_image_ref
                                        .expect("should be set if upgrade window check fails"),
                                    &mz.spec.environmentd_image_ref,
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "FailedDeploy".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                    return Ok(None);
                }

                // Unless the resource is already ready to promote for this
                // hash, mark the rollout as "Applying" with an empty
                // `resources_hash`, keeping the previous
                // `last_completed_rollout_request`. From here on `mz` is the
                // object returned by that status write.
                let mz = if mz.is_ready_to_promote(&resources_hash) {
                    mz
                } else {
                    &self
                        .update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                last_completed_rollout_environmentd_image_ref: status
                                    .last_completed_rollout_environmentd_image_ref,
                                resource_id: status.resource_id.clone(),
                                resources_hash: String::new(),
                                last_completed_rollout_hash: None,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(Timestamp::now()),
                                    message: format!(
                                        "Applying changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applying".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?
                };
                // Shadows the outer `status` for the rest of this arm only.
                let status = mz.status();

                // With this strategy the active generation is torn down
                // before the new one is applied.
                if mz.spec.rollout_strategy
                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
                {
                    resources
                        .teardown_generation(&client, mz, active_generation)
                        .await?;
                }

                trace!("applying environment resources");
                match resources
                    .apply(&client, mz.should_force_promote(), &mz.namespace())
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // Manual promotion: record readiness (now with the new
                        // resources hash) and wait instead of promoting.
                        if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
                            && !mz.should_force_promote()
                        {
                            trace!(
                                "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
                            );
                            self.update_status(
                                &mz_api,
                                mz,
                                MaterializeStatus {
                                    active_generation,
                                    last_completed_rollout_request: status
                                        .last_completed_rollout_request,
                                    last_completed_rollout_environmentd_image_ref: status
                                        .last_completed_rollout_environmentd_image_ref,
                                    resource_id: status.resource_id,
                                    resources_hash,
                                    last_completed_rollout_hash: None,
                                    conditions: vec![Condition {
                                        type_: "UpToDate".into(),
                                        status: "Unknown".into(),
                                        // Unlike the other conditions, the
                                        // transition time is computed by
                                        // `up_to_date_transition_time` rather
                                        // than being "now" unconditionally.
                                        last_transition_time: Time(mz.up_to_date_transition_time(
                                            "Unknown",
                                            Timestamp::now(),
                                        )),
                                        message: format!(
                                            "Ready to promote generation {desired_generation}"
                                        ),
                                        observed_generation: mz.meta().generation,
                                        reason: "ReadyToPromote".into(),
                                    }],
                                },
                                active_generation != desired_generation,
                            )
                            .await?;
                            return Ok(None);
                        }
                        // Record the "Promoting" state (with the new resources
                        // hash) before actually promoting.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                last_completed_rollout_environmentd_image_ref: status
                                    .last_completed_rollout_environmentd_image_ref,
                                resource_id: status.resource_id,
                                resources_hash: resources_hash.clone(),
                                last_completed_rollout_hash: None,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(Timestamp::now()),
                                    message: format!(
                                        "Attempting to promote generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Promoting".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        self.promote(
                            &client,
                            mz,
                            resources,
                            active_generation,
                            desired_generation,
                            resources_hash,
                        )
                        .await
                    }
                    Err(e) => {
                        // Surface the failure on the status, then propagate
                        // the original error.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                last_completed_rollout_environmentd_image_ref: status
                                    .last_completed_rollout_environmentd_image_ref,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                last_completed_rollout_hash: None,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(Timestamp::now()),
                                    message: format!(
                                        "Failed to apply changes for \
                                         generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            }
            // Changes are pending but no rollout has been requested.
            (false, true, false) => {
                let mut needs_update = mz.conditions_need_update();
                // An update that was in progress is abandoned: tear down the
                // next generation and force a status write.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            last_completed_rollout_environmentd_image_ref: status
                                .last_completed_rollout_environmentd_image_ref,
                            // Cloned because `status.resource_id` is read
                            // again after the match.
                            resource_id: status.resource_id.clone(),
                            resources_hash: status.resources_hash,
                            last_completed_rollout_hash: None,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(Timestamp::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
            // No changes pending.
            (false, false, _) => {
                // A requested rollout with nothing to change still needs the
                // status written so the request is recorded as completed.
                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            last_completed_rollout_environmentd_image_ref: status
                                .last_completed_rollout_environmentd_image_ref,
                            // Cloned because `status.resource_id` is read
                            // again after the match.
                            resource_id: status.resource_id.clone(),
                            resources_hash: status.resources_hash,
                            last_completed_rollout_hash: None,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "True".into(),
                                last_transition_time: Time(Timestamp::now()),
                                message: format!(
                                    "No changes found from generation {active_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applied".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("no changes");
                Ok(None)
            }
        }?;

        // The generation state machine asked for a follow-up action: stop.
        if let Some(action) = result {
            return Ok(Some(action));
        }

        // Balancer: applied (then checked for readiness) when enabled in the
        // controller config, deleted otherwise.
        if self.config.create_balancers {
            let balancer = Balancer {
                metadata: mz.managed_resource_meta(mz.name_unchecked()),
                spec: BalancerSpec {
                    balancerd_image_ref: matching_image_from_environmentd_image_ref(
                        mz.active_environmentd_image_ref(),
                        "balancerd",
                        None,
                    ),
                    resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
                    replicas: Some(mz.balancerd_replicas()),
                    external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
                    internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
                    pod_annotations: mz.spec.pod_annotations.clone(),
                    pod_labels: mz.spec.pod_labels.clone(),
                    static_routing: Some(
                        mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
                            environmentd_namespace: mz.namespace(),
                            environmentd_service_name: mz.environmentd_service_name(),
                        },
                    ),
                    frontegg_routing: None,
                    resource_id: Some(status.resource_id.clone()),
                },
                status: None,
            };
            let balancer = apply_resource(&balancer_api, &balancer).await?;
            result = wait_for_balancer(&balancer)?;
        } else {
            delete_resource(&balancer_api, &mz.name_unchecked()).await?;
        }

        // Balancer not ready yet: return its requeue action before touching
        // the console.
        if let Some(action) = result {
            return Ok(Some(action));
        }

        // Console: applied when enabled in the controller config, deleted
        // otherwise.
        if self.config.create_console {
            let active_environmentd_image_ref = mz.active_environmentd_image_ref();
            // Pick the console tag mapped to the active environmentd tag
            // ("latest" when no tag can be parsed), falling back to the
            // configured default tag.
            let environmentd_image_tag =
                parse_image_tag(active_environmentd_image_ref).unwrap_or("latest");
            let console_image_tag = self
                .config
                .console_image_tag_map
                .iter()
                .find(|kv| kv.key == environmentd_image_tag)
                .map(|kv| kv.value.clone())
                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
            let console = Console {
                metadata: mz.managed_resource_meta(mz.name_unchecked()),
                spec: ConsoleSpec {
                    console_image_ref: matching_image_from_environmentd_image_ref(
                        active_environmentd_image_ref,
                        "console",
                        Some(&console_image_tag),
                    ),
                    resource_requirements: mz.spec.console_resource_requirements.clone(),
                    replicas: Some(mz.console_replicas()),
                    external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
                    pod_annotations: mz.spec.pod_annotations.clone(),
                    pod_labels: mz.spec.pod_labels.clone(),
                    balancerd: BalancerdRef {
                        service_name: mz.balancerd_service_name(),
                        namespace: mz.namespace(),
                        // HTTPS when an issuer ref is defined for the
                        // balancerd external certificate, HTTP otherwise.
                        scheme: if issuer_ref_defined(
                            &self.config.default_certificate_specs.balancerd_external,
                            &mz.spec.balancerd_external_certificate_spec,
                        ) {
                            HttpConnectionScheme::Https
                        } else {
                            HttpConnectionScheme::Http
                        },
                        dns_names: resolved_dns_names(
                            &self.config.default_certificate_specs.balancerd_external,
                            &mz.spec.balancerd_external_certificate_spec,
                        ),
                    },
                    authenticator_kind: mz.spec.authenticator_kind,
                    resource_id: Some(status.resource_id),
                },
                status: None,
            };
            apply_resource(&console_api, &console).await?;
        } else {
            delete_resource(&console_api, &mz.name_unchecked()).await?;
        }

        // Both early returns above have been passed, so `result` is `None`
        // here.
        Ok(result)
    }

    /// Cleanup hook for a `Materialize` resource: clears its needs-update
    /// flag (which also refreshes the exported metric) and does nothing else.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
        _metadata: &mut TraceMetadata,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
910
911fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
912 if let Some(conditions) = balancer
913 .status
914 .as_ref()
915 .map(|status| status.conditions.as_slice())
916 {
917 if conditions
918 .iter()
919 .any(|condition| condition.type_ == "Ready" && condition.status == "True")
920 {
921 return Ok(None);
922 }
923 }
924
925 Ok(Some(Action::requeue(Duration::from_secs(1))))
926}