mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    sync::{Arc, Mutex},
13    time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20    apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{
23    Api, Client, Resource, ResourceExt,
24    api::PostParams,
25    runtime::{controller::Action, reflector},
26};
27use tracing::{debug, trace};
28use uuid::Uuid;
29
30use crate::{
31    Error,
32    controller::materialize::environmentd::V161,
33    k8s::{apply_resource, delete_resource, make_reflector},
34    matching_image_from_environmentd_image_ref,
35    metrics::Metrics,
36    tls::{DefaultCertificateSpecs, issuer_ref_defined},
37};
38use mz_cloud_provider::CloudProvider;
39use mz_cloud_resources::crd::{
40    ManagedResource,
41    balancer::v1alpha1::{Balancer, BalancerSpec},
42    console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
43    materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
44};
45use mz_license_keys::validate;
46use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
47use mz_orchestrator_tracing::TracingCliArgs;
48use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
49
50pub mod environmentd;
51
/// Startup configuration for the Materialize controller.
///
/// Consumed by [`Context::new`], which panics if `cloud_provider` is
/// [`CloudProvider::Aws`] and `aws_account_id` is unset. The whole config is
/// also handed to `environmentd::Resources::new` when the per-generation
/// environmentd resources are generated, so most fields here presumably
/// parameterize those resources — verify in `environmentd.rs`.
pub struct Config {
    /// Cloud provider the controller runs against; `Aws` additionally requires
    /// `aws_account_id` (checked in [`Context::new`]).
    pub cloud_provider: CloudProvider,
    pub region: String,
    /// Gates construction of a `Balancer` resource during reconciliation.
    pub create_balancers: bool,
    pub create_console: bool,
    pub helm_chart_version: Option<String>,
    pub secrets_controller: String,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,

    // Segment analytics settings (NOTE(review): consumers not visible here).
    pub segment_api_key: Option<String>,
    pub segment_client_side: bool,

    // Console image selection (NOTE(review): consumers not visible here).
    pub console_image_tag_default: String,
    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific settings. `aws_account_id` must be `Some` when
    // `cloud_provider` is `Aws`.
    pub aws_account_id: Option<String>,
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    pub aws_secrets_controller_tags: Vec<String>,
    pub environmentd_availability_zones: Option<Vec<String>>,

    pub ephemeral_volume_class: Option<String>,
    pub scheduler_name: Option<String>,
    pub enable_security_context: bool,
    pub enable_internal_statement_logging: bool,
    pub disable_statement_logging: bool,

    // Scheduling and network-policy settings; presumably applied to the
    // generated environmentd/clusterd objects — TODO confirm in environmentd.rs.
    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    pub environmentd_affinity: Option<Affinity>,
    pub environmentd_tolerations: Option<Vec<Toleration>>,
    pub environmentd_default_resources: Option<ResourceRequirements>,
    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    pub clusterd_affinity: Option<Affinity>,
    pub clusterd_tolerations: Option<Vec<Toleration>>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub network_policies_internal_enabled: bool,
    pub network_policies_ingress_enabled: bool,
    pub network_policies_ingress_cidrs: Vec<String>,
    pub network_policies_egress_enabled: bool,
    pub network_policies_egress_cidrs: Vec<String>,

    // Cluster replica sizing and bootstrap replication factors. Note there is
    // no replication-factor override for the catalog server cluster.
    pub environmentd_cluster_replica_sizes: Option<String>,
    pub bootstrap_default_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    pub environmentd_allowed_origins: Vec<HeaderValue>,
    pub internal_console_proxy_url: String,

    // Ports for the environmentd listeners.
    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub environmentd_internal_sql_port: u16,
    pub environmentd_internal_http_port: u16,
    pub environmentd_internal_persist_pubsub_port: u16,

    pub default_certificate_specs: DefaultCertificateSpecs,

    /// NOTE(review): the license-key handling visible in `apply` does not
    /// consult this flag (a key is required for versions >= V161 regardless);
    /// confirm where it is honored.
    pub disable_license_key_checks: bool,

    pub tracing: TracingCliArgs,
    pub orchestratord_namespace: String,
}
123
/// Shared state for the Materialize controller's reconcile loop.
pub struct Context {
    /// Controller configuration supplied at startup.
    config: Config,
    /// Metrics handle; the `environmentd_needs_update` gauge is set from here.
    metrics: Arc<Metrics>,
    /// Reflector cache of `Materialize` resources (scope decided by
    /// `make_reflector`), used to detect `environmentId` collisions.
    materializes: reflector::Store<Materialize>,
    /// Identifiers of Materialize resources currently flagged as needing an
    /// environmentd update; the set's size is exported as a gauge.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
130
131impl Context {
132    pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
133        if config.cloud_provider == CloudProvider::Aws {
134            assert!(
135                config.aws_account_id.is_some(),
136                "--aws-account-id is required when using --cloud-provider=aws"
137            );
138        }
139
140        Self {
141            config,
142            metrics,
143            materializes: make_reflector(client.clone()).await,
144            needs_update: Default::default(),
145        }
146    }
147
148    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
149        let mut needs_update_set = self.needs_update.lock().unwrap();
150        if needs_update {
151            needs_update_set.insert(mz.name_unchecked());
152        } else {
153            needs_update_set.remove(&mz.name_unchecked());
154        }
155        self.metrics
156            .environmentd_needs_update
157            .set(u64::cast_from(needs_update_set.len()));
158    }
159
160    async fn update_status(
161        &self,
162        mz_api: &Api<Materialize>,
163        mz: &Materialize,
164        status: MaterializeStatus,
165        needs_update: bool,
166    ) -> Result<Materialize, kube::Error> {
167        self.set_needs_update(mz, needs_update);
168
169        let mut new_mz = mz.clone();
170        if !mz
171            .status
172            .as_ref()
173            .map_or(true, |mz_status| mz_status.needs_update(&status))
174        {
175            return Ok(new_mz);
176        }
177
178        new_mz.status = Some(status);
179        mz_api
180            .replace_status(
181                &mz.name_unchecked(),
182                &PostParams::default(),
183                serde_json::to_vec(&new_mz).unwrap(),
184            )
185            .await
186    }
187
188    async fn promote(
189        &self,
190        client: &Client,
191        mz: &Materialize,
192        resources: environmentd::Resources,
193        active_generation: u64,
194        desired_generation: u64,
195        resources_hash: String,
196    ) -> Result<Option<Action>, Error> {
197        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
198            return Ok(Some(action));
199        }
200        resources
201            .teardown_generation(client, mz, active_generation)
202            .await?;
203        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
204        self.update_status(
205            &mz_api,
206            mz,
207            MaterializeStatus {
208                active_generation: desired_generation,
209                last_completed_rollout_request: mz.requested_reconciliation_id(),
210                last_completed_rollout_environmentd_image_ref: Some(
211                    mz.spec.environmentd_image_ref.clone(),
212                ),
213                resource_id: mz.status().resource_id,
214                resources_hash,
215                conditions: vec![Condition {
216                    type_: "UpToDate".into(),
217                    status: "True".into(),
218                    last_transition_time: Time(chrono::offset::Utc::now()),
219                    message: format!(
220                        "Successfully applied changes for generation {desired_generation}"
221                    ),
222                    observed_generation: mz.meta().generation,
223                    reason: "Applied".into(),
224                }],
225            },
226            false,
227        )
228        .await?;
229        Ok(None)
230    }
231
232    fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
233        if mz.spec.environment_id.is_nil() {
234            // this is always a bug - we delay doing this check until the
235            // resource should have an environment id set, either from the
236            // license key, or explicitly given, or randomly defaulted.
237            return Err(Error::Anyhow(anyhow::anyhow!(
238                "trying to reconcile a materialize resource with no environment id - this is a bug!"
239            )));
240        }
241
242        for existing_mz in self.materializes.state() {
243            if existing_mz.spec.environment_id == mz.spec.environment_id
244                && existing_mz.metadata.uid != mz.metadata.uid
245            {
246                return Err(Error::Anyhow(anyhow::anyhow!(
247                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
248                    mz.namespace(),
249                    mz.name_unchecked(),
250                    existing_mz.namespace(),
251                    existing_mz.name_unchecked(),
252                )));
253            }
254        }
255
256        Ok(())
257    }
258}
259
260#[async_trait::async_trait]
261impl k8s_controller::Context for Context {
262    type Resource = Materialize;
263    type Error = Error;
264
265    const FINALIZER_NAME: Option<&'static str> =
266        Some("orchestratord.materialize.cloud/materialize");
267
268    #[instrument(fields(organization_name=mz.name_unchecked()))]
269    async fn apply(
270        &self,
271        client: Client,
272        mz: &Self::Resource,
273    ) -> Result<Option<Action>, Self::Error> {
274        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
275        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
276        let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
277        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
278
279        let status = mz.status();
280        if mz.status.is_none() {
281            self.update_status(&mz_api, mz, status, true).await?;
282            // Updating the status should trigger a reconciliation
283            // which will include a status this time.
284            return Ok(None);
285        }
286
287        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
288        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
289            .data
290            .as_ref()
291            .and_then(|data| data.get("license_key"))
292        {
293            let license_key = validate(
294                str::from_utf8(&license_key.0)
295                    .context("invalid utf8")?
296                    .trim(),
297            )?;
298            let environment_id = license_key
299                .environment_id
300                .parse()
301                .context("invalid environment id in license key")?;
302            Some(environment_id)
303        } else {
304            if mz.meets_minimum_version(&V161) {
305                return Err(Error::Anyhow(anyhow::anyhow!(
306                    "license_key is required when running in kubernetes",
307                )));
308            } else {
309                None
310            }
311        };
312
313        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
314            let mut mz = mz.clone();
315            if mz.spec.request_rollout.is_nil() {
316                mz.spec.request_rollout = Uuid::new_v4();
317            }
318            if mz.spec.environment_id.is_nil() {
319                if let Some(environment_id) = license_key_environment_id {
320                    if environment_id.is_nil() {
321                        // this makes it easier to use a license key in
322                        // development with no environment id set
323                        mz.spec.environment_id = Uuid::new_v4();
324                    } else {
325                        mz.spec.environment_id = environment_id;
326                    }
327                } else {
328                    if mz.meets_minimum_version(&V161) {
329                        return Err(Error::Anyhow(anyhow::anyhow!(
330                            "environmentId is not set in materialize resource {}/{} but no license key was given",
331                            mz.namespace(),
332                            mz.name_unchecked()
333                        )));
334                    } else {
335                        mz.spec.environment_id = Uuid::new_v4();
336                    }
337                }
338            }
339            mz_api
340                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
341                .await?;
342            // Updating the spec should also trigger a reconciliation.
343            // We can't do that as part of the above check because you can't
344            // update both the spec and the status in a single api call.
345            return Ok(None);
346        }
347
348        if let Some(environment_id) = license_key_environment_id {
349            // we still allow a nil environment id in the license key to be
350            // accepted for any provided environment id, to support cloud
351            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
352                return Err(Error::Anyhow(anyhow::anyhow!(
353                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
354                    mz.namespace(),
355                    mz.name_unchecked(),
356                    environment_id,
357                )));
358            }
359        }
360
361        self.check_environment_id_conflicts(mz)?;
362
363        // we compare the hash against the environment resources generated
364        // for the current active generation, since that's what we expect to
365        // have been applied earlier, but we don't want to use these
366        // environment resources because when we apply them, we want to apply
367        // them with data that uses the new generation
368        let active_resources =
369            environmentd::Resources::new(&self.config, mz, status.active_generation);
370        let has_current_changes = status.resources_hash != active_resources.generate_hash();
371        let active_generation = status.active_generation;
372        let next_generation = active_generation + 1;
373        let desired_generation = if has_current_changes {
374            next_generation
375        } else {
376            active_generation
377        };
378
379        // here we regenerate the environment resources using the
380        // same inputs except with an updated generation
381        let resources = environmentd::Resources::new(&self.config, mz, desired_generation);
382        let resources_hash = resources.generate_hash();
383
384        let mut result = match (
385            mz.is_promoting(),
386            has_current_changes,
387            mz.rollout_requested(),
388        ) {
389            // If we're in status promoting, we MUST promote now.
390            // We don't know if we successfully promoted or not yet.
391            (true, _, _) => {
392                self.promote(
393                    &client,
394                    mz,
395                    resources,
396                    active_generation,
397                    desired_generation,
398                    resources_hash,
399                )
400                .await
401            }
402            // There are changes pending, and we want to apply them.
403            (false, true, true) => {
404                // we remove the environment resources hash annotation here
405                // because if we fail halfway through applying the resources,
406                // things will be in an inconsistent state, and we don't want
407                // to allow the possibility of the user making a second
408                // change which reverts to the original state and then
409                // skipping retrying this apply, since that would leave
410                // things in a permanently inconsistent state.
411                // note that environment.spec will be empty here after
412                // replace_status, but this is fine because we already
413                // extracted all of the information we want from the spec
414                // earlier.
415                let mz = if mz.is_ready_to_promote(&resources_hash) {
416                    mz
417                } else {
418                    &self
419                        .update_status(
420                            &mz_api,
421                            mz,
422                            MaterializeStatus {
423                                active_generation,
424                                // don't update the reconciliation id yet,
425                                // because the rollout hasn't yet completed. if
426                                // we fail later on, we want to ensure that the
427                                // rollout gets retried.
428                                last_completed_rollout_request: status
429                                    .last_completed_rollout_request,
430                                last_completed_rollout_environmentd_image_ref: status
431                                    .last_completed_rollout_environmentd_image_ref,
432                                resource_id: status.resource_id.clone(),
433                                resources_hash: String::new(),
434                                conditions: vec![Condition {
435                                    type_: "UpToDate".into(),
436                                    status: "Unknown".into(),
437                                    last_transition_time: Time(chrono::offset::Utc::now()),
438                                    message: format!(
439                                        "Applying changes for generation {desired_generation}"
440                                    ),
441                                    observed_generation: mz.meta().generation,
442                                    reason: "Applying".into(),
443                                }],
444                            },
445                            active_generation != desired_generation,
446                        )
447                        .await?
448                };
449                let status = mz.status();
450
451                if !mz.within_upgrade_window() {
452                    let last_completed_rollout_environmentd_image_ref =
453                        status.last_completed_rollout_environmentd_image_ref;
454
455                    self.update_status(
456                        &mz_api,
457                        mz,
458                        MaterializeStatus {
459                            active_generation,
460                            last_completed_rollout_request: status.last_completed_rollout_request,
461                            last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
462                            resource_id: status.resource_id,
463                            resources_hash: status.resources_hash,
464                            conditions: vec![Condition {
465                                type_: "UpToDate".into(),
466                                status: "False".into(),
467                                last_transition_time: Time(chrono::offset::Utc::now()),
468                                message: format!(
469                        "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
470                        last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
471                        &mz.spec.environmentd_image_ref,
472                    ),
473                                observed_generation: mz.meta().generation,
474                                reason: "FailedDeploy".into(),
475                            }],
476                        },
477                        active_generation != desired_generation,
478                    )
479                    .await?;
480                    return Ok(None);
481                }
482
483                if mz.spec.rollout_strategy
484                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
485                {
486                    // The only reason someone would choose this strategy is if they didn't have
487                    // space for the two generations of pods.
488                    // Lets make room for the new ones by deleting the old generation.
489                    resources
490                        .teardown_generation(&client, mz, active_generation)
491                        .await?;
492                }
493
494                trace!("applying environment resources");
495                match resources
496                    .apply(&client, mz.should_force_promote(), &mz.namespace())
497                    .await
498                {
499                    Ok(Some(action)) => {
500                        trace!("new environment is not yet ready");
501                        Ok(Some(action))
502                    }
503                    Ok(None) => {
504                        if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
505                            && !mz.should_force_promote()
506                        {
507                            trace!(
508                                "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
509                            );
510                            self.update_status(
511                                &mz_api,
512                                mz,
513                                MaterializeStatus {
514                                    active_generation,
515                                    last_completed_rollout_request: status
516                                        .last_completed_rollout_request,
517                                    last_completed_rollout_environmentd_image_ref: status
518                                        .last_completed_rollout_environmentd_image_ref,
519                                    resource_id: status.resource_id,
520                                    resources_hash,
521                                    conditions: vec![Condition {
522                                        type_: "UpToDate".into(),
523                                        status: "Unknown".into(),
524                                        last_transition_time: Time(chrono::offset::Utc::now()),
525                                        message: format!(
526                                            "Ready to promote generation {desired_generation}"
527                                        ),
528                                        observed_generation: mz.meta().generation,
529                                        reason: "ReadyToPromote".into(),
530                                    }],
531                                },
532                                active_generation != desired_generation,
533                            )
534                            .await?;
535                            return Ok(None);
536                        }
537                        // do this last, so that we keep traffic pointing at
538                        // the previous environmentd until the new one is
539                        // fully ready
540
541                        // Update the status before calling promote, so that we know
542                        // we've crossed the point of no return.
543                        // Once we see this status, we must promote without taking other actions.
544                        self.update_status(
545                            &mz_api,
546                            mz,
547                            MaterializeStatus {
548                                active_generation,
549                                // don't update the reconciliation id yet,
550                                // because the rollout hasn't yet completed. if
551                                // we fail later on, we want to ensure that the
552                                // rollout gets retried.
553                                last_completed_rollout_request: status
554                                    .last_completed_rollout_request,
555                                last_completed_rollout_environmentd_image_ref: status
556                                    .last_completed_rollout_environmentd_image_ref,
557                                resource_id: status.resource_id,
558                                resources_hash: resources_hash.clone(),
559                                conditions: vec![Condition {
560                                    type_: "UpToDate".into(),
561                                    status: "Unknown".into(),
562                                    last_transition_time: Time(chrono::offset::Utc::now()),
563                                    message: format!(
564                                        "Attempting to promote generation {desired_generation}"
565                                    ),
566                                    observed_generation: mz.meta().generation,
567                                    reason: "Promoting".into(),
568                                }],
569                            },
570                            active_generation != desired_generation,
571                        )
572                        .await?;
573                        self.promote(
574                            &client,
575                            mz,
576                            resources,
577                            active_generation,
578                            desired_generation,
579                            resources_hash,
580                        )
581                        .await
582                    }
583                    Err(e) => {
584                        self.update_status(
585                            &mz_api,
586                            mz,
587                            MaterializeStatus {
588                                active_generation,
589                                // also don't update the reconciliation id
590                                // here, because there was an error during
591                                // the rollout and we want to ensure it gets
592                                // retried.
593                                last_completed_rollout_request: status.last_completed_rollout_request,
594                                last_completed_rollout_environmentd_image_ref: status
595                                    .last_completed_rollout_environmentd_image_ref,
596                                resource_id: status.resource_id,
597                                resources_hash: status.resources_hash,
598                                conditions: vec![Condition {
599                                    type_: "UpToDate".into(),
600                                    status: "False".into(),
601                                    last_transition_time: Time(chrono::offset::Utc::now()),
602                                    message: format!(
603                                        "Failed to apply changes for generation {desired_generation}: {e}"
604                                    ),
605                                    observed_generation: mz.meta().generation,
606                                    reason: "FailedDeploy".into(),
607                                }],
608                            },
609                            active_generation != desired_generation,
610                        )
611                        .await?;
612                        Err(e)
613                    }
614                }
615            }
616            // There are changes pending, but we don't want to apply them yet.
617            (false, true, false) => {
618                let mut needs_update = mz.conditions_need_update();
619                if mz.update_in_progress() {
620                    resources
621                        .teardown_generation(&client, mz, next_generation)
622                        .await?;
623                    needs_update = true;
624                }
625                if needs_update {
626                    self.update_status(
627                        &mz_api,
628                        mz,
629                        MaterializeStatus {
630                            active_generation,
631                            last_completed_rollout_request: mz.requested_reconciliation_id(),
632                            last_completed_rollout_environmentd_image_ref: status
633                                .last_completed_rollout_environmentd_image_ref,
634                            resource_id: status.resource_id.clone(),
635                            resources_hash: status.resources_hash,
636                            conditions: vec![Condition {
637                                type_: "UpToDate".into(),
638                                status: "False".into(),
639                                last_transition_time: Time(chrono::offset::Utc::now()),
640                                message: format!(
641                                    "Changes detected, waiting for approval for generation {desired_generation}"
642                                ),
643                                observed_generation: mz.meta().generation,
644                                reason: "WaitingForApproval".into(),
645                            }],
646                        },
647                        active_generation != desired_generation,
648                    )
649                    .await?;
650                }
651                debug!("changes detected, waiting for approval");
652                Ok(None)
653            }
654            // No changes pending, but we might need to clean up a partially applied rollout.
655            (false, false, _) => {
656                // this can happen if we update the environment, but then revert
657                // that update before the update was deployed. in this case, we
658                // don't want the environment to still show up as
659                // WaitingForApproval.
660                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
661                if mz.update_in_progress() {
662                    resources
663                        .teardown_generation(&client, mz, next_generation)
664                        .await?;
665                    needs_update = true;
666                }
667                if needs_update {
668                    self.update_status(
669                        &mz_api,
670                        mz,
671                        MaterializeStatus {
672                            active_generation,
673                            last_completed_rollout_request: mz.requested_reconciliation_id(),
674                            last_completed_rollout_environmentd_image_ref: status
675                                .last_completed_rollout_environmentd_image_ref,
676                            resource_id: status.resource_id.clone(),
677                            resources_hash: status.resources_hash,
678                            conditions: vec![Condition {
679                                type_: "UpToDate".into(),
680                                status: "True".into(),
681                                last_transition_time: Time(chrono::offset::Utc::now()),
682                                message: format!(
683                                    "No changes found from generation {active_generation}"
684                                ),
685                                observed_generation: mz.meta().generation,
686                                reason: "Applied".into(),
687                            }],
688                        },
689                        active_generation != desired_generation,
690                    )
691                    .await?;
692                }
693                debug!("no changes");
694                Ok(None)
695            }
696        }?;
697
698        if let Some(action) = result {
699            return Ok(Some(action));
700        }
701
702        // balancers rely on the environmentd service existing, which is
703        // enforced by the environmentd rollout process being able to call
704        // into the promotion endpoint
705
706        if self.config.create_balancers {
707            let balancer = Balancer {
708                metadata: mz.managed_resource_meta(mz.name_unchecked()),
709                spec: BalancerSpec {
710                    balancerd_image_ref: matching_image_from_environmentd_image_ref(
711                        &mz.spec.environmentd_image_ref,
712                        "balancerd",
713                        None,
714                    ),
715                    resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
716                    replicas: Some(mz.balancerd_replicas()),
717                    external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
718                    internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
719                    pod_annotations: mz.spec.pod_annotations.clone(),
720                    pod_labels: mz.spec.pod_labels.clone(),
721                    static_routing: Some(
722                        mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
723                            environmentd_namespace: mz.namespace(),
724                            environmentd_service_name: mz.environmentd_service_name(),
725                        },
726                    ),
727                    frontegg_routing: None,
728                    resource_id: Some(status.resource_id.clone()),
729                },
730                status: None,
731            };
732            let balancer = apply_resource(&balancer_api, &balancer).await?;
733            result = wait_for_balancer(&balancer)?;
734        } else {
735            delete_resource(&balancer_api, &mz.name_unchecked()).await?;
736        }
737
738        if let Some(action) = result {
739            return Ok(Some(action));
740        }
741
742        // and the console relies on the balancer service existing, which is
743        // enforced by wait_for_balancer
744
745        if self.config.create_console {
746            let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
747            else {
748                return Err(Error::Anyhow(anyhow::anyhow!(
749                    "failed to parse environmentd image ref: {}",
750                    mz.spec.environmentd_image_ref
751                )));
752            };
753            let console_image_tag = self
754                .config
755                .console_image_tag_map
756                .iter()
757                .find(|kv| kv.key == environmentd_image_tag)
758                .map(|kv| kv.value.clone())
759                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
760            let console = Console {
761                metadata: mz.managed_resource_meta(mz.name_unchecked()),
762                spec: ConsoleSpec {
763                    console_image_ref: matching_image_from_environmentd_image_ref(
764                        &mz.spec.environmentd_image_ref,
765                        "console",
766                        Some(&console_image_tag),
767                    ),
768                    resource_requirements: mz.spec.console_resource_requirements.clone(),
769                    replicas: Some(mz.console_replicas()),
770                    external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
771                    pod_annotations: mz.spec.pod_annotations.clone(),
772                    pod_labels: mz.spec.pod_labels.clone(),
773                    balancerd: BalancerdRef {
774                        service_name: mz.balancerd_service_name(),
775                        namespace: mz.namespace(),
776                        scheme: if issuer_ref_defined(
777                            &self.config.default_certificate_specs.balancerd_external,
778                            &mz.spec.balancerd_external_certificate_spec,
779                        ) {
780                            HttpConnectionScheme::Https
781                        } else {
782                            HttpConnectionScheme::Http
783                        },
784                    },
785                    authenticator_kind: mz.spec.authenticator_kind,
786                    resource_id: Some(status.resource_id),
787                },
788                status: None,
789            };
790            apply_resource(&console_api, &console).await?;
791        } else {
792            delete_resource(&console_api, &mz.name_unchecked()).await?;
793        }
794
795        Ok(result)
796    }
797
798    #[instrument(fields(organization_name=mz.name_unchecked()))]
799    async fn cleanup(
800        &self,
801        _client: Client,
802        mz: &Self::Resource,
803    ) -> Result<Option<Action>, Self::Error> {
804        self.set_needs_update(mz, false);
805
806        Ok(None)
807    }
808}
809
810fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
811    if let Some(conditions) = balancer
812        .status
813        .as_ref()
814        .map(|status| status.conditions.as_slice())
815    {
816        if conditions
817            .iter()
818            .any(|condition| condition.type_ == "Ready" && condition.status == "True")
819        {
820            return Ok(None);
821        }
822    }
823
824    Ok(Some(Action::requeue(Duration::from_secs(1))))
825}