mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    str::FromStr,
14    sync::{Arc, Mutex},
15};
16
17use anyhow::Context as _;
18use http::HeaderValue;
19use k8s_openapi::{
20    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21    apimachinery::pkg::apis::meta::v1::{Condition, Time},
22};
23use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
24use serde::Deserialize;
25use tracing::{debug, trace};
26use uuid::Uuid;
27
28use crate::metrics::Metrics;
29use mz_cloud_provider::CloudProvider;
30use mz_cloud_resources::crd::materialize::v1alpha1::{
31    Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
32};
33use mz_license_keys::validate;
34use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
35use mz_orchestrator_tracing::TracingCliArgs;
36use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
37
38pub mod balancer;
39pub mod console;
40pub mod environmentd;
41pub mod tls;
42
// Command-line configuration for the Materialize controller, parsed by clap.
//
// NOTE: plain `//` comments are used on purpose throughout this struct: clap's
// derive turns `///` doc comments into user-visible help text, so adding them
// would change the CLI output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // When this is `CloudProvider::Aws`, `Context::new` asserts that
    // `--aws-account-id` was also supplied.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // `create_balancers` / `create_console` choose between applying and
    // cleaning up the balancer / console resources during reconciliation.
    #[clap(long)]
    create_balancers: bool,
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: the map is searched for an entry whose key
    // equals the environmentd image tag; the default is used when none matches.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component scheduling and resource settings. Affinity, toleration and
    // resource values are given as JSON and decoded by `parse_affinity`,
    // `parse_tolerations` and `parse_resources`. Each `--*-toleration`
    // occurrence is parsed as a single `Toleration`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica size and replication factor settings; all optional.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Each allowed origin is parsed into an `http::HeaderValue`.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // Port settings for environmentd, with their defaults.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // Port settings for balancerd, with their defaults.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // Given as a JSON object and decoded through
    // `DefaultCertificateSpecs::from_str`; the default `{}` leaves every
    // certificate spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from the generated help output (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
183
184fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
185    Ok(serde_json::from_str(s)?)
186}
187
188fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
189    Ok(serde_json::from_str(s)?)
190}
191
192fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
193    Ok(serde_json::from_str(s)?)
194}
195
/// Optional default certificate specs, deserialized from a JSON object whose
/// keys are camelCase (`balancerdExternal`, `consoleExternal`, `internal`).
///
/// Every field is optional, so an empty object (`{}`) deserializes to a value
/// with all specs unset.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    // NOTE(review): presumably the default for balancerd's externally facing
    // certificate; usage is not visible in this file.
    balancerd_external: Option<MaterializeCertSpec>,
    // NOTE(review): presumably the default for the console's externally facing
    // certificate; usage is not visible in this file.
    console_external: Option<MaterializeCertSpec>,
    // NOTE(review): presumably the default for internal certificates; usage is
    // not visible in this file.
    internal: Option<MaterializeCertSpec>,
}
203
204impl FromStr for DefaultCertificateSpecs {
205    type Err = serde_json::Error;
206
207    fn from_str(s: &str) -> Result<Self, Self::Err> {
208        serde_json::from_str(s)
209    }
210}
211
// AWS-related command-line options, flattened into
// `MaterializeControllerArgs`.
//
// NOTE: plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // `Context::new` asserts this is set when the cloud provider is AWS.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
225
// Network policy command-line options, flattened into
// `MaterializeControllerArgs`. Every flag is given an explicit
// `--network-policies-*` long name.
//
// NOTE: plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
243
/// Error type returned by this controller's reconciliation code.
///
/// Both variants carry `#[from]`, so `?` converts the wrapped error types into
/// this enum automatically. `Display` is implemented by hand below rather than
/// through `#[error(...)]` attributes.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any error reported through `anyhow`.
    Anyhow(#[from] anyhow::Error),
    /// An error returned by the `kube` client.
    Kube(#[from] kube::Error),
}
249
250impl Display for Error {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        match self {
253            Self::Anyhow(e) => write!(f, "{e}"),
254            Self::Kube(e) => write!(f, "{e}"),
255        }
256    }
257}
258
/// Shared state for the Materialize controller; it implements
/// `k8s_controller::Context` further down in this file.
pub struct Context {
    /// Parsed command-line configuration.
    config: MaterializeControllerArgs,
    /// Tracing CLI arguments, passed along when building environmentd resources.
    tracing: TracingCliArgs,
    /// Namespace string passed along when building environmentd resources.
    orchestratord_namespace: String,
    /// Metrics handle; `environmentd_needs_update` is set from the size of
    /// `needs_update`.
    metrics: Arc<Metrics>,
    /// Names of the `Materialize` resources currently flagged as needing an
    /// update, guarded by a mutex.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
266
267impl Context {
268    pub fn new(
269        config: MaterializeControllerArgs,
270        tracing: TracingCliArgs,
271        orchestratord_namespace: String,
272        metrics: Arc<Metrics>,
273    ) -> Self {
274        if config.cloud_provider == CloudProvider::Aws {
275            assert!(
276                config.aws_info.aws_account_id.is_some(),
277                "--aws-account-id is required when using --cloud-provider=aws"
278            );
279        }
280
281        Self {
282            config,
283            tracing,
284            orchestratord_namespace,
285            metrics,
286            needs_update: Default::default(),
287        }
288    }
289
290    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
291        let mut needs_update_set = self.needs_update.lock().unwrap();
292        if needs_update {
293            needs_update_set.insert(mz.name_unchecked());
294        } else {
295            needs_update_set.remove(&mz.name_unchecked());
296        }
297        self.metrics
298            .environmentd_needs_update
299            .set(u64::cast_from(needs_update_set.len()));
300    }
301
302    async fn update_status(
303        &self,
304        mz_api: &Api<Materialize>,
305        mz: &Materialize,
306        status: MaterializeStatus,
307        needs_update: bool,
308    ) -> Result<Materialize, kube::Error> {
309        self.set_needs_update(mz, needs_update);
310
311        let mut new_mz = mz.clone();
312        if !mz
313            .status
314            .as_ref()
315            .map_or(true, |mz_status| mz_status.needs_update(&status))
316        {
317            return Ok(new_mz);
318        }
319
320        new_mz.status = Some(status);
321        mz_api
322            .replace_status(
323                &mz.name_unchecked(),
324                &PostParams::default(),
325                serde_json::to_vec(&new_mz).unwrap(),
326            )
327            .await
328    }
329
330    async fn promote(
331        &self,
332        client: &Client,
333        mz: &Materialize,
334        resources: environmentd::Resources,
335        active_generation: u64,
336        desired_generation: u64,
337        resources_hash: String,
338    ) -> Result<(), Error> {
339        resources.promote_services(client, &mz.namespace()).await?;
340        resources
341            .teardown_generation(client, mz, active_generation)
342            .await?;
343        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
344        self.update_status(
345            &mz_api,
346            mz,
347            MaterializeStatus {
348                active_generation: desired_generation,
349                last_completed_rollout_request: mz.requested_reconciliation_id(),
350                resource_id: mz.status().resource_id,
351                resources_hash,
352                conditions: vec![Condition {
353                    type_: "UpToDate".into(),
354                    status: "True".into(),
355                    last_transition_time: Time(chrono::offset::Utc::now()),
356                    message: format!(
357                        "Successfully applied changes for generation {desired_generation}"
358                    ),
359                    observed_generation: mz.meta().generation,
360                    reason: "Applied".into(),
361                }],
362            },
363            false,
364        )
365        .await?;
366        Ok(())
367    }
368}
369
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles a single `Materialize` resource.
    ///
    /// In order, one pass:
    /// 1. writes an initial status if the resource has none, then returns;
    /// 2. reads the backend secret and validates its `license_key` entry, if any;
    /// 3. fills in a nil `request_rollout` / `environment_id` in the spec, then
    ///    returns;
    /// 4. rejects an `environment_id` that conflicts with the license key;
    /// 5. compares resource hashes to decide whether a new generation is needed
    ///    and drives the rollout (apply, promote, or wait for approval);
    /// 6. applies or cleans up the balancer resources;
    /// 7. applies or cleans up the console resources.
    ///
    /// Steps 6 and 7 only run when the preceding step produced `Ok(None)`; an
    /// error or a returned `Action` ends the pass early and is returned as is.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());

        // `mz.status()` yields a status value even when `mz.status` is still
        // `None`; in that case it is persisted before anything else happens.
        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        // The `license_key` entry of the backend secret is optional. When it is
        // present it must be valid UTF-8, pass `validate`, and carry an
        // environment id that parses as a UUID.
        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
            .data
            .as_ref()
            .and_then(|data| data.get("license_key"))
        {
            let license_key = validate(
                str::from_utf8(&license_key.0)
                    .context("invalid utf8")?
                    .trim(),
            )?;
            let environment_id = license_key
                .environment_id
                .parse()
                .context("invalid environment id in license key")?;
            Some(environment_id)
        } else {
            None
        };

        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                if let Some(environment_id) = license_key_environment_id {
                    if environment_id.is_nil() {
                        // this makes it easier to use a license key in
                        // development with no environment id set
                        mz.spec.environment_id = Uuid::new_v4();
                    } else {
                        mz.spec.environment_id = environment_id;
                    }
                } else {
                    return Err(Error::Anyhow(anyhow::anyhow!(
                        "environment_id is not set in materialize resource {}/{} but no license key was given",
                        mz.namespace(),
                        mz.name_unchecked()
                    )));
                }
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            // Updating the spec should also trigger a reconciliation.
            // We can't do that as part of the above check because you can't
            // update both the spec and the status in a single api call.
            return Ok(None);
        }

        if let Some(environment_id) = license_key_environment_id {
            // we still allow a nil environment id in the license key to be
            // accepted for any provided environment id, to support cloud
            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
                    mz.namespace(),
                    mz.name_unchecked(),
                    environment_id,
                )));
            }
        }

        // we compare the hash against the environment resources generated
        // for the current active generation, since that's what we expect to
        // have been applied earlier, but we don't want to use these
        // environment resources because when we apply them, we want to apply
        // them with data that uses the new generation
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        let desired_generation = if has_current_changes {
            next_generation
        } else {
            active_generation
        };

        // here we regenerate the environment resources using the
        // same inputs except with an updated generation
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // Dispatch on (promotion already started, spec differs from what was
        // applied, rollout requested). Throughout the arms below, the last
        // argument to `update_status` is the needs-update flag, which is
        // `active_generation != desired_generation` except in `promote`.
        let mut result = match (
            mz.is_promoting(),
            has_current_changes,
            mz.rollout_requested(),
        ) {
            // If we're in status promoting, we MUST promote now.
            // We don't know if we successfully promoted or not yet.
            (true, _, _) => {
                self.promote(
                    &client,
                    mz,
                    resources,
                    active_generation,
                    desired_generation,
                    resources_hash,
                )
                .await?;
                Ok(None)
            }
            // There are changes pending, and we want to apply them.
            (false, true, true) => {
                // we remove the environment resources hash annotation here
                // because if we fail halfway through applying the resources,
                // things will be in an inconsistent state, and we don't want
                // to allow the possibility of the user making a second
                // change which reverts to the original state and then
                // skipping retrying this apply, since that would leave
                // things in a permanently inconsistent state.
                // note that environment.spec will be empty here after
                // replace_status, but this is fine because we already
                // extracted all of the information we want from the spec
                // earlier.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            // don't update the reconciliation id yet,
                            // because the rollout hasn't yet completed. if
                            // we fail later on, we want to ensure that the
                            // rollout gets retried.
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, `mz` and `status` shadow the outer bindings
                // and refer to the resource returned by `update_status`.
                let mz = &mz;
                let status = mz.status();

                if mz.spec.rollout_strategy
                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
                {
                    // The only reason someone would choose this strategy is if they didn't have
                    // space for the two generations of pods.
                    // Lets make room for the new ones by deleting the old generation.
                    resources
                        .teardown_generation(&client, mz, active_generation)
                        .await?;
                }

                trace!("applying environment resources");
                match resources
                    .apply(&client, mz.should_force_promote(), &mz.namespace())
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // do this last, so that we keep traffic pointing at
                        // the previous environmentd until the new one is
                        // fully ready

                        // Update the status before calling promote, so that we know
                        // we've crossed the point of no return.
                        // Once we see this status, we must promote without taking other actions.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                // don't update the reconciliation id yet,
                                // because the rollout hasn't yet completed. if
                                // we fail later on, we want to ensure that the
                                // rollout gets retried.
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: resources_hash.clone(),
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Attempting to promote generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Promoting".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        self.promote(
                            &client,
                            mz,
                            resources,
                            active_generation,
                            desired_generation,
                            resources_hash,
                        )
                        .await?;
                        Ok(None)
                    }
                    Err(e) => {
                        // Record the failure in the status, then hand the
                        // original error back to the caller.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                // also don't update the reconciliation id
                                // here, because there was an error during
                                // the rollout and we want to ensure it gets
                                // retried.
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            }
            // There are changes pending, but we don't want to apply them yet.
            (false, true, false) => {
                let mut needs_update = mz.conditions_need_update();
                // If an update is in progress, tear down the next generation
                // and force a status write.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
            // No changes pending, but we might need to clean up a partially applied rollout.
            (false, false, _) => {
                // this can happen if we update the environment, but then revert
                // that update before the update was deployed. in this case, we
                // don't want the environment to still show up as
                // WaitingForApproval.
                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "True".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "No changes found from generation {active_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applied".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("no changes");
                Ok(None)
            }
        };

        // balancers rely on the environmentd service existing, which is
        // enforced by the environmentd rollout process being able to call
        // into the promotion endpoint

        // Anything other than `Ok(None)` (an error, or an action returned by
        // the environment apply) ends this pass before balancers are touched.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // and the console relies on the balancer service existing, which is
        // enforced by balancer::Resources::apply having a check for its pods
        // being up, and not returning successfully until they are

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // The console image tag is looked up by environmentd image tag, taken
        // as the text after the last ':' of the image ref.
        // NOTE(review): for a ref with a registry port but no tag (e.g.
        // `host:5000/environmentd`) the text after the last ':' is not a tag —
        // confirm whether such refs can occur here.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // `result` is the `Ok(None)` from the balancer step at this point,
        // since every other outcome returned above.
        result.map_err(Error::Anyhow)
    }

    /// Handles cleanup of a `Materialize` resource by dropping it from the
    /// needs-update set (which also refreshes the metric). Nothing else is
    /// done here, and the client is unused.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
793
/// Builds the image reference for a sibling image (for example `console`)
/// that lives alongside the given environmentd image.
///
/// The result has the form `{namespace}/{image_name}:{tag}`, where:
/// - `namespace` is everything before the final `/` of
///   `environmentd_image_ref`, or `materialize` when the reference contains
///   no `/`;
/// - `tag` is `image_tag` when provided, otherwise the tag of
///   `environmentd_image_ref`, or `unstable` when it has none.
///
/// The tag is only looked for in the last path segment of the reference, so a
/// `:` that belongs to a registry port (as in `localhost:5000/environmentd`)
/// is not mistaken for a tag separator.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    // Split off the final path segment; a reference without any `/` is treated
    // as a bare image name in the default `materialize` namespace.
    let (namespace, image) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        image
            .rsplit_once(':')
            .map_or("unstable", |(_, image_tag)| image_tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}