mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    str::FromStr,
14    sync::{Arc, Mutex},
15};
16
17use anyhow::Context as _;
18use http::HeaderValue;
19use k8s_openapi::{
20    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21    apimachinery::pkg::apis::meta::v1::{Condition, Time},
22};
23use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
24use serde::Deserialize;
25use tracing::{debug, trace};
26use uuid::Uuid;
27
28use crate::{controller::materialize::environmentd::V161, metrics::Metrics};
29use mz_cloud_provider::CloudProvider;
30use mz_cloud_resources::crd::materialize::v1alpha1::{
31    Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
32};
33use mz_license_keys::validate;
34use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
35use mz_orchestrator_tracing::TracingCliArgs;
36use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
37
38pub mod balancer;
39pub mod console;
40pub mod environmentd;
41pub mod tls;
42
43#[derive(clap::Parser)]
44pub struct MaterializeControllerArgs {
45    #[clap(long)]
46    cloud_provider: CloudProvider,
47    #[clap(long)]
48    region: String,
49    #[clap(long)]
50    create_balancers: bool,
51    #[clap(long)]
52    create_console: bool,
53    #[clap(long)]
54    helm_chart_version: Option<String>,
55    #[clap(long, default_value = "kubernetes")]
56    secrets_controller: String,
57    #[clap(long)]
58    collect_pod_metrics: bool,
59    #[clap(long)]
60    enable_prometheus_scrape_annotations: bool,
61    #[clap(long)]
62    disable_authentication: bool,
63
64    #[clap(long)]
65    segment_api_key: Option<String>,
66    #[clap(long)]
67    segment_client_side: bool,
68
69    #[clap(long)]
70    console_image_tag_default: String,
71    #[clap(long)]
72    console_image_tag_map: Vec<KeyValueArg<String, String>>,
73
74    #[clap(flatten)]
75    aws_info: AwsInfo,
76
77    #[clap(long)]
78    ephemeral_volume_class: Option<String>,
79    #[clap(long)]
80    scheduler_name: Option<String>,
81    #[clap(long)]
82    enable_security_context: bool,
83    #[clap(long)]
84    enable_internal_statement_logging: bool,
85    #[clap(long, default_value = "false")]
86    disable_statement_logging: bool,
87
88    #[clap(long)]
89    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
90    #[clap(long)]
91    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
92    #[clap(long, value_parser = parse_affinity)]
93    environmentd_affinity: Option<Affinity>,
94    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
95    environmentd_tolerations: Option<Vec<Toleration>>,
96    #[clap(long, value_parser = parse_resources)]
97    environmentd_default_resources: Option<ResourceRequirements>,
98    #[clap(long)]
99    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
100    #[clap(long, value_parser = parse_affinity)]
101    clusterd_affinity: Option<Affinity>,
102    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
103    clusterd_tolerations: Option<Vec<Toleration>>,
104    #[clap(long)]
105    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
106    #[clap(long, value_parser = parse_affinity)]
107    balancerd_affinity: Option<Affinity>,
108    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
109    balancerd_tolerations: Option<Vec<Toleration>>,
110    #[clap(long, value_parser = parse_resources)]
111    balancerd_default_resources: Option<ResourceRequirements>,
112    #[clap(long)]
113    console_node_selector: Vec<KeyValueArg<String, String>>,
114    #[clap(long, value_parser = parse_affinity)]
115    console_affinity: Option<Affinity>,
116    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
117    console_tolerations: Option<Vec<Toleration>>,
118    #[clap(long, value_parser = parse_resources)]
119    console_default_resources: Option<ResourceRequirements>,
120    #[clap(long, default_value = "always", value_enum)]
121    image_pull_policy: KubernetesImagePullPolicy,
122    #[clap(flatten)]
123    network_policies: NetworkPolicyConfig,
124
125    #[clap(long)]
126    environmentd_cluster_replica_sizes: Option<String>,
127    #[clap(long)]
128    bootstrap_default_cluster_replica_size: Option<String>,
129    #[clap(long)]
130    bootstrap_builtin_system_cluster_replica_size: Option<String>,
131    #[clap(long)]
132    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
133    #[clap(long)]
134    bootstrap_builtin_support_cluster_replica_size: Option<String>,
135    #[clap(long)]
136    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
137    #[clap(long)]
138    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
139    #[clap(long)]
140    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
141    #[clap(long)]
142    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
143    #[clap(long)]
144    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
145    #[clap(long)]
146    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
147
148    #[clap(
149        long,
150        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
151    )]
152    environmentd_allowed_origins: Vec<HeaderValue>,
153    #[clap(long, default_value = "https://console.materialize.com")]
154    internal_console_proxy_url: String,
155
156    #[clap(long, default_value = "6875")]
157    environmentd_sql_port: u16,
158    #[clap(long, default_value = "6876")]
159    environmentd_http_port: u16,
160    #[clap(long, default_value = "6877")]
161    environmentd_internal_sql_port: u16,
162    #[clap(long, default_value = "6878")]
163    environmentd_internal_http_port: u16,
164    #[clap(long, default_value = "6879")]
165    environmentd_internal_persist_pubsub_port: u16,
166
167    #[clap(long, default_value = "6875")]
168    balancerd_sql_port: u16,
169    #[clap(long, default_value = "6876")]
170    balancerd_http_port: u16,
171    #[clap(long, default_value = "8080")]
172    balancerd_internal_http_port: u16,
173
174    #[clap(long, default_value = "8080")]
175    console_http_port: u16,
176
177    #[clap(long, default_value = "{}")]
178    default_certificate_specs: DefaultCertificateSpecs,
179
180    #[clap(long, hide = true)]
181    disable_license_key_checks: bool,
182}
183
184fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
185    Ok(serde_json::from_str(s)?)
186}
187
188fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
189    Ok(serde_json::from_str(s)?)
190}
191
192fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
193    Ok(serde_json::from_str(s)?)
194}
195
196#[derive(Clone, Deserialize, Default)]
197#[serde(rename_all = "camelCase")]
198pub struct DefaultCertificateSpecs {
199    balancerd_external: Option<MaterializeCertSpec>,
200    console_external: Option<MaterializeCertSpec>,
201    internal: Option<MaterializeCertSpec>,
202}
203
204impl FromStr for DefaultCertificateSpecs {
205    type Err = serde_json::Error;
206
207    fn from_str(s: &str) -> Result<Self, Self::Err> {
208        serde_json::from_str(s)
209    }
210}
211
212#[derive(clap::Parser)]
213pub struct AwsInfo {
214    #[clap(long)]
215    aws_account_id: Option<String>,
216    #[clap(long)]
217    environmentd_iam_role_arn: Option<String>,
218    #[clap(long)]
219    environmentd_connection_role_arn: Option<String>,
220    #[clap(long)]
221    aws_secrets_controller_tags: Vec<String>,
222    #[clap(long)]
223    environmentd_availability_zones: Option<Vec<String>>,
224}
225
226#[derive(clap::Parser)]
227pub struct NetworkPolicyConfig {
228    #[clap(long = "network-policies-internal-enabled")]
229    internal_enabled: bool,
230
231    #[clap(long = "network-policies-ingress-enabled")]
232    ingress_enabled: bool,
233
234    #[clap(long = "network-policies-ingress-cidrs")]
235    ingress_cidrs: Vec<String>,
236
237    #[clap(long = "network-policies-egress-enabled")]
238    egress_enabled: bool,
239
240    #[clap(long = "network-policies-egress-cidrs")]
241    egress_cidrs: Vec<String>,
242}
243
244#[derive(Debug, thiserror::Error)]
245pub enum Error {
246    Anyhow(#[from] anyhow::Error),
247    Kube(#[from] kube::Error),
248    Reqwest(#[from] reqwest::Error),
249}
250
251impl Display for Error {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            Self::Anyhow(e) => write!(f, "{e}"),
255            Self::Kube(e) => write!(f, "{e}"),
256            Self::Reqwest(e) => write!(f, "{e}"),
257        }
258    }
259}
260
261pub struct Context {
262    config: MaterializeControllerArgs,
263    tracing: TracingCliArgs,
264    orchestratord_namespace: String,
265    metrics: Arc<Metrics>,
266    needs_update: Arc<Mutex<BTreeSet<String>>>,
267}
268
269impl Context {
270    pub fn new(
271        config: MaterializeControllerArgs,
272        tracing: TracingCliArgs,
273        orchestratord_namespace: String,
274        metrics: Arc<Metrics>,
275    ) -> Self {
276        if config.cloud_provider == CloudProvider::Aws {
277            assert!(
278                config.aws_info.aws_account_id.is_some(),
279                "--aws-account-id is required when using --cloud-provider=aws"
280            );
281        }
282
283        Self {
284            config,
285            tracing,
286            orchestratord_namespace,
287            metrics,
288            needs_update: Default::default(),
289        }
290    }
291
292    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
293        let mut needs_update_set = self.needs_update.lock().unwrap();
294        if needs_update {
295            needs_update_set.insert(mz.name_unchecked());
296        } else {
297            needs_update_set.remove(&mz.name_unchecked());
298        }
299        self.metrics
300            .environmentd_needs_update
301            .set(u64::cast_from(needs_update_set.len()));
302    }
303
304    async fn update_status(
305        &self,
306        mz_api: &Api<Materialize>,
307        mz: &Materialize,
308        status: MaterializeStatus,
309        needs_update: bool,
310    ) -> Result<Materialize, kube::Error> {
311        self.set_needs_update(mz, needs_update);
312
313        let mut new_mz = mz.clone();
314        if !mz
315            .status
316            .as_ref()
317            .map_or(true, |mz_status| mz_status.needs_update(&status))
318        {
319            return Ok(new_mz);
320        }
321
322        new_mz.status = Some(status);
323        mz_api
324            .replace_status(
325                &mz.name_unchecked(),
326                &PostParams::default(),
327                serde_json::to_vec(&new_mz).unwrap(),
328            )
329            .await
330    }
331
332    async fn promote(
333        &self,
334        client: &Client,
335        mz: &Materialize,
336        resources: environmentd::Resources,
337        active_generation: u64,
338        desired_generation: u64,
339        resources_hash: String,
340    ) -> Result<Option<Action>, Error> {
341        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
342            return Ok(Some(action));
343        }
344        resources
345            .teardown_generation(client, mz, active_generation)
346            .await?;
347        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
348        self.update_status(
349            &mz_api,
350            mz,
351            MaterializeStatus {
352                active_generation: desired_generation,
353                last_completed_rollout_request: mz.requested_reconciliation_id(),
354                resource_id: mz.status().resource_id,
355                resources_hash,
356                conditions: vec![Condition {
357                    type_: "UpToDate".into(),
358                    status: "True".into(),
359                    last_transition_time: Time(chrono::offset::Utc::now()),
360                    message: format!(
361                        "Successfully applied changes for generation {desired_generation}"
362                    ),
363                    observed_generation: mz.meta().generation,
364                    reason: "Applied".into(),
365                }],
366            },
367            false,
368        )
369        .await?;
370        Ok(None)
371    }
372}
373
374#[async_trait::async_trait]
375impl k8s_controller::Context for Context {
376    type Resource = Materialize;
377    type Error = Error;
378
379    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";
380
381    #[instrument(fields(organization_name=mz.name_unchecked()))]
382    async fn apply(
383        &self,
384        client: Client,
385        mz: &Self::Resource,
386    ) -> Result<Option<Action>, Self::Error> {
387        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
388        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
389
390        let status = mz.status();
391        if mz.status.is_none() {
392            self.update_status(&mz_api, mz, status, true).await?;
393            // Updating the status should trigger a reconciliation
394            // which will include a status this time.
395            return Ok(None);
396        }
397
398        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
399        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
400            .data
401            .as_ref()
402            .and_then(|data| data.get("license_key"))
403        {
404            let license_key = validate(
405                str::from_utf8(&license_key.0)
406                    .context("invalid utf8")?
407                    .trim(),
408            )?;
409            let environment_id = license_key
410                .environment_id
411                .parse()
412                .context("invalid environment id in license key")?;
413            Some(environment_id)
414        } else {
415            if mz.meets_minimum_version(&V161) {
416                return Err(Error::Anyhow(anyhow::anyhow!(
417                    "license_key is required when running in kubernetes",
418                )));
419            } else {
420                None
421            }
422        };
423
424        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
425            let mut mz = mz.clone();
426            if mz.spec.request_rollout.is_nil() {
427                mz.spec.request_rollout = Uuid::new_v4();
428            }
429            if mz.spec.environment_id.is_nil() {
430                if let Some(environment_id) = license_key_environment_id {
431                    if environment_id.is_nil() {
432                        // this makes it easier to use a license key in
433                        // development with no environment id set
434                        mz.spec.environment_id = Uuid::new_v4();
435                    } else {
436                        mz.spec.environment_id = environment_id;
437                    }
438                } else {
439                    if mz.meets_minimum_version(&V161) {
440                        return Err(Error::Anyhow(anyhow::anyhow!(
441                            "environmentId is not set in materialize resource {}/{} but no license key was given",
442                            mz.namespace(),
443                            mz.name_unchecked()
444                        )));
445                    } else {
446                        mz.spec.environment_id = Uuid::new_v4();
447                    }
448                }
449            }
450            mz_api
451                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
452                .await?;
453            // Updating the spec should also trigger a reconciliation.
454            // We can't do that as part of the above check because you can't
455            // update both the spec and the status in a single api call.
456            return Ok(None);
457        }
458
459        if let Some(environment_id) = license_key_environment_id {
460            // we still allow a nil environment id in the license key to be
461            // accepted for any provided environment id, to support cloud
462            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
463                return Err(Error::Anyhow(anyhow::anyhow!(
464                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
465                    mz.namespace(),
466                    mz.name_unchecked(),
467                    environment_id,
468                )));
469            }
470        }
471
472        // we compare the hash against the environment resources generated
473        // for the current active generation, since that's what we expect to
474        // have been applied earlier, but we don't want to use these
475        // environment resources because when we apply them, we want to apply
476        // them with data that uses the new generation
477        let active_resources = environmentd::Resources::new(
478            &self.config,
479            &self.tracing,
480            &self.orchestratord_namespace,
481            mz,
482            status.active_generation,
483        );
484        let has_current_changes = status.resources_hash != active_resources.generate_hash();
485        let active_generation = status.active_generation;
486        let next_generation = active_generation + 1;
487        let desired_generation = if has_current_changes {
488            next_generation
489        } else {
490            active_generation
491        };
492
493        // here we regenerate the environment resources using the
494        // same inputs except with an updated generation
495        let resources = environmentd::Resources::new(
496            &self.config,
497            &self.tracing,
498            &self.orchestratord_namespace,
499            mz,
500            desired_generation,
501        );
502        let resources_hash = resources.generate_hash();
503
504        let mut result = match (
505            mz.is_promoting(),
506            has_current_changes,
507            mz.rollout_requested(),
508        ) {
509            // If we're in status promoting, we MUST promote now.
510            // We don't know if we successfully promoted or not yet.
511            (true, _, _) => {
512                self.promote(
513                    &client,
514                    mz,
515                    resources,
516                    active_generation,
517                    desired_generation,
518                    resources_hash,
519                )
520                .await
521            }
522            // There are changes pending, and we want to appy them.
523            (false, true, true) => {
524                // we remove the environment resources hash annotation here
525                // because if we fail halfway through applying the resources,
526                // things will be in an inconsistent state, and we don't want
527                // to allow the possibility of the user making a second
528                // change which reverts to the original state and then
529                // skipping retrying this apply, since that would leave
530                // things in a permanently inconsistent state.
531                // note that environment.spec will be empty here after
532                // replace_status, but this is fine because we already
533                // extracted all of the information we want from the spec
534                // earlier.
535                let mz = self
536                    .update_status(
537                        &mz_api,
538                        mz,
539                        MaterializeStatus {
540                            active_generation,
541                            // don't update the reconciliation id yet,
542                            // because the rollout hasn't yet completed. if
543                            // we fail later on, we want to ensure that the
544                            // rollout gets retried.
545                            last_completed_rollout_request: status.last_completed_rollout_request,
546                            resource_id: status.resource_id,
547                            resources_hash: String::new(),
548                            conditions: vec![Condition {
549                                type_: "UpToDate".into(),
550                                status: "Unknown".into(),
551                                last_transition_time: Time(chrono::offset::Utc::now()),
552                                message: format!(
553                                    "Applying changes for generation {desired_generation}"
554                                ),
555                                observed_generation: mz.meta().generation,
556                                reason: "Applying".into(),
557                            }],
558                        },
559                        active_generation != desired_generation,
560                    )
561                    .await?;
562                let mz = &mz;
563                let status = mz.status();
564
565                if mz.spec.rollout_strategy
566                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
567                {
568                    // The only reason someone would choose this strategy is if they didn't have
569                    // space for the two generations of pods.
570                    // Lets make room for the new ones by deleting the old generation.
571                    resources
572                        .teardown_generation(&client, mz, active_generation)
573                        .await?;
574                }
575
576                trace!("applying environment resources");
577                match resources
578                    .apply(&client, mz.should_force_promote(), &mz.namespace())
579                    .await
580                {
581                    Ok(Some(action)) => {
582                        trace!("new environment is not yet ready");
583                        Ok(Some(action))
584                    }
585                    Ok(None) => {
586                        // do this last, so that we keep traffic pointing at
587                        // the previous environmentd until the new one is
588                        // fully ready
589
590                        // Update the status before calling promote, so that we know
591                        // we've crossed the point of no return.
592                        // Once we see this status, we must promote without taking other actions.
593                        self.update_status(
594                            &mz_api,
595                            mz,
596                            MaterializeStatus {
597                                active_generation,
598                                // don't update the reconciliation id yet,
599                                // because the rollout hasn't yet completed. if
600                                // we fail later on, we want to ensure that the
601                                // rollout gets retried.
602                                last_completed_rollout_request: status
603                                    .last_completed_rollout_request,
604                                resource_id: status.resource_id,
605                                resources_hash: resources_hash.clone(),
606                                conditions: vec![Condition {
607                                    type_: "UpToDate".into(),
608                                    status: "Unknown".into(),
609                                    last_transition_time: Time(chrono::offset::Utc::now()),
610                                    message: format!(
611                                        "Attempting to promote generation {desired_generation}"
612                                    ),
613                                    observed_generation: mz.meta().generation,
614                                    reason: "Promoting".into(),
615                                }],
616                            },
617                            active_generation != desired_generation,
618                        )
619                        .await?;
620                        self.promote(
621                            &client,
622                            mz,
623                            resources,
624                            active_generation,
625                            desired_generation,
626                            resources_hash,
627                        )
628                        .await
629                    }
630                    Err(e) => {
631                        self.update_status(
632                            &mz_api,
633                            mz,
634                            MaterializeStatus {
635                                active_generation,
636                                // also don't update the reconciliation id
637                                // here, because there was an error during
638                                // the rollout and we want to ensure it gets
639                                // retried.
640                                last_completed_rollout_request: status.last_completed_rollout_request,
641                                resource_id: status.resource_id,
642                                resources_hash: status.resources_hash,
643                                conditions: vec![Condition {
644                                    type_: "UpToDate".into(),
645                                    status: "False".into(),
646                                    last_transition_time: Time(chrono::offset::Utc::now()),
647                                    message: format!(
648                                        "Failed to apply changes for generation {desired_generation}: {e}"
649                                    ),
650                                    observed_generation: mz.meta().generation,
651                                    reason: "FailedDeploy".into(),
652                                }],
653                            },
654                            active_generation != desired_generation,
655                        )
656                        .await?;
657                        Err(e)
658                    }
659                }
660            }
661            // There are changes pending, but we don't want to apply them yet.
662            (false, true, false) => {
663                let mut needs_update = mz.conditions_need_update();
664                if mz.update_in_progress() {
665                    resources
666                        .teardown_generation(&client, mz, next_generation)
667                        .await?;
668                    needs_update = true;
669                }
670                if needs_update {
671                    self.update_status(
672                        &mz_api,
673                        mz,
674                        MaterializeStatus {
675                            active_generation,
676                            last_completed_rollout_request: mz.requested_reconciliation_id(),
677                            resource_id: status.resource_id,
678                            resources_hash: status.resources_hash,
679                            conditions: vec![Condition {
680                                type_: "UpToDate".into(),
681                                status: "False".into(),
682                                last_transition_time: Time(chrono::offset::Utc::now()),
683                                message: format!(
684                                    "Changes detected, waiting for approval for generation {desired_generation}"
685                                ),
686                                observed_generation: mz.meta().generation,
687                                reason: "WaitingForApproval".into(),
688                            }],
689                        },
690                        active_generation != desired_generation,
691                    )
692                    .await?;
693                }
694                debug!("changes detected, waiting for approval");
695                Ok(None)
696            }
697            // No changes pending, but we might need to clean up a partially applied rollout.
698            (false, false, _) => {
699                // this can happen if we update the environment, but then revert
700                // that update before the update was deployed. in this case, we
701                // don't want the environment to still show up as
702                // WaitingForApproval.
703                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
704                if mz.update_in_progress() {
705                    resources
706                        .teardown_generation(&client, mz, next_generation)
707                        .await?;
708                    needs_update = true;
709                }
710                if needs_update {
711                    self.update_status(
712                        &mz_api,
713                        mz,
714                        MaterializeStatus {
715                            active_generation,
716                            last_completed_rollout_request: mz.requested_reconciliation_id(),
717                            resource_id: status.resource_id,
718                            resources_hash: status.resources_hash,
719                            conditions: vec![Condition {
720                                type_: "UpToDate".into(),
721                                status: "True".into(),
722                                last_transition_time: Time(chrono::offset::Utc::now()),
723                                message: format!(
724                                    "No changes found from generation {active_generation}"
725                                ),
726                                observed_generation: mz.meta().generation,
727                                reason: "Applied".into(),
728                            }],
729                        },
730                        active_generation != desired_generation,
731                    )
732                    .await?;
733                }
734                debug!("no changes");
735                Ok(None)
736            }
737        }?;
738
739        if let Some(action) = result {
740            return Ok(Some(action));
741        }
742
743        // balancers rely on the environmentd service existing, which is
744        // enforced by the environmentd rollout process being able to call
745        // into the promotion endpoint
746
747        let balancer = balancer::Resources::new(&self.config, mz);
748        if self.config.create_balancers {
749            result = balancer.apply(&client, &mz.namespace()).await?;
750        } else {
751            result = balancer.cleanup(&client, &mz.namespace()).await?;
752        }
753
754        if let Some(action) = result {
755            return Ok(Some(action));
756        }
757
758        // and the console relies on the balancer service existing, which is
759        // enforced by balancer::Resources::apply having a check for its pods
760        // being up, and not returning successfully until they are
761
762        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
763        else {
764            return Err(Error::Anyhow(anyhow::anyhow!(
765                "failed to parse environmentd image ref: {}",
766                mz.spec.environmentd_image_ref
767            )));
768        };
769        let console_image_tag = self
770            .config
771            .console_image_tag_map
772            .iter()
773            .find(|kv| kv.key == environmentd_image_tag)
774            .map(|kv| kv.value.clone())
775            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
776        let console = console::Resources::new(
777            &self.config,
778            mz,
779            &matching_image_from_environmentd_image_ref(
780                &mz.spec.environmentd_image_ref,
781                "console",
782                Some(&console_image_tag),
783            ),
784        );
785        if self.config.create_console {
786            console.apply(&client, &mz.namespace()).await?;
787        } else {
788            console.cleanup(&client, &mz.namespace()).await?;
789        }
790
791        Ok(result)
792    }
793
794    #[instrument(fields(organization_name=mz.name_unchecked()))]
795    async fn cleanup(
796        &self,
797        _client: Client,
798        mz: &Self::Resource,
799    ) -> Result<Option<Action>, Self::Error> {
800        self.set_needs_update(mz, false);
801
802        Ok(None)
803    }
804}
805
/// Builds the ref of a sibling image (e.g. `console`) that lives in the same
/// registry/namespace as the given environmentd image.
///
/// - The namespace is everything before the last `/` of
///   `environmentd_image_ref`, or `materialize` if it contains no `/`.
/// - The tag is `image_tag` if given. Otherwise it is environmentd's own tag,
///   or `unstable` if environmentd's ref has no tag.
///
/// Only a `:` after the last `/` counts as a tag separator, so a registry
/// port (`localhost:5000/...`) is never mistaken for a tag.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, name_and_tag) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        name_and_tag
            .rsplit_once(':')
            .map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}