Skip to main content

mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    sync::{Arc, Mutex},
13    time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20    apimachinery::pkg::apis::meta::v1::{Condition, Time},
21    jiff::Timestamp,
22};
23use kube::{
24    Api, Client, Resource, ResourceExt,
25    api::PostParams,
26    runtime::{controller::Action, reflector},
27};
28use tracing::{debug, trace};
29use uuid::Uuid;
30
31use crate::{
32    Error,
33    controller::materialize::generation::V161,
34    k8s::{apply_resource, delete_resource, make_reflector},
35    matching_image_from_environmentd_image_ref,
36    metrics::Metrics,
37    tls::{DefaultCertificateSpecs, issuer_ref_defined},
38};
39use mz_cloud_provider::CloudProvider;
40use mz_cloud_resources::crd::{
41    ManagedResource,
42    balancer::v1alpha1::{Balancer, BalancerSpec},
43    console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
44    materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
45};
46use mz_license_keys::validate;
47use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
48use mz_orchestrator_tracing::TracingCliArgs;
49use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
50
/// Kubernetes resources that are built for a specific rollout generation:
/// they are constructed per generation, applied, promoted, and the previously
/// active generation is torn down once a newer one has been promoted.
pub mod generation;
/// Kubernetes resources that are applied for a `Materialize` independently of
/// the rollout generation.
pub mod global;
53
/// Static configuration for the Materialize controller.
///
/// Passed to [`Context::new`] and kept on the [`Context`]. During
/// reconciliation it is handed to `global::Resources::new` and
/// `generation::Resources::new`, which build the Kubernetes objects for each
/// `Materialize` resource.
pub struct Config {
    /// Cloud the controller is running in. When this is
    /// [`CloudProvider::Aws`], [`Context::new`] requires `aws_account_id` to
    /// be set.
    pub cloud_provider: CloudProvider,
    pub region: String,
    /// When true, reconciliation also constructs a `Balancer` resource for the
    /// environment after the rollout has settled.
    pub create_balancers: bool,
    pub create_console: bool,
    pub helm_chart_version: Option<String>,
    pub secrets_controller: String,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,

    pub segment_api_key: Option<String>,
    pub segment_client_side: bool,

    pub console_image_tag_default: String,
    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,

    /// Must be `Some` when `cloud_provider` is AWS; [`Context::new`] asserts
    /// this at startup.
    pub aws_account_id: Option<String>,
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    pub aws_secrets_controller_tags: Vec<String>,
    pub environmentd_availability_zones: Option<Vec<String>>,

    pub ephemeral_volume_class: Option<String>,
    pub scheduler_name: Option<String>,
    pub enable_security_context: bool,
    pub enable_internal_statement_logging: bool,
    pub disable_statement_logging: bool,

    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    pub environmentd_affinity: Option<Affinity>,
    pub environmentd_tolerations: Option<Vec<Toleration>>,
    pub environmentd_default_resources: Option<ResourceRequirements>,
    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    pub clusterd_affinity: Option<Affinity>,
    pub clusterd_tolerations: Option<Vec<Toleration>>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub network_policies_internal_enabled: bool,
    pub network_policies_ingress_enabled: bool,
    pub network_policies_ingress_cidrs: Vec<String>,
    pub network_policies_egress_enabled: bool,
    pub network_policies_egress_cidrs: Vec<String>,

    pub environmentd_cluster_replica_sizes: Option<String>,
    pub bootstrap_default_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    pub environmentd_allowed_origins: Vec<HeaderValue>,
    pub internal_console_proxy_url: String,

    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub environmentd_internal_sql_port: u16,
    pub environmentd_internal_http_port: u16,
    pub environmentd_internal_persist_pubsub_port: u16,

    pub default_certificate_specs: DefaultCertificateSpecs,

    pub disable_license_key_checks: bool,

    pub tracing: TracingCliArgs,
    pub orchestratord_namespace: String,
}
125
/// Shared state for the Materialize controller's reconciliation of
/// `Materialize` resources.
pub struct Context {
    /// Static controller configuration supplied to [`Context::new`].
    config: Config,
    /// Controller metrics. The `environmentd_needs_update` gauge is kept equal
    /// to the number of entries in `needs_update`.
    metrics: Arc<Metrics>,
    /// Reflector cache of `Materialize` resources. It is scanned to detect two
    /// resources that share the same `environmentId`.
    materializes: reflector::Store<Materialize>,
    /// Identifiers of `Materialize` resources currently flagged as needing an
    /// environmentd update. Wrapped in a mutex because it is mutated through
    /// `&self`.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
132
133impl Context {
134    pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
135        if config.cloud_provider == CloudProvider::Aws {
136            assert!(
137                config.aws_account_id.is_some(),
138                "--aws-account-id is required when using --cloud-provider=aws"
139            );
140        }
141
142        Self {
143            config,
144            metrics,
145            materializes: make_reflector(client.clone()).await,
146            needs_update: Default::default(),
147        }
148    }
149
150    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
151        let mut needs_update_set = self.needs_update.lock().unwrap();
152        if needs_update {
153            needs_update_set.insert(mz.name_unchecked());
154        } else {
155            needs_update_set.remove(&mz.name_unchecked());
156        }
157        self.metrics
158            .environmentd_needs_update
159            .set(u64::cast_from(needs_update_set.len()));
160    }
161
162    async fn update_status(
163        &self,
164        mz_api: &Api<Materialize>,
165        mz: &Materialize,
166        status: MaterializeStatus,
167        needs_update: bool,
168    ) -> Result<Materialize, kube::Error> {
169        self.set_needs_update(mz, needs_update);
170
171        let mut new_mz = mz.clone();
172        if !mz
173            .status
174            .as_ref()
175            .map_or(true, |mz_status| mz_status.needs_update(&status))
176        {
177            return Ok(new_mz);
178        }
179
180        new_mz.status = Some(status);
181        mz_api
182            .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
183            .await
184    }
185
186    async fn promote(
187        &self,
188        client: &Client,
189        mz: &Materialize,
190        resources: generation::Resources,
191        active_generation: u64,
192        desired_generation: u64,
193        resources_hash: String,
194    ) -> Result<Option<Action>, Error> {
195        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
196            return Ok(Some(action));
197        }
198        resources
199            .teardown_generation(client, mz, active_generation)
200            .await?;
201        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
202        self.update_status(
203            &mz_api,
204            mz,
205            MaterializeStatus {
206                active_generation: desired_generation,
207                last_completed_rollout_request: mz.requested_reconciliation_id(),
208                last_completed_rollout_environmentd_image_ref: Some(
209                    mz.spec.environmentd_image_ref.clone(),
210                ),
211                resource_id: mz.status().resource_id,
212                resources_hash,
213                conditions: vec![Condition {
214                    type_: "UpToDate".into(),
215                    status: "True".into(),
216                    last_transition_time: Time(Timestamp::now()),
217                    message: format!(
218                        "Successfully applied changes for generation {desired_generation}"
219                    ),
220                    observed_generation: mz.meta().generation,
221                    reason: "Applied".into(),
222                }],
223            },
224            false,
225        )
226        .await?;
227        Ok(None)
228    }
229
230    fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
231        if mz.spec.environment_id.is_nil() {
232            // this is always a bug - we delay doing this check until the
233            // resource should have an environment id set, either from the
234            // license key, or explicitly given, or randomly defaulted.
235            return Err(Error::Anyhow(anyhow::anyhow!(
236                "trying to reconcile a materialize resource with no environment id - this is a bug!"
237            )));
238        }
239
240        for existing_mz in self.materializes.state() {
241            if existing_mz.spec.environment_id == mz.spec.environment_id
242                && existing_mz.metadata.uid != mz.metadata.uid
243            {
244                return Err(Error::Anyhow(anyhow::anyhow!(
245                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
246                    mz.namespace(),
247                    mz.name_unchecked(),
248                    existing_mz.namespace(),
249                    existing_mz.name_unchecked(),
250                )));
251            }
252        }
253
254        Ok(())
255    }
256}
257
258#[async_trait::async_trait]
259impl k8s_controller::Context for Context {
260    type Resource = Materialize;
261    type Error = Error;
262
263    const FINALIZER_NAME: Option<&'static str> =
264        Some("orchestratord.materialize.cloud/materialize");
265
266    #[instrument(fields(organization_name=mz.name_unchecked()))]
267    async fn apply(
268        &self,
269        client: Client,
270        mz: &Self::Resource,
271    ) -> Result<Option<Action>, Self::Error> {
272        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
273        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
274        let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
275        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
276
277        let status = mz.status();
278        if mz.status.is_none() {
279            self.update_status(&mz_api, mz, status, true).await?;
280            // Updating the status should trigger a reconciliation
281            // which will include a status this time.
282            return Ok(None);
283        }
284
285        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
286        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
287            .data
288            .as_ref()
289            .and_then(|data| data.get("license_key"))
290        {
291            let license_key = validate(
292                str::from_utf8(&license_key.0)
293                    .context("invalid utf8")?
294                    .trim(),
295            )?;
296            let environment_id = license_key
297                .environment_id
298                .parse()
299                .context("invalid environment id in license key")?;
300            Some(environment_id)
301        } else {
302            if mz.meets_minimum_version(&V161) {
303                return Err(Error::Anyhow(anyhow::anyhow!(
304                    "license_key is required when running in kubernetes",
305                )));
306            } else {
307                None
308            }
309        };
310
311        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
312            let mut mz = mz.clone();
313            if mz.spec.request_rollout.is_nil() {
314                mz.spec.request_rollout = Uuid::new_v4();
315            }
316            if mz.spec.environment_id.is_nil() {
317                if let Some(environment_id) = license_key_environment_id {
318                    if environment_id.is_nil() {
319                        // this makes it easier to use a license key in
320                        // development with no environment id set
321                        mz.spec.environment_id = Uuid::new_v4();
322                    } else {
323                        mz.spec.environment_id = environment_id;
324                    }
325                } else {
326                    if mz.meets_minimum_version(&V161) {
327                        return Err(Error::Anyhow(anyhow::anyhow!(
328                            "environmentId is not set in materialize resource {}/{} but no license key was given",
329                            mz.namespace(),
330                            mz.name_unchecked()
331                        )));
332                    } else {
333                        mz.spec.environment_id = Uuid::new_v4();
334                    }
335                }
336            }
337            mz_api
338                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
339                .await?;
340            // Updating the spec should also trigger a reconciliation.
341            // We can't do that as part of the above check because you can't
342            // update both the spec and the status in a single api call.
343            return Ok(None);
344        }
345
346        if let Some(environment_id) = license_key_environment_id {
347            // we still allow a nil environment id in the license key to be
348            // accepted for any provided environment id, to support cloud
349            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
350                return Err(Error::Anyhow(anyhow::anyhow!(
351                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
352                    mz.namespace(),
353                    mz.name_unchecked(),
354                    environment_id,
355                )));
356            }
357        }
358
359        self.check_environment_id_conflicts(mz)?;
360
361        global::Resources::new(&self.config, mz)
362            .apply(&client, &mz.namespace())
363            .await?;
364
365        // we compare the hash against the environment resources generated
366        // for the current active generation, since that's what we expect to
367        // have been applied earlier, but we don't want to use these
368        // environment resources because when we apply them, we want to apply
369        // them with data that uses the new generation
370        let active_resources =
371            generation::Resources::new(&self.config, mz, status.active_generation);
372        let has_current_changes = status.resources_hash != active_resources.generate_hash();
373        let active_generation = status.active_generation;
374        let next_generation = active_generation + 1;
375        let desired_generation = if has_current_changes {
376            next_generation
377        } else {
378            active_generation
379        };
380
381        // here we regenerate the environment resources using the
382        // same inputs except with an updated generation
383        let resources = generation::Resources::new(&self.config, mz, desired_generation);
384        let resources_hash = resources.generate_hash();
385
386        let mut result = match (
387            mz.is_promoting(),
388            has_current_changes,
389            mz.rollout_requested(),
390        ) {
391            // If we're in status promoting, we MUST promote now.
392            // We don't know if we successfully promoted or not yet.
393            (true, _, _) => {
394                self.promote(
395                    &client,
396                    mz,
397                    resources,
398                    active_generation,
399                    desired_generation,
400                    resources_hash,
401                )
402                .await
403            }
404            // There are changes pending, and we want to apply them.
405            (false, true, true) => {
406                // we remove the environment resources hash annotation here
407                // because if we fail halfway through applying the resources,
408                // things will be in an inconsistent state, and we don't want
409                // to allow the possibility of the user making a second
410                // change which reverts to the original state and then
411                // skipping retrying this apply, since that would leave
412                // things in a permanently inconsistent state.
413                // note that environment.spec will be empty here after
414                // replace_status, but this is fine because we already
415                // extracted all of the information we want from the spec
416                // earlier.
417                let mz = if mz.is_ready_to_promote(&resources_hash) {
418                    mz
419                } else {
420                    &self
421                        .update_status(
422                            &mz_api,
423                            mz,
424                            MaterializeStatus {
425                                active_generation,
426                                // don't update the reconciliation id yet,
427                                // because the rollout hasn't yet completed. if
428                                // we fail later on, we want to ensure that the
429                                // rollout gets retried.
430                                last_completed_rollout_request: status
431                                    .last_completed_rollout_request,
432                                last_completed_rollout_environmentd_image_ref: status
433                                    .last_completed_rollout_environmentd_image_ref,
434                                resource_id: status.resource_id.clone(),
435                                resources_hash: String::new(),
436                                conditions: vec![Condition {
437                                    type_: "UpToDate".into(),
438                                    status: "Unknown".into(),
439                                    last_transition_time: Time(Timestamp::now()),
440                                    message: format!(
441                                        "Applying changes for generation {desired_generation}"
442                                    ),
443                                    observed_generation: mz.meta().generation,
444                                    reason: "Applying".into(),
445                                }],
446                            },
447                            active_generation != desired_generation,
448                        )
449                        .await?
450                };
451                let status = mz.status();
452
453                if !mz.within_upgrade_window() {
454                    let last_completed_rollout_environmentd_image_ref =
455                        status.last_completed_rollout_environmentd_image_ref;
456
457                    self.update_status(
458                        &mz_api,
459                        mz,
460                        MaterializeStatus {
461                            active_generation,
462                            last_completed_rollout_request: status.last_completed_rollout_request,
463                            last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
464                            resource_id: status.resource_id,
465                            resources_hash: status.resources_hash,
466                            conditions: vec![Condition {
467                                type_: "UpToDate".into(),
468                                status: "False".into(),
469                                last_transition_time: Time(Timestamp::now()),
470                                message: format!(
471                        "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
472                        last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
473                        &mz.spec.environmentd_image_ref,
474                    ),
475                                observed_generation: mz.meta().generation,
476                                reason: "FailedDeploy".into(),
477                            }],
478                        },
479                        active_generation != desired_generation,
480                    )
481                    .await?;
482                    return Ok(None);
483                }
484
485                if mz.spec.rollout_strategy
486                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
487                {
488                    // The only reason someone would choose this strategy is if they didn't have
489                    // space for the two generations of pods.
490                    // Lets make room for the new ones by deleting the old generation.
491                    resources
492                        .teardown_generation(&client, mz, active_generation)
493                        .await?;
494                }
495
496                trace!("applying environment resources");
497                match resources
498                    .apply(&client, mz.should_force_promote(), &mz.namespace())
499                    .await
500                {
501                    Ok(Some(action)) => {
502                        trace!("new environment is not yet ready");
503                        Ok(Some(action))
504                    }
505                    Ok(None) => {
506                        if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
507                            && !mz.should_force_promote()
508                        {
509                            trace!(
510                                "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
511                            );
512                            self.update_status(
513                                &mz_api,
514                                mz,
515                                MaterializeStatus {
516                                    active_generation,
517                                    last_completed_rollout_request: status
518                                        .last_completed_rollout_request,
519                                    last_completed_rollout_environmentd_image_ref: status
520                                        .last_completed_rollout_environmentd_image_ref,
521                                    resource_id: status.resource_id,
522                                    resources_hash,
523                                    conditions: vec![Condition {
524                                        type_: "UpToDate".into(),
525                                        status: "Unknown".into(),
526                                        last_transition_time: Time(Timestamp::now()),
527                                        message: format!(
528                                            "Ready to promote generation {desired_generation}"
529                                        ),
530                                        observed_generation: mz.meta().generation,
531                                        reason: "ReadyToPromote".into(),
532                                    }],
533                                },
534                                active_generation != desired_generation,
535                            )
536                            .await?;
537                            return Ok(None);
538                        }
539                        // do this last, so that we keep traffic pointing at
540                        // the previous environmentd until the new one is
541                        // fully ready
542
543                        // Update the status before calling promote, so that we know
544                        // we've crossed the point of no return.
545                        // Once we see this status, we must promote without taking other actions.
546                        self.update_status(
547                            &mz_api,
548                            mz,
549                            MaterializeStatus {
550                                active_generation,
551                                // don't update the reconciliation id yet,
552                                // because the rollout hasn't yet completed. if
553                                // we fail later on, we want to ensure that the
554                                // rollout gets retried.
555                                last_completed_rollout_request: status
556                                    .last_completed_rollout_request,
557                                last_completed_rollout_environmentd_image_ref: status
558                                    .last_completed_rollout_environmentd_image_ref,
559                                resource_id: status.resource_id,
560                                resources_hash: resources_hash.clone(),
561                                conditions: vec![Condition {
562                                    type_: "UpToDate".into(),
563                                    status: "Unknown".into(),
564                                    last_transition_time: Time(Timestamp::now()),
565                                    message: format!(
566                                        "Attempting to promote generation {desired_generation}"
567                                    ),
568                                    observed_generation: mz.meta().generation,
569                                    reason: "Promoting".into(),
570                                }],
571                            },
572                            active_generation != desired_generation,
573                        )
574                        .await?;
575                        self.promote(
576                            &client,
577                            mz,
578                            resources,
579                            active_generation,
580                            desired_generation,
581                            resources_hash,
582                        )
583                        .await
584                    }
585                    Err(e) => {
586                        self.update_status(
587                            &mz_api,
588                            mz,
589                            MaterializeStatus {
590                                active_generation,
591                                // also don't update the reconciliation id
592                                // here, because there was an error during
593                                // the rollout and we want to ensure it gets
594                                // retried.
595                                last_completed_rollout_request: status.last_completed_rollout_request,
596                                last_completed_rollout_environmentd_image_ref: status
597                                    .last_completed_rollout_environmentd_image_ref,
598                                resource_id: status.resource_id,
599                                resources_hash: status.resources_hash,
600                                conditions: vec![Condition {
601                                    type_: "UpToDate".into(),
602                                    status: "False".into(),
603                                    last_transition_time: Time(Timestamp::now()),
604                                    message: format!(
605                                        "Failed to apply changes for generation {desired_generation}: {e}"
606                                    ),
607                                    observed_generation: mz.meta().generation,
608                                    reason: "FailedDeploy".into(),
609                                }],
610                            },
611                            active_generation != desired_generation,
612                        )
613                        .await?;
614                        Err(e)
615                    }
616                }
617            }
618            // There are changes pending, but we don't want to apply them yet.
619            (false, true, false) => {
620                let mut needs_update = mz.conditions_need_update();
621                if mz.update_in_progress() {
622                    resources
623                        .teardown_generation(&client, mz, next_generation)
624                        .await?;
625                    needs_update = true;
626                }
627                if needs_update {
628                    self.update_status(
629                        &mz_api,
630                        mz,
631                        MaterializeStatus {
632                            active_generation,
633                            last_completed_rollout_request: mz.requested_reconciliation_id(),
634                            last_completed_rollout_environmentd_image_ref: status
635                                .last_completed_rollout_environmentd_image_ref,
636                            resource_id: status.resource_id.clone(),
637                            resources_hash: status.resources_hash,
638                            conditions: vec![Condition {
639                                type_: "UpToDate".into(),
640                                status: "False".into(),
641                                last_transition_time: Time(Timestamp::now()),
642                                message: format!(
643                                    "Changes detected, waiting for approval for generation {desired_generation}"
644                                ),
645                                observed_generation: mz.meta().generation,
646                                reason: "WaitingForApproval".into(),
647                            }],
648                        },
649                        active_generation != desired_generation,
650                    )
651                    .await?;
652                }
653                debug!("changes detected, waiting for approval");
654                Ok(None)
655            }
656            // No changes pending, but we might need to clean up a partially applied rollout.
657            (false, false, _) => {
658                // this can happen if we update the environment, but then revert
659                // that update before the update was deployed. in this case, we
660                // don't want the environment to still show up as
661                // WaitingForApproval.
662                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
663                if mz.update_in_progress() {
664                    resources
665                        .teardown_generation(&client, mz, next_generation)
666                        .await?;
667                    needs_update = true;
668                }
669                if needs_update {
670                    self.update_status(
671                        &mz_api,
672                        mz,
673                        MaterializeStatus {
674                            active_generation,
675                            last_completed_rollout_request: mz.requested_reconciliation_id(),
676                            last_completed_rollout_environmentd_image_ref: status
677                                .last_completed_rollout_environmentd_image_ref,
678                            resource_id: status.resource_id.clone(),
679                            resources_hash: status.resources_hash,
680                            conditions: vec![Condition {
681                                type_: "UpToDate".into(),
682                                status: "True".into(),
683                                last_transition_time: Time(Timestamp::now()),
684                                message: format!(
685                                    "No changes found from generation {active_generation}"
686                                ),
687                                observed_generation: mz.meta().generation,
688                                reason: "Applied".into(),
689                            }],
690                        },
691                        active_generation != desired_generation,
692                    )
693                    .await?;
694                }
695                debug!("no changes");
696                Ok(None)
697            }
698        }?;
699
700        if let Some(action) = result {
701            return Ok(Some(action));
702        }
703
704        // balancers rely on the environmentd service existing, which is
705        // enforced by the environmentd rollout process being able to call
706        // into the promotion endpoint
707
708        if self.config.create_balancers {
709            let balancer = Balancer {
710                metadata: mz.managed_resource_meta(mz.name_unchecked()),
711                spec: BalancerSpec {
712                    balancerd_image_ref: matching_image_from_environmentd_image_ref(
713                        &mz.spec.environmentd_image_ref,
714                        "balancerd",
715                        None,
716                    ),
717                    resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
718                    replicas: Some(mz.balancerd_replicas()),
719                    external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
720                    internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
721                    pod_annotations: mz.spec.pod_annotations.clone(),
722                    pod_labels: mz.spec.pod_labels.clone(),
723                    static_routing: Some(
724                        mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
725                            environmentd_namespace: mz.namespace(),
726                            environmentd_service_name: mz.environmentd_service_name(),
727                        },
728                    ),
729                    frontegg_routing: None,
730                    resource_id: Some(status.resource_id.clone()),
731                },
732                status: None,
733            };
734            let balancer = apply_resource(&balancer_api, &balancer).await?;
735            result = wait_for_balancer(&balancer)?;
736        } else {
737            delete_resource(&balancer_api, &mz.name_unchecked()).await?;
738        }
739
740        if let Some(action) = result {
741            return Ok(Some(action));
742        }
743
744        // and the console relies on the balancer service existing, which is
745        // enforced by wait_for_balancer
746
747        if self.config.create_console {
748            let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
749            else {
750                return Err(Error::Anyhow(anyhow::anyhow!(
751                    "failed to parse environmentd image ref: {}",
752                    mz.spec.environmentd_image_ref
753                )));
754            };
755            let console_image_tag = self
756                .config
757                .console_image_tag_map
758                .iter()
759                .find(|kv| kv.key == environmentd_image_tag)
760                .map(|kv| kv.value.clone())
761                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
762            let console = Console {
763                metadata: mz.managed_resource_meta(mz.name_unchecked()),
764                spec: ConsoleSpec {
765                    console_image_ref: matching_image_from_environmentd_image_ref(
766                        &mz.spec.environmentd_image_ref,
767                        "console",
768                        Some(&console_image_tag),
769                    ),
770                    resource_requirements: mz.spec.console_resource_requirements.clone(),
771                    replicas: Some(mz.console_replicas()),
772                    external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
773                    pod_annotations: mz.spec.pod_annotations.clone(),
774                    pod_labels: mz.spec.pod_labels.clone(),
775                    balancerd: BalancerdRef {
776                        service_name: mz.balancerd_service_name(),
777                        namespace: mz.namespace(),
778                        scheme: if issuer_ref_defined(
779                            &self.config.default_certificate_specs.balancerd_external,
780                            &mz.spec.balancerd_external_certificate_spec,
781                        ) {
782                            HttpConnectionScheme::Https
783                        } else {
784                            HttpConnectionScheme::Http
785                        },
786                    },
787                    authenticator_kind: mz.spec.authenticator_kind,
788                    resource_id: Some(status.resource_id),
789                },
790                status: None,
791            };
792            apply_resource(&console_api, &console).await?;
793        } else {
794            delete_resource(&console_api, &mz.name_unchecked()).await?;
795        }
796
797        Ok(result)
798    }
799
800    #[instrument(fields(organization_name=mz.name_unchecked()))]
801    async fn cleanup(
802        &self,
803        _client: Client,
804        mz: &Self::Resource,
805    ) -> Result<Option<Action>, Self::Error> {
806        self.set_needs_update(mz, false);
807
808        Ok(None)
809    }
810}
811
812fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
813    if let Some(conditions) = balancer
814        .status
815        .as_ref()
816        .map(|status| status.conditions.as_slice())
817    {
818        if conditions
819            .iter()
820            .any(|condition| condition.type_ == "Ready" && condition.status == "True")
821        {
822            return Ok(None);
823        }
824    }
825
826    Ok(Some(Action::requeue(Duration::from_secs(1))))
827}