Skip to main content

mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    sync::{Arc, Mutex},
13    time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20    apimachinery::pkg::apis::meta::v1::{Condition, Time},
21    jiff::Timestamp,
22};
23use kube::{
24    Api, Client, Resource, ResourceExt,
25    api::{ListParams, PostParams},
26    runtime::controller::Action,
27};
28use tracing::{debug, trace};
29use uuid::Uuid;
30
31use crate::{
32    Error,
33    controller::materialize::generation::V161,
34    k8s::{apply_resource, delete_resource},
35    matching_image_from_environmentd_image_ref,
36    metrics::Metrics,
37    parse_image_tag,
38    tls::{DefaultCertificateSpecs, issuer_ref_defined},
39};
40use mz_cloud_provider::CloudProvider;
41use mz_cloud_resources::crd::{
42    ManagedResource,
43    balancer::v1alpha1::{Balancer, BalancerSpec},
44    console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
45    materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
46};
47use mz_license_keys::validate;
48use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
49use mz_orchestrator_tracing::TracingCliArgs;
50use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
51
52pub mod generation;
53pub mod global;
54
/// Settings for the Materialize resource controller.
///
/// A `Config` is moved into [`Context`] by `Context::new`, and the controller's `apply`
/// passes it by reference to `global::Resources::new` and `generation::Resources::new`.
///
/// NOTE(review): the assertion message in `Context::new` mentions `--aws-account-id` and
/// `--cloud-provider`, so these fields presumably mirror orchestratord command-line
/// flags — confirm against the binary's argument parser. Apart from `cloud_provider`
/// and `aws_account_id`, none of the fields are read in this part of the file; the
/// group comments below are inferred from field names only and should be verified
/// against the `global` and `generation` modules.
pub struct Config {
    /// `Context::new` panics when this is `CloudProvider::Aws` and `aws_account_id`
    /// is `None`.
    pub cloud_provider: CloudProvider,
    pub region: String,
    pub create_balancers: bool,
    pub create_console: bool,
    pub helm_chart_version: Option<String>,
    pub secrets_controller: String,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,

    // Segment settings (presumably analytics — TODO confirm).
    pub segment_api_key: Option<String>,
    pub segment_client_side: bool,

    // Console image tag selection: a default tag plus key/value overrides
    // (what the keys represent is not visible here — TODO confirm).
    pub console_image_tag_default: String,
    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,

    /// Must be `Some` when `cloud_provider` is `CloudProvider::Aws`; enforced by an
    /// assertion in `Context::new`.
    pub aws_account_id: Option<String>,
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    pub aws_secrets_controller_tags: Vec<String>,
    pub environmentd_availability_zones: Option<Vec<String>>,

    // Pod-level settings (storage class, scheduler, security context, statement
    // logging) — presumably applied to generated pod specs; TODO confirm.
    pub ephemeral_volume_class: Option<String>,
    pub scheduler_name: Option<String>,
    pub enable_security_context: bool,
    pub enable_internal_statement_logging: bool,
    pub disable_statement_logging: bool,

    // Scheduling constraints, default resources, image pull policy and network
    // policy toggles for environmentd / clusterd (inferred from names — TODO confirm).
    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    pub environmentd_affinity: Option<Affinity>,
    pub environmentd_tolerations: Option<Vec<Toleration>>,
    pub environmentd_default_resources: Option<ResourceRequirements>,
    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    pub clusterd_affinity: Option<Affinity>,
    pub clusterd_tolerations: Option<Vec<Toleration>>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub network_policies_internal_enabled: bool,
    pub network_policies_ingress_enabled: bool,
    pub network_policies_ingress_cidrs: Vec<String>,
    pub network_policies_egress_enabled: bool,
    pub network_policies_egress_cidrs: Vec<String>,

    // Cluster replica sizing and replication factors for bootstrap/builtin clusters
    // (presumably forwarded to environmentd — TODO confirm).
    pub environmentd_cluster_replica_sizes: Option<String>,
    pub bootstrap_default_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // HTTP origin allow-list and console proxy URL (usage not visible here).
    pub environmentd_allowed_origins: Vec<HeaderValue>,
    pub internal_console_proxy_url: String,

    // Port numbers for environmentd's endpoints (presumably the listening ports
    // configured on the generated services/pods — TODO confirm).
    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub environmentd_internal_sql_port: u16,
    pub environmentd_internal_http_port: u16,
    pub environmentd_internal_persist_pubsub_port: u16,

    pub default_certificate_specs: DefaultCertificateSpecs,

    pub disable_license_key_checks: bool,

    pub tracing: TracingCliArgs,
    pub orchestratord_namespace: String,
}
126
/// State shared by the Materialize controller across reconciliations.
///
/// Constructed with `Context::new`; this type implements `k8s_controller::Context`
/// for `Materialize` resources.
pub struct Context {
    /// Controller settings; passed by reference to `global::Resources::new` and
    /// `generation::Resources::new`.
    config: Config,
    /// Metrics handle; `set_needs_update` writes its `environmentd_needs_update` metric.
    metrics: Arc<Metrics>,
    /// Names (as returned by `name_unchecked()`) of the Materialize resources currently
    /// flagged as needing an update. The size of this set is the value reported through
    /// `environmentd_needs_update`.
    ///
    /// NOTE(review): entries are keyed by name only, without the namespace, so two
    /// resources with the same name in different namespaces would share one entry —
    /// confirm whether that is intended.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
132
133impl Context {
134    pub fn new(config: Config, metrics: Arc<Metrics>) -> Self {
135        if config.cloud_provider == CloudProvider::Aws {
136            assert!(
137                config.aws_account_id.is_some(),
138                "--aws-account-id is required when using --cloud-provider=aws"
139            );
140        }
141
142        Self {
143            config,
144            metrics,
145            needs_update: Default::default(),
146        }
147    }
148
149    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
150        let mut needs_update_set = self.needs_update.lock().unwrap();
151        if needs_update {
152            needs_update_set.insert(mz.name_unchecked());
153        } else {
154            needs_update_set.remove(&mz.name_unchecked());
155        }
156        self.metrics
157            .environmentd_needs_update
158            .set(u64::cast_from(needs_update_set.len()));
159    }
160
161    async fn update_status(
162        &self,
163        mz_api: &Api<Materialize>,
164        mz: &Materialize,
165        status: MaterializeStatus,
166        needs_update: bool,
167    ) -> Result<Materialize, kube::Error> {
168        self.set_needs_update(mz, needs_update);
169
170        let mut new_mz = mz.clone();
171        if !mz
172            .status
173            .as_ref()
174            .map_or(true, |mz_status| mz_status.needs_update(&status))
175        {
176            return Ok(new_mz);
177        }
178
179        new_mz.status = Some(status);
180        mz_api
181            .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
182            .await
183    }
184
185    async fn promote(
186        &self,
187        client: &Client,
188        mz: &Materialize,
189        resources: generation::Resources,
190        active_generation: u64,
191        desired_generation: u64,
192        resources_hash: String,
193    ) -> Result<Option<Action>, Error> {
194        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
195            return Ok(Some(action));
196        }
197        resources
198            .teardown_generation(client, mz, active_generation)
199            .await?;
200        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
201        self.update_status(
202            &mz_api,
203            mz,
204            MaterializeStatus {
205                active_generation: desired_generation,
206                last_completed_rollout_request: mz.requested_reconciliation_id(),
207                last_completed_rollout_environmentd_image_ref: Some(
208                    mz.spec.environmentd_image_ref.clone(),
209                ),
210                resource_id: mz.status().resource_id,
211                resources_hash,
212                conditions: vec![Condition {
213                    type_: "UpToDate".into(),
214                    status: "True".into(),
215                    last_transition_time: Time(Timestamp::now()),
216                    message: format!(
217                        "Successfully applied changes for generation {desired_generation}"
218                    ),
219                    observed_generation: mz.meta().generation,
220                    reason: "Applied".into(),
221                }],
222            },
223            false,
224        )
225        .await?;
226        Ok(None)
227    }
228
229    async fn check_environment_id_conflicts(
230        &self,
231        client: &Client,
232        mz: &Materialize,
233    ) -> Result<(), Error> {
234        if mz.spec.environment_id.is_nil() {
235            // this is always a bug - we delay doing this check until the
236            // resource should have an environment id set, either from the
237            // license key, or explicitly given, or randomly defaulted.
238            return Err(Error::Anyhow(anyhow::anyhow!(
239                "trying to reconcile a materialize resource with no environment id - this is a bug!"
240            )));
241        }
242
243        let mz_api: Api<Materialize> = Api::all(client.clone());
244        let all_mz = mz_api.list(&ListParams::default()).await?;
245        for existing_mz in &all_mz.items {
246            if existing_mz.spec.environment_id == mz.spec.environment_id
247                && existing_mz.metadata.uid != mz.metadata.uid
248            {
249                return Err(Error::Anyhow(anyhow::anyhow!(
250                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
251                    mz.namespace(),
252                    mz.name_unchecked(),
253                    existing_mz.namespace(),
254                    existing_mz.name_unchecked(),
255                )));
256            }
257        }
258
259        Ok(())
260    }
261}
262
263#[async_trait::async_trait]
264impl k8s_controller::Context for Context {
265    type Resource = Materialize;
266    type Error = Error;
267
268    const FINALIZER_NAME: Option<&'static str> =
269        Some("orchestratord.materialize.cloud/materialize");
270
271    #[instrument(fields(organization_name=mz.name_unchecked()))]
272    async fn apply(
273        &self,
274        client: Client,
275        mz: &Self::Resource,
276    ) -> Result<Option<Action>, Self::Error> {
277        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
278        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
279        let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
280        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
281
282        let status = mz.status();
283        if mz.status.is_none() {
284            self.update_status(&mz_api, mz, status, true).await?;
285            // Updating the status should trigger a reconciliation
286            // which will include a status this time.
287            return Ok(None);
288        }
289
290        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
291        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
292            .data
293            .as_ref()
294            .and_then(|data| data.get("license_key"))
295        {
296            let license_key = validate(
297                str::from_utf8(&license_key.0)
298                    .context("invalid utf8")?
299                    .trim(),
300            )?;
301            let environment_id = license_key
302                .environment_id
303                .parse()
304                .context("invalid environment id in license key")?;
305            Some(environment_id)
306        } else {
307            if mz.meets_minimum_version(&V161) {
308                return Err(Error::Anyhow(anyhow::anyhow!(
309                    "license_key is required when running in kubernetes",
310                )));
311            } else {
312                None
313            }
314        };
315
316        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
317            let mut mz = mz.clone();
318            if mz.spec.request_rollout.is_nil() {
319                mz.spec.request_rollout = Uuid::new_v4();
320            }
321            if mz.spec.environment_id.is_nil() {
322                if let Some(environment_id) = license_key_environment_id {
323                    if environment_id.is_nil() {
324                        // this makes it easier to use a license key in
325                        // development with no environment id set
326                        mz.spec.environment_id = Uuid::new_v4();
327                    } else {
328                        mz.spec.environment_id = environment_id;
329                    }
330                } else {
331                    if mz.meets_minimum_version(&V161) {
332                        return Err(Error::Anyhow(anyhow::anyhow!(
333                            "environmentId is not set in materialize resource {}/{} but no license key was given",
334                            mz.namespace(),
335                            mz.name_unchecked()
336                        )));
337                    } else {
338                        mz.spec.environment_id = Uuid::new_v4();
339                    }
340                }
341            }
342            mz_api
343                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
344                .await?;
345            // Updating the spec should also trigger a reconciliation.
346            // We can't do that as part of the above check because you can't
347            // update both the spec and the status in a single api call.
348            return Ok(None);
349        }
350
351        if let Some(environment_id) = license_key_environment_id {
352            // we still allow a nil environment id in the license key to be
353            // accepted for any provided environment id, to support cloud
354            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
355                return Err(Error::Anyhow(anyhow::anyhow!(
356                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
357                    mz.namespace(),
358                    mz.name_unchecked(),
359                    environment_id,
360                )));
361            }
362        }
363
364        self.check_environment_id_conflicts(&client, mz).await?;
365
366        global::Resources::new(&self.config, mz)?
367            .apply(&client, &mz.namespace())
368            .await?;
369
370        // we compare the hash against the environment resources generated
371        // for the current active generation, since that's what we expect to
372        // have been applied earlier, but we don't want to use these
373        // environment resources because when we apply them, we want to apply
374        // them with data that uses the new generation
375        let active_resources =
376            generation::Resources::new(&self.config, mz, status.active_generation);
377        let has_current_changes = status.resources_hash != active_resources.generate_hash();
378        let active_generation = status.active_generation;
379        let next_generation = active_generation + 1;
380        let desired_generation = if has_current_changes {
381            next_generation
382        } else {
383            active_generation
384        };
385
386        // here we regenerate the environment resources using the
387        // same inputs except with an updated generation
388        let resources = generation::Resources::new(&self.config, mz, desired_generation);
389        let resources_hash = resources.generate_hash();
390
391        let mut result = match (
392            mz.is_promoting(),
393            has_current_changes,
394            mz.rollout_requested(),
395        ) {
396            // If we're in status promoting, we MUST promote now.
397            // We don't know if we successfully promoted or not yet.
398            (true, _, _) => {
399                self.promote(
400                    &client,
401                    mz,
402                    resources,
403                    active_generation,
404                    desired_generation,
405                    resources_hash,
406                )
407                .await
408            }
409            // There are changes pending, and we want to apply them.
410            (false, true, true) => {
411                // we remove the environment resources hash annotation here
412                // because if we fail halfway through applying the resources,
413                // things will be in an inconsistent state, and we don't want
414                // to allow the possibility of the user making a second
415                // change which reverts to the original state and then
416                // skipping retrying this apply, since that would leave
417                // things in a permanently inconsistent state.
418                // note that environment.spec will be empty here after
419                // replace_status, but this is fine because we already
420                // extracted all of the information we want from the spec
421                // earlier.
422                let mz = if mz.is_ready_to_promote(&resources_hash) {
423                    mz
424                } else {
425                    &self
426                        .update_status(
427                            &mz_api,
428                            mz,
429                            MaterializeStatus {
430                                active_generation,
431                                // don't update the reconciliation id yet,
432                                // because the rollout hasn't yet completed. if
433                                // we fail later on, we want to ensure that the
434                                // rollout gets retried.
435                                last_completed_rollout_request: status
436                                    .last_completed_rollout_request,
437                                last_completed_rollout_environmentd_image_ref: status
438                                    .last_completed_rollout_environmentd_image_ref,
439                                resource_id: status.resource_id.clone(),
440                                resources_hash: String::new(),
441                                conditions: vec![Condition {
442                                    type_: "UpToDate".into(),
443                                    status: "Unknown".into(),
444                                    last_transition_time: Time(Timestamp::now()),
445                                    message: format!(
446                                        "Applying changes for generation {desired_generation}"
447                                    ),
448                                    observed_generation: mz.meta().generation,
449                                    reason: "Applying".into(),
450                                }],
451                            },
452                            active_generation != desired_generation,
453                        )
454                        .await?
455                };
456                let status = mz.status();
457
458                if !mz.within_upgrade_window() {
459                    let last_completed_rollout_environmentd_image_ref =
460                        status.last_completed_rollout_environmentd_image_ref;
461
462                    self.update_status(
463                        &mz_api,
464                        mz,
465                        MaterializeStatus {
466                            active_generation,
467                            last_completed_rollout_request: status.last_completed_rollout_request,
468                            last_completed_rollout_environmentd_image_ref:
469                                last_completed_rollout_environmentd_image_ref.clone(),
470                            resource_id: status.resource_id,
471                            resources_hash: status.resources_hash,
472                            conditions: vec![Condition {
473                                type_: "UpToDate".into(),
474                                status: "False".into(),
475                                last_transition_time: Time(Timestamp::now()),
476                                message: format!(
477                                    "Refusing to upgrade from {} to {}. \
478                                     More than one major version from \
479                                     last successful rollout. If coming \
480                                     from Self Managed 25.2, upgrade to \
481                                     materialize/environmentd:v0.147.20 \
482                                     first.",
483                                    last_completed_rollout_environmentd_image_ref
484                                        .expect("should be set if upgrade window check fails"),
485                                    &mz.spec.environmentd_image_ref,
486                                ),
487                                observed_generation: mz.meta().generation,
488                                reason: "FailedDeploy".into(),
489                            }],
490                        },
491                        active_generation != desired_generation,
492                    )
493                    .await?;
494                    return Ok(None);
495                }
496
497                if mz.spec.rollout_strategy
498                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
499                {
500                    // The only reason someone would choose this strategy is if they didn't have
501                    // space for the two generations of pods.
502                    // Lets make room for the new ones by deleting the old generation.
503                    resources
504                        .teardown_generation(&client, mz, active_generation)
505                        .await?;
506                }
507
508                trace!("applying environment resources");
509                match resources
510                    .apply(&client, mz.should_force_promote(), &mz.namespace())
511                    .await
512                {
513                    Ok(Some(action)) => {
514                        trace!("new environment is not yet ready");
515                        Ok(Some(action))
516                    }
517                    Ok(None) => {
518                        if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
519                            && !mz.should_force_promote()
520                        {
521                            trace!(
522                                "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
523                            );
524                            self.update_status(
525                                &mz_api,
526                                mz,
527                                MaterializeStatus {
528                                    active_generation,
529                                    last_completed_rollout_request: status
530                                        .last_completed_rollout_request,
531                                    last_completed_rollout_environmentd_image_ref: status
532                                        .last_completed_rollout_environmentd_image_ref,
533                                    resource_id: status.resource_id,
534                                    resources_hash,
535                                    conditions: vec![Condition {
536                                        type_: "UpToDate".into(),
537                                        status: "Unknown".into(),
538                                        last_transition_time: Time(Timestamp::now()),
539                                        message: format!(
540                                            "Ready to promote generation {desired_generation}"
541                                        ),
542                                        observed_generation: mz.meta().generation,
543                                        reason: "ReadyToPromote".into(),
544                                    }],
545                                },
546                                active_generation != desired_generation,
547                            )
548                            .await?;
549                            return Ok(None);
550                        }
551                        // do this last, so that we keep traffic pointing at
552                        // the previous environmentd until the new one is
553                        // fully ready
554
555                        // Update the status before calling promote, so that we know
556                        // we've crossed the point of no return.
557                        // Once we see this status, we must promote without taking other actions.
558                        self.update_status(
559                            &mz_api,
560                            mz,
561                            MaterializeStatus {
562                                active_generation,
563                                // don't update the reconciliation id yet,
564                                // because the rollout hasn't yet completed. if
565                                // we fail later on, we want to ensure that the
566                                // rollout gets retried.
567                                last_completed_rollout_request: status
568                                    .last_completed_rollout_request,
569                                last_completed_rollout_environmentd_image_ref: status
570                                    .last_completed_rollout_environmentd_image_ref,
571                                resource_id: status.resource_id,
572                                resources_hash: resources_hash.clone(),
573                                conditions: vec![Condition {
574                                    type_: "UpToDate".into(),
575                                    status: "Unknown".into(),
576                                    last_transition_time: Time(Timestamp::now()),
577                                    message: format!(
578                                        "Attempting to promote generation {desired_generation}"
579                                    ),
580                                    observed_generation: mz.meta().generation,
581                                    reason: "Promoting".into(),
582                                }],
583                            },
584                            active_generation != desired_generation,
585                        )
586                        .await?;
587                        self.promote(
588                            &client,
589                            mz,
590                            resources,
591                            active_generation,
592                            desired_generation,
593                            resources_hash,
594                        )
595                        .await
596                    }
597                    Err(e) => {
598                        self.update_status(
599                            &mz_api,
600                            mz,
601                            MaterializeStatus {
602                                active_generation,
603                                // also don't update the reconciliation id
604                                // here, because there was an error during
605                                // the rollout and we want to ensure it gets
606                                // retried.
607                                last_completed_rollout_request: status
608                                    .last_completed_rollout_request,
609                                last_completed_rollout_environmentd_image_ref: status
610                                    .last_completed_rollout_environmentd_image_ref,
611                                resource_id: status.resource_id,
612                                resources_hash: status.resources_hash,
613                                conditions: vec![Condition {
614                                    type_: "UpToDate".into(),
615                                    status: "False".into(),
616                                    last_transition_time: Time(Timestamp::now()),
617                                    message: format!(
618                                        "Failed to apply changes for \
619                                         generation {desired_generation}: {e}"
620                                    ),
621                                    observed_generation: mz.meta().generation,
622                                    reason: "FailedDeploy".into(),
623                                }],
624                            },
625                            active_generation != desired_generation,
626                        )
627                        .await?;
628                        Err(e)
629                    }
630                }
631            }
632            // There are changes pending, but we don't want to apply them yet.
633            (false, true, false) => {
634                let mut needs_update = mz.conditions_need_update();
635                if mz.update_in_progress() {
636                    resources
637                        .teardown_generation(&client, mz, next_generation)
638                        .await?;
639                    needs_update = true;
640                }
641                if needs_update {
642                    self.update_status(
643                        &mz_api,
644                        mz,
645                        MaterializeStatus {
646                            active_generation,
647                            last_completed_rollout_request: mz.requested_reconciliation_id(),
648                            last_completed_rollout_environmentd_image_ref: status
649                                .last_completed_rollout_environmentd_image_ref,
650                            resource_id: status.resource_id.clone(),
651                            resources_hash: status.resources_hash,
652                            conditions: vec![Condition {
653                                type_: "UpToDate".into(),
654                                status: "False".into(),
655                                last_transition_time: Time(Timestamp::now()),
656                                message: format!(
657                                    "Changes detected, waiting for approval for generation {desired_generation}"
658                                ),
659                                observed_generation: mz.meta().generation,
660                                reason: "WaitingForApproval".into(),
661                            }],
662                        },
663                        active_generation != desired_generation,
664                    )
665                    .await?;
666                }
667                debug!("changes detected, waiting for approval");
668                Ok(None)
669            }
670            // No changes pending, but we might need to clean up a partially applied rollout.
671            (false, false, _) => {
672                // this can happen if we update the environment, but then revert
673                // that update before the update was deployed. in this case, we
674                // don't want the environment to still show up as
675                // WaitingForApproval.
676                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
677                if mz.update_in_progress() {
678                    resources
679                        .teardown_generation(&client, mz, next_generation)
680                        .await?;
681                    needs_update = true;
682                }
683                if needs_update {
684                    self.update_status(
685                        &mz_api,
686                        mz,
687                        MaterializeStatus {
688                            active_generation,
689                            last_completed_rollout_request: mz.requested_reconciliation_id(),
690                            last_completed_rollout_environmentd_image_ref: status
691                                .last_completed_rollout_environmentd_image_ref,
692                            resource_id: status.resource_id.clone(),
693                            resources_hash: status.resources_hash,
694                            conditions: vec![Condition {
695                                type_: "UpToDate".into(),
696                                status: "True".into(),
697                                last_transition_time: Time(Timestamp::now()),
698                                message: format!(
699                                    "No changes found from generation {active_generation}"
700                                ),
701                                observed_generation: mz.meta().generation,
702                                reason: "Applied".into(),
703                            }],
704                        },
705                        active_generation != desired_generation,
706                    )
707                    .await?;
708                }
709                debug!("no changes");
710                Ok(None)
711            }
712        }?;
713
714        if let Some(action) = result {
715            return Ok(Some(action));
716        }
717
718        // balancers rely on the environmentd service existing, which is
719        // enforced by the environmentd rollout process being able to call
720        // into the promotion endpoint
721
722        if self.config.create_balancers {
723            let balancer = Balancer {
724                metadata: mz.managed_resource_meta(mz.name_unchecked()),
725                spec: BalancerSpec {
726                    balancerd_image_ref: matching_image_from_environmentd_image_ref(
727                        &mz.spec.environmentd_image_ref,
728                        "balancerd",
729                        None,
730                    ),
731                    resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
732                    replicas: Some(mz.balancerd_replicas()),
733                    external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
734                    internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
735                    pod_annotations: mz.spec.pod_annotations.clone(),
736                    pod_labels: mz.spec.pod_labels.clone(),
737                    static_routing: Some(
738                        mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
739                            environmentd_namespace: mz.namespace(),
740                            environmentd_service_name: mz.environmentd_service_name(),
741                        },
742                    ),
743                    frontegg_routing: None,
744                    resource_id: Some(status.resource_id.clone()),
745                },
746                status: None,
747            };
748            let balancer = apply_resource(&balancer_api, &balancer).await?;
749            result = wait_for_balancer(&balancer)?;
750        } else {
751            delete_resource(&balancer_api, &mz.name_unchecked()).await?;
752        }
753
754        if let Some(action) = result {
755            return Ok(Some(action));
756        }
757
758        // and the console relies on the balancer service existing, which is
759        // enforced by wait_for_balancer
760
761        if self.config.create_console {
762            let environmentd_image_tag =
763                parse_image_tag(&mz.spec.environmentd_image_ref).unwrap_or("latest");
764            let console_image_tag = self
765                .config
766                .console_image_tag_map
767                .iter()
768                .find(|kv| kv.key == environmentd_image_tag)
769                .map(|kv| kv.value.clone())
770                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
771            let console = Console {
772                metadata: mz.managed_resource_meta(mz.name_unchecked()),
773                spec: ConsoleSpec {
774                    console_image_ref: matching_image_from_environmentd_image_ref(
775                        &mz.spec.environmentd_image_ref,
776                        "console",
777                        Some(&console_image_tag),
778                    ),
779                    resource_requirements: mz.spec.console_resource_requirements.clone(),
780                    replicas: Some(mz.console_replicas()),
781                    external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
782                    pod_annotations: mz.spec.pod_annotations.clone(),
783                    pod_labels: mz.spec.pod_labels.clone(),
784                    balancerd: BalancerdRef {
785                        service_name: mz.balancerd_service_name(),
786                        namespace: mz.namespace(),
787                        scheme: if issuer_ref_defined(
788                            &self.config.default_certificate_specs.balancerd_external,
789                            &mz.spec.balancerd_external_certificate_spec,
790                        ) {
791                            HttpConnectionScheme::Https
792                        } else {
793                            HttpConnectionScheme::Http
794                        },
795                    },
796                    authenticator_kind: mz.spec.authenticator_kind,
797                    resource_id: Some(status.resource_id),
798                },
799                status: None,
800            };
801            apply_resource(&console_api, &console).await?;
802        } else {
803            delete_resource(&console_api, &mz.name_unchecked()).await?;
804        }
805
806        Ok(result)
807    }
808
809    #[instrument(fields(organization_name=mz.name_unchecked()))]
810    async fn cleanup(
811        &self,
812        _client: Client,
813        mz: &Self::Resource,
814    ) -> Result<Option<Action>, Self::Error> {
815        self.set_needs_update(mz, false);
816
817        Ok(None)
818    }
819}
820
821fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
822    if let Some(conditions) = balancer
823        .status
824        .as_ref()
825        .map(|status| status.conditions.as_slice())
826    {
827        if conditions
828            .iter()
829            .any(|condition| condition.type_ == "Ready" && condition.status == "True")
830        {
831            return Ok(None);
832        }
833    }
834
835    Ok(Some(Action::requeue(Duration::from_secs(1))))
836}