mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    sync::{Arc, Mutex},
13};
14
15use anyhow::Context as _;
16use http::HeaderValue;
17use k8s_openapi::{
18    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
19    apimachinery::pkg::apis::meta::v1::{Condition, Time},
20};
21use kube::{
22    Api, Client, Resource, ResourceExt,
23    api::PostParams,
24    runtime::{controller::Action, reflector},
25};
26use tracing::{debug, trace};
27use uuid::Uuid;
28
29use crate::{
30    Error, controller::materialize::environmentd::V161, k8s::make_reflector,
31    matching_image_from_environmentd_image_ref, metrics::Metrics, tls::DefaultCertificateSpecs,
32};
33use mz_cloud_provider::CloudProvider;
34use mz_cloud_resources::crd::materialize::v1alpha1::{
35    Materialize, MaterializeRolloutStrategy, MaterializeStatus,
36};
37use mz_license_keys::validate;
38use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
39use mz_orchestrator_tracing::TracingCliArgs;
40use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
41
42pub mod balancer;
43pub mod console;
44pub mod environmentd;
45
46pub struct Config {
47    pub cloud_provider: CloudProvider,
48    pub region: String,
49    pub create_balancers: bool,
50    pub create_console: bool,
51    pub helm_chart_version: Option<String>,
52    pub secrets_controller: String,
53    pub collect_pod_metrics: bool,
54    pub enable_prometheus_scrape_annotations: bool,
55
56    pub segment_api_key: Option<String>,
57    pub segment_client_side: bool,
58
59    pub console_image_tag_default: String,
60    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,
61
62    pub aws_account_id: Option<String>,
63    pub environmentd_iam_role_arn: Option<String>,
64    pub environmentd_connection_role_arn: Option<String>,
65    pub aws_secrets_controller_tags: Vec<String>,
66    pub environmentd_availability_zones: Option<Vec<String>>,
67
68    pub ephemeral_volume_class: Option<String>,
69    pub scheduler_name: Option<String>,
70    pub enable_security_context: bool,
71    pub enable_internal_statement_logging: bool,
72    pub disable_statement_logging: bool,
73
74    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
75    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
76    pub environmentd_affinity: Option<Affinity>,
77    pub environmentd_tolerations: Option<Vec<Toleration>>,
78    pub environmentd_default_resources: Option<ResourceRequirements>,
79    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
80    pub clusterd_affinity: Option<Affinity>,
81    pub clusterd_tolerations: Option<Vec<Toleration>>,
82    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
83    pub balancerd_affinity: Option<Affinity>,
84    pub balancerd_tolerations: Option<Vec<Toleration>>,
85    pub balancerd_default_resources: Option<ResourceRequirements>,
86    pub console_node_selector: Vec<KeyValueArg<String, String>>,
87    pub console_affinity: Option<Affinity>,
88    pub console_tolerations: Option<Vec<Toleration>>,
89    pub console_default_resources: Option<ResourceRequirements>,
90    pub image_pull_policy: KubernetesImagePullPolicy,
91    pub network_policies_internal_enabled: bool,
92    pub network_policies_ingress_enabled: bool,
93    pub network_policies_ingress_cidrs: Vec<String>,
94    pub network_policies_egress_enabled: bool,
95    pub network_policies_egress_cidrs: Vec<String>,
96
97    pub environmentd_cluster_replica_sizes: Option<String>,
98    pub bootstrap_default_cluster_replica_size: Option<String>,
99    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
100    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
101    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
102    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
103    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
104    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
105    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
106    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
107    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
108
109    pub environmentd_allowed_origins: Vec<HeaderValue>,
110    pub internal_console_proxy_url: String,
111
112    pub environmentd_sql_port: u16,
113    pub environmentd_http_port: u16,
114    pub environmentd_internal_sql_port: u16,
115    pub environmentd_internal_http_port: u16,
116    pub environmentd_internal_persist_pubsub_port: u16,
117
118    pub balancerd_sql_port: u16,
119    pub balancerd_http_port: u16,
120    pub balancerd_internal_http_port: u16,
121
122    pub console_http_port: u16,
123
124    pub default_certificate_specs: DefaultCertificateSpecs,
125
126    pub disable_license_key_checks: bool,
127
128    pub tracing: TracingCliArgs,
129    pub orchestratord_namespace: String,
130}
131
132pub struct Context {
133    config: Config,
134    metrics: Arc<Metrics>,
135    materializes: reflector::Store<Materialize>,
136    needs_update: Arc<Mutex<BTreeSet<String>>>,
137}
138
139impl Context {
140    pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
141        if config.cloud_provider == CloudProvider::Aws {
142            assert!(
143                config.aws_account_id.is_some(),
144                "--aws-account-id is required when using --cloud-provider=aws"
145            );
146        }
147
148        Self {
149            config,
150            metrics,
151            materializes: make_reflector(client.clone()).await,
152            needs_update: Default::default(),
153        }
154    }
155
156    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
157        let mut needs_update_set = self.needs_update.lock().unwrap();
158        if needs_update {
159            needs_update_set.insert(mz.name_unchecked());
160        } else {
161            needs_update_set.remove(&mz.name_unchecked());
162        }
163        self.metrics
164            .environmentd_needs_update
165            .set(u64::cast_from(needs_update_set.len()));
166    }
167
168    async fn update_status(
169        &self,
170        mz_api: &Api<Materialize>,
171        mz: &Materialize,
172        status: MaterializeStatus,
173        needs_update: bool,
174    ) -> Result<Materialize, kube::Error> {
175        self.set_needs_update(mz, needs_update);
176
177        let mut new_mz = mz.clone();
178        if !mz
179            .status
180            .as_ref()
181            .map_or(true, |mz_status| mz_status.needs_update(&status))
182        {
183            return Ok(new_mz);
184        }
185
186        new_mz.status = Some(status);
187        mz_api
188            .replace_status(
189                &mz.name_unchecked(),
190                &PostParams::default(),
191                serde_json::to_vec(&new_mz).unwrap(),
192            )
193            .await
194    }
195
196    async fn promote(
197        &self,
198        client: &Client,
199        mz: &Materialize,
200        resources: environmentd::Resources,
201        active_generation: u64,
202        desired_generation: u64,
203        resources_hash: String,
204    ) -> Result<Option<Action>, Error> {
205        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
206            return Ok(Some(action));
207        }
208        resources
209            .teardown_generation(client, mz, active_generation)
210            .await?;
211        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
212        self.update_status(
213            &mz_api,
214            mz,
215            MaterializeStatus {
216                active_generation: desired_generation,
217                last_completed_rollout_request: mz.requested_reconciliation_id(),
218                last_completed_rollout_environmentd_image_ref: Some(
219                    mz.spec.environmentd_image_ref.clone(),
220                ),
221                resource_id: mz.status().resource_id,
222                resources_hash,
223                conditions: vec![Condition {
224                    type_: "UpToDate".into(),
225                    status: "True".into(),
226                    last_transition_time: Time(chrono::offset::Utc::now()),
227                    message: format!(
228                        "Successfully applied changes for generation {desired_generation}"
229                    ),
230                    observed_generation: mz.meta().generation,
231                    reason: "Applied".into(),
232                }],
233            },
234            false,
235        )
236        .await?;
237        Ok(None)
238    }
239
240    fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
241        if mz.spec.environment_id.is_nil() {
242            // this is always a bug - we delay doing this check until the
243            // resource should have an environment id set, either from the
244            // license key, or explicitly given, or randomly defaulted.
245            return Err(Error::Anyhow(anyhow::anyhow!(
246                "trying to reconcile a materialize resource with no environment id - this is a bug!"
247            )));
248        }
249
250        for existing_mz in self.materializes.state() {
251            if existing_mz.spec.environment_id == mz.spec.environment_id
252                && existing_mz.metadata.uid != mz.metadata.uid
253            {
254                return Err(Error::Anyhow(anyhow::anyhow!(
255                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
256                    mz.namespace(),
257                    mz.name_unchecked(),
258                    existing_mz.namespace(),
259                    existing_mz.name_unchecked(),
260                )));
261            }
262        }
263
264        Ok(())
265    }
266}
267
268#[async_trait::async_trait]
269impl k8s_controller::Context for Context {
270    type Resource = Materialize;
271    type Error = Error;
272
273    const FINALIZER_NAME: Option<&'static str> =
274        Some("orchestratord.materialize.cloud/materialize");
275
276    #[instrument(fields(organization_name=mz.name_unchecked()))]
277    async fn apply(
278        &self,
279        client: Client,
280        mz: &Self::Resource,
281    ) -> Result<Option<Action>, Self::Error> {
282        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
283        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
284
285        let status = mz.status();
286        if mz.status.is_none() {
287            self.update_status(&mz_api, mz, status, true).await?;
288            // Updating the status should trigger a reconciliation
289            // which will include a status this time.
290            return Ok(None);
291        }
292
293        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
294        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
295            .data
296            .as_ref()
297            .and_then(|data| data.get("license_key"))
298        {
299            let license_key = validate(
300                str::from_utf8(&license_key.0)
301                    .context("invalid utf8")?
302                    .trim(),
303            )?;
304            let environment_id = license_key
305                .environment_id
306                .parse()
307                .context("invalid environment id in license key")?;
308            Some(environment_id)
309        } else {
310            if mz.meets_minimum_version(&V161) {
311                return Err(Error::Anyhow(anyhow::anyhow!(
312                    "license_key is required when running in kubernetes",
313                )));
314            } else {
315                None
316            }
317        };
318
319        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
320            let mut mz = mz.clone();
321            if mz.spec.request_rollout.is_nil() {
322                mz.spec.request_rollout = Uuid::new_v4();
323            }
324            if mz.spec.environment_id.is_nil() {
325                if let Some(environment_id) = license_key_environment_id {
326                    if environment_id.is_nil() {
327                        // this makes it easier to use a license key in
328                        // development with no environment id set
329                        mz.spec.environment_id = Uuid::new_v4();
330                    } else {
331                        mz.spec.environment_id = environment_id;
332                    }
333                } else {
334                    if mz.meets_minimum_version(&V161) {
335                        return Err(Error::Anyhow(anyhow::anyhow!(
336                            "environmentId is not set in materialize resource {}/{} but no license key was given",
337                            mz.namespace(),
338                            mz.name_unchecked()
339                        )));
340                    } else {
341                        mz.spec.environment_id = Uuid::new_v4();
342                    }
343                }
344            }
345            mz_api
346                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
347                .await?;
348            // Updating the spec should also trigger a reconciliation.
349            // We can't do that as part of the above check because you can't
350            // update both the spec and the status in a single api call.
351            return Ok(None);
352        }
353
354        if let Some(environment_id) = license_key_environment_id {
355            // we still allow a nil environment id in the license key to be
356            // accepted for any provided environment id, to support cloud
357            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
358                return Err(Error::Anyhow(anyhow::anyhow!(
359                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
360                    mz.namespace(),
361                    mz.name_unchecked(),
362                    environment_id,
363                )));
364            }
365        }
366
367        self.check_environment_id_conflicts(mz)?;
368
369        // we compare the hash against the environment resources generated
370        // for the current active generation, since that's what we expect to
371        // have been applied earlier, but we don't want to use these
372        // environment resources because when we apply them, we want to apply
373        // them with data that uses the new generation
374        let active_resources =
375            environmentd::Resources::new(&self.config, mz, status.active_generation);
376        let has_current_changes = status.resources_hash != active_resources.generate_hash();
377        let active_generation = status.active_generation;
378        let next_generation = active_generation + 1;
379        let desired_generation = if has_current_changes {
380            next_generation
381        } else {
382            active_generation
383        };
384
385        // here we regenerate the environment resources using the
386        // same inputs except with an updated generation
387        let resources = environmentd::Resources::new(&self.config, mz, desired_generation);
388        let resources_hash = resources.generate_hash();
389
390        let mut result = match (
391            mz.is_promoting(),
392            has_current_changes,
393            mz.rollout_requested(),
394        ) {
395            // If we're in status promoting, we MUST promote now.
396            // We don't know if we successfully promoted or not yet.
397            (true, _, _) => {
398                self.promote(
399                    &client,
400                    mz,
401                    resources,
402                    active_generation,
403                    desired_generation,
404                    resources_hash,
405                )
406                .await
407            }
408            // There are changes pending, and we want to apply them.
409            (false, true, true) => {
410                // we remove the environment resources hash annotation here
411                // because if we fail halfway through applying the resources,
412                // things will be in an inconsistent state, and we don't want
413                // to allow the possibility of the user making a second
414                // change which reverts to the original state and then
415                // skipping retrying this apply, since that would leave
416                // things in a permanently inconsistent state.
417                // note that environment.spec will be empty here after
418                // replace_status, but this is fine because we already
419                // extracted all of the information we want from the spec
420                // earlier.
421                let mz = self
422                    .update_status(
423                        &mz_api,
424                        mz,
425                        MaterializeStatus {
426                            active_generation,
427                            // don't update the reconciliation id yet,
428                            // because the rollout hasn't yet completed. if
429                            // we fail later on, we want to ensure that the
430                            // rollout gets retried.
431                            last_completed_rollout_request: status.last_completed_rollout_request,
432                            last_completed_rollout_environmentd_image_ref: status
433                                .last_completed_rollout_environmentd_image_ref,
434                            resource_id: status.resource_id,
435                            resources_hash: String::new(),
436                            conditions: vec![Condition {
437                                type_: "UpToDate".into(),
438                                status: "Unknown".into(),
439                                last_transition_time: Time(chrono::offset::Utc::now()),
440                                message: format!(
441                                    "Applying changes for generation {desired_generation}"
442                                ),
443                                observed_generation: mz.meta().generation,
444                                reason: "Applying".into(),
445                            }],
446                        },
447                        active_generation != desired_generation,
448                    )
449                    .await?;
450                let mz = &mz;
451                let status = mz.status();
452
453                if !mz.within_upgrade_window() {
454                    let last_completed_rollout_environmentd_image_ref =
455                        status.last_completed_rollout_environmentd_image_ref;
456
457                    self.update_status(
458                        &mz_api,
459                        mz,
460                        MaterializeStatus {
461                            active_generation,
462                            last_completed_rollout_request: status.last_completed_rollout_request,
463                            last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
464                            resource_id: status.resource_id,
465                            resources_hash: status.resources_hash,
466                            conditions: vec![Condition {
467                                type_: "UpToDate".into(),
468                                status: "False".into(),
469                                last_transition_time: Time(chrono::offset::Utc::now()),
470                                message: format!(
471                        "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
472                        last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
473                        &mz.spec.environmentd_image_ref,
474                    ),
475                                observed_generation: mz.meta().generation,
476                                reason: "FailedDeploy".into(),
477                            }],
478                        },
479                        active_generation != desired_generation,
480                    )
481                    .await?;
482                    return Ok(None);
483                }
484
485                if mz.spec.rollout_strategy
486                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
487                {
488                    // The only reason someone would choose this strategy is if they didn't have
489                    // space for the two generations of pods.
490                    // Lets make room for the new ones by deleting the old generation.
491                    resources
492                        .teardown_generation(&client, mz, active_generation)
493                        .await?;
494                }
495
496                trace!("applying environment resources");
497                match resources
498                    .apply(&client, mz.should_force_promote(), &mz.namespace())
499                    .await
500                {
501                    Ok(Some(action)) => {
502                        trace!("new environment is not yet ready");
503                        Ok(Some(action))
504                    }
505                    Ok(None) => {
506                        // do this last, so that we keep traffic pointing at
507                        // the previous environmentd until the new one is
508                        // fully ready
509
510                        // Update the status before calling promote, so that we know
511                        // we've crossed the point of no return.
512                        // Once we see this status, we must promote without taking other actions.
513                        self.update_status(
514                            &mz_api,
515                            mz,
516                            MaterializeStatus {
517                                active_generation,
518                                // don't update the reconciliation id yet,
519                                // because the rollout hasn't yet completed. if
520                                // we fail later on, we want to ensure that the
521                                // rollout gets retried.
522                                last_completed_rollout_request: status
523                                    .last_completed_rollout_request,
524                                last_completed_rollout_environmentd_image_ref: status
525                                    .last_completed_rollout_environmentd_image_ref,
526                                resource_id: status.resource_id,
527                                resources_hash: resources_hash.clone(),
528                                conditions: vec![Condition {
529                                    type_: "UpToDate".into(),
530                                    status: "Unknown".into(),
531                                    last_transition_time: Time(chrono::offset::Utc::now()),
532                                    message: format!(
533                                        "Attempting to promote generation {desired_generation}"
534                                    ),
535                                    observed_generation: mz.meta().generation,
536                                    reason: "Promoting".into(),
537                                }],
538                            },
539                            active_generation != desired_generation,
540                        )
541                        .await?;
542                        self.promote(
543                            &client,
544                            mz,
545                            resources,
546                            active_generation,
547                            desired_generation,
548                            resources_hash,
549                        )
550                        .await
551                    }
552                    Err(e) => {
553                        self.update_status(
554                            &mz_api,
555                            mz,
556                            MaterializeStatus {
557                                active_generation,
558                                // also don't update the reconciliation id
559                                // here, because there was an error during
560                                // the rollout and we want to ensure it gets
561                                // retried.
562                                last_completed_rollout_request: status.last_completed_rollout_request,
563                                last_completed_rollout_environmentd_image_ref: status
564                                    .last_completed_rollout_environmentd_image_ref,
565                                resource_id: status.resource_id,
566                                resources_hash: status.resources_hash,
567                                conditions: vec![Condition {
568                                    type_: "UpToDate".into(),
569                                    status: "False".into(),
570                                    last_transition_time: Time(chrono::offset::Utc::now()),
571                                    message: format!(
572                                        "Failed to apply changes for generation {desired_generation}: {e}"
573                                    ),
574                                    observed_generation: mz.meta().generation,
575                                    reason: "FailedDeploy".into(),
576                                }],
577                            },
578                            active_generation != desired_generation,
579                        )
580                        .await?;
581                        Err(e)
582                    }
583                }
584            }
585            // There are changes pending, but we don't want to apply them yet.
586            (false, true, false) => {
587                let mut needs_update = mz.conditions_need_update();
588                if mz.update_in_progress() {
589                    resources
590                        .teardown_generation(&client, mz, next_generation)
591                        .await?;
592                    needs_update = true;
593                }
594                if needs_update {
595                    self.update_status(
596                        &mz_api,
597                        mz,
598                        MaterializeStatus {
599                            active_generation,
600                            last_completed_rollout_request: mz.requested_reconciliation_id(),
601                            last_completed_rollout_environmentd_image_ref: status
602                                .last_completed_rollout_environmentd_image_ref,
603                            resource_id: status.resource_id,
604                            resources_hash: status.resources_hash,
605                            conditions: vec![Condition {
606                                type_: "UpToDate".into(),
607                                status: "False".into(),
608                                last_transition_time: Time(chrono::offset::Utc::now()),
609                                message: format!(
610                                    "Changes detected, waiting for approval for generation {desired_generation}"
611                                ),
612                                observed_generation: mz.meta().generation,
613                                reason: "WaitingForApproval".into(),
614                            }],
615                        },
616                        active_generation != desired_generation,
617                    )
618                    .await?;
619                }
620                debug!("changes detected, waiting for approval");
621                Ok(None)
622            }
623            // No changes pending, but we might need to clean up a partially applied rollout.
624            (false, false, _) => {
625                // this can happen if we update the environment, but then revert
626                // that update before the update was deployed. in this case, we
627                // don't want the environment to still show up as
628                // WaitingForApproval.
629                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
630                if mz.update_in_progress() {
631                    resources
632                        .teardown_generation(&client, mz, next_generation)
633                        .await?;
634                    needs_update = true;
635                }
636                if needs_update {
637                    self.update_status(
638                        &mz_api,
639                        mz,
640                        MaterializeStatus {
641                            active_generation,
642                            last_completed_rollout_request: mz.requested_reconciliation_id(),
643                            last_completed_rollout_environmentd_image_ref: status
644                                .last_completed_rollout_environmentd_image_ref,
645                            resource_id: status.resource_id,
646                            resources_hash: status.resources_hash,
647                            conditions: vec![Condition {
648                                type_: "UpToDate".into(),
649                                status: "True".into(),
650                                last_transition_time: Time(chrono::offset::Utc::now()),
651                                message: format!(
652                                    "No changes found from generation {active_generation}"
653                                ),
654                                observed_generation: mz.meta().generation,
655                                reason: "Applied".into(),
656                            }],
657                        },
658                        active_generation != desired_generation,
659                    )
660                    .await?;
661                }
662                debug!("no changes");
663                Ok(None)
664            }
665        }?;
666
667        if let Some(action) = result {
668            return Ok(Some(action));
669        }
670
671        // balancers rely on the environmentd service existing, which is
672        // enforced by the environmentd rollout process being able to call
673        // into the promotion endpoint
674
675        let balancer = balancer::Resources::new(&self.config, mz);
676        if self.config.create_balancers {
677            result = balancer.apply(&client, &mz.namespace()).await?;
678        } else {
679            result = balancer.cleanup(&client, &mz.namespace()).await?;
680        }
681
682        if let Some(action) = result {
683            return Ok(Some(action));
684        }
685
686        // and the console relies on the balancer service existing, which is
687        // enforced by balancer::Resources::apply having a check for its pods
688        // being up, and not returning successfully until they are
689
690        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
691        else {
692            return Err(Error::Anyhow(anyhow::anyhow!(
693                "failed to parse environmentd image ref: {}",
694                mz.spec.environmentd_image_ref
695            )));
696        };
697        let console_image_tag = self
698            .config
699            .console_image_tag_map
700            .iter()
701            .find(|kv| kv.key == environmentd_image_tag)
702            .map(|kv| kv.value.clone())
703            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
704        let console = console::Resources::new(
705            &self.config,
706            mz,
707            &matching_image_from_environmentd_image_ref(
708                &mz.spec.environmentd_image_ref,
709                "console",
710                Some(&console_image_tag),
711            ),
712        );
713        if self.config.create_console {
714            console.apply(&client, &mz.namespace()).await?;
715        } else {
716            console.cleanup(&client, &mz.namespace()).await?;
717        }
718
719        Ok(result)
720    }
721
722    #[instrument(fields(organization_name=mz.name_unchecked()))]
723    async fn cleanup(
724        &self,
725        _client: Client,
726        mz: &Self::Resource,
727    ) -> Result<Option<Action>, Self::Error> {
728        self.set_needs_update(mz, false);
729
730        Ok(None)
731    }
732}