Skip to main content

mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    sync::{Arc, Mutex},
13    time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_controller::TraceMetadata;
19use k8s_openapi::{
20    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21    apimachinery::pkg::apis::meta::v1::{Condition, Time},
22    jiff::{SignedDuration, Timestamp},
23};
24use kube::{
25    Api, Client, Resource, ResourceExt,
26    api::{ListParams, PostParams},
27    runtime::controller::Action,
28};
29use tracing::{debug, trace, warn};
30use uuid::Uuid;
31
32use crate::{
33    Error,
34    controller::materialize::generation::V161,
35    k8s::{apply_resource, delete_resource},
36    matching_image_from_environmentd_image_ref,
37    metrics::Metrics,
38    parse_image_tag,
39    tls::{DefaultCertificateSpecs, issuer_ref_defined, resolved_dns_names},
40};
41use mz_cloud_provider::CloudProvider;
42use mz_cloud_resources::crd::{
43    ManagedResource,
44    balancer::v1alpha1::{Balancer, BalancerSpec},
45    console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
46    materialize::MaterializeRolloutStrategy,
47    materialize::v1alpha1::{Materialize, MaterializeStatus},
48};
49use mz_license_keys::validate;
50use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
51use mz_orchestrator_tracing::TracingCliArgs;
52use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
53
54pub mod generation;
55pub mod global;
56
/// Static configuration for the `Materialize` controller.
///
/// A `Config` is moved into [`Context::new`] and afterwards borrowed by
/// `global::Resources::new` and `generation::Resources::new` whenever the
/// controller builds resources for a `Materialize` object.
///
/// NOTE(review): apart from `cloud_provider` and `aws_account_id`, none of
/// these fields are read directly in this part of the file; they are
/// presumably consumed by the `global` and `generation` submodules — confirm
/// there before relying on the grouping comments below.
pub struct Config {
    // `Context::new` asserts that `aws_account_id` is set when this is
    // `CloudProvider::Aws`.
    pub cloud_provider: CloudProvider,
    pub region: String,
    pub create_balancers: bool,
    pub create_console: bool,
    pub helm_chart_version: Option<String>,
    pub secrets_controller: String,
    pub collect_pod_metrics: bool,
    pub enable_prometheus_scrape_annotations: bool,

    // Presumably Segment analytics settings — TODO confirm.
    pub segment_api_key: Option<String>,
    pub segment_client_side: bool,

    // Presumably used to pick a console image tag for a given environmentd
    // version, falling back to the default — TODO confirm.
    pub console_image_tag_default: String,
    pub console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // Must be `Some` when `cloud_provider` is `CloudProvider::Aws`; otherwise
    // `Context::new` panics.
    pub aws_account_id: Option<String>,
    pub environmentd_iam_role_arn: Option<String>,
    pub environmentd_connection_role_arn: Option<String>,
    pub aws_secrets_controller_tags: Vec<String>,
    pub environmentd_availability_zones: Option<Vec<String>>,

    pub ephemeral_volume_class: Option<String>,
    pub scheduler_name: Option<String>,
    pub enable_security_context: bool,
    pub enable_internal_statement_logging: bool,
    pub disable_statement_logging: bool,

    // Presumably pod scheduling and network policy settings for the
    // environmentd and clusterd pods — TODO confirm.
    pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    pub environmentd_affinity: Option<Affinity>,
    pub environmentd_tolerations: Option<Vec<Toleration>>,
    pub environmentd_default_resources: Option<ResourceRequirements>,
    pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    pub clusterd_affinity: Option<Affinity>,
    pub clusterd_tolerations: Option<Vec<Toleration>>,
    pub image_pull_policy: KubernetesImagePullPolicy,
    pub network_policies_internal_enabled: bool,
    pub network_policies_ingress_enabled: bool,
    pub network_policies_ingress_cidrs: Vec<String>,
    pub network_policies_egress_enabled: bool,
    pub network_policies_egress_cidrs: Vec<String>,

    // Presumably cluster replica size / replication factor overrides passed
    // through to environmentd at bootstrap — TODO confirm.
    pub environmentd_cluster_replica_sizes: Option<String>,
    pub bootstrap_default_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    pub environmentd_allowed_origins: Vec<HeaderValue>,
    pub internal_console_proxy_url: String,

    // Port numbers for the environmentd endpoints named by each field.
    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub environmentd_internal_sql_port: u16,
    pub environmentd_internal_http_port: u16,
    pub environmentd_internal_persist_pubsub_port: u16,

    pub default_certificate_specs: DefaultCertificateSpecs,

    pub disable_license_key_checks: bool,

    pub tracing: TracingCliArgs,
    pub orchestratord_namespace: String,
}
128
/// State held by the `Materialize` controller.
///
/// This type implements `k8s_controller::Context` further down in the file,
/// with `Materialize` as its resource type.
pub struct Context {
    /// Controller configuration supplied to `Context::new`.
    config: Config,
    /// Metrics handle; `set_needs_update` publishes the size of
    /// `needs_update` through its `environmentd_needs_update` metric.
    metrics: Arc<Metrics>,
    /// Names of the `Materialize` resources currently flagged as needing an
    /// update. Entries are added and removed by `set_needs_update`.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
134
135impl Context {
136    pub fn new(config: Config, metrics: Arc<Metrics>) -> Self {
137        if config.cloud_provider == CloudProvider::Aws {
138            assert!(
139                config.aws_account_id.is_some(),
140                "--aws-account-id is required when using --cloud-provider=aws"
141            );
142        }
143
144        Self {
145            config,
146            metrics,
147            needs_update: Default::default(),
148        }
149    }
150
151    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
152        let mut needs_update_set = self.needs_update.lock().unwrap();
153        if needs_update {
154            needs_update_set.insert(mz.name_unchecked());
155        } else {
156            needs_update_set.remove(&mz.name_unchecked());
157        }
158        self.metrics
159            .environmentd_needs_update
160            .set(u64::cast_from(needs_update_set.len()));
161    }
162
163    async fn update_status(
164        &self,
165        mz_api: &Api<Materialize>,
166        mz: &Materialize,
167        status: MaterializeStatus,
168        needs_update: bool,
169    ) -> Result<Materialize, kube::Error> {
170        self.set_needs_update(mz, needs_update);
171
172        let mut new_mz = mz.clone();
173        if !mz
174            .status
175            .as_ref()
176            .map_or(true, |mz_status| mz_status.needs_update(&status))
177        {
178            return Ok(new_mz);
179        }
180
181        new_mz.status = Some(status);
182        mz_api
183            .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
184            .await
185    }
186
187    async fn promote(
188        &self,
189        client: &Client,
190        mz: &Materialize,
191        resources: generation::Resources,
192        active_generation: u64,
193        desired_generation: u64,
194        resources_hash: String,
195    ) -> Result<Option<Action>, Error> {
196        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
197            return Ok(Some(action));
198        }
199        resources
200            .teardown_generation(client, mz, active_generation)
201            .await?;
202        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
203        self.update_status(
204            &mz_api,
205            mz,
206            MaterializeStatus {
207                active_generation: desired_generation,
208                last_completed_rollout_request: mz.requested_reconciliation_id(),
209                last_completed_rollout_environmentd_image_ref: Some(
210                    mz.spec.environmentd_image_ref.clone(),
211                ),
212                resource_id: mz.status().resource_id,
213                resources_hash,
214                last_completed_rollout_hash: None,
215                conditions: vec![Condition {
216                    type_: "UpToDate".into(),
217                    status: "True".into(),
218                    last_transition_time: Time(Timestamp::now()),
219                    message: format!(
220                        "Successfully applied changes for generation {desired_generation}"
221                    ),
222                    observed_generation: mz.meta().generation,
223                    reason: "Applied".into(),
224                }],
225            },
226            false,
227        )
228        .await?;
229        Ok(None)
230    }
231
232    async fn check_environment_id_conflicts(
233        &self,
234        client: &Client,
235        mz: &Materialize,
236    ) -> Result<(), Error> {
237        if mz.spec.environment_id.is_nil() {
238            // this is always a bug - we delay doing this check until the
239            // resource should have an environment id set, either from the
240            // license key, or explicitly given, or randomly defaulted.
241            return Err(Error::Anyhow(anyhow::anyhow!(
242                "trying to reconcile a materialize resource with no environment id - this is a bug!"
243            )));
244        }
245
246        let mz_api: Api<Materialize> = Api::all(client.clone());
247        let all_mz = mz_api.list(&ListParams::default()).await?;
248        for existing_mz in &all_mz.items {
249            if existing_mz.spec.environment_id == mz.spec.environment_id
250                && existing_mz.metadata.uid != mz.metadata.uid
251            {
252                return Err(Error::Anyhow(anyhow::anyhow!(
253                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
254                    mz.namespace(),
255                    mz.name_unchecked(),
256                    existing_mz.namespace(),
257                    existing_mz.name_unchecked(),
258                )));
259            }
260        }
261
262        Ok(())
263    }
264}
265
266#[async_trait::async_trait]
267impl k8s_controller::Context for Context {
268    type Resource = Materialize;
269    type Error = Error;
270
271    const FINALIZER_NAME: Option<&'static str> =
272        Some("orchestratord.materialize.cloud/materialize");
273
274    #[instrument(fields(organization_name=mz.name_unchecked()))]
275    async fn apply(
276        &self,
277        client: Client,
278        mz: &Self::Resource,
279        _metadata: &mut TraceMetadata,
280    ) -> Result<Option<Action>, Self::Error> {
281        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
282        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
283        let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
284        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
285
286        let status = mz.status();
287        if mz.status.is_none() {
288            self.update_status(&mz_api, mz, status, true).await?;
289            // Updating the status should trigger a reconciliation
290            // which will include a status this time.
291            return Ok(None);
292        }
293
294        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
295        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
296            .data
297            .as_ref()
298            .and_then(|data| data.get("license_key"))
299        {
300            let license_key = validate(
301                str::from_utf8(&license_key.0)
302                    .context("invalid utf8")?
303                    .trim(),
304            )?;
305            let environment_id = license_key
306                .environment_id
307                .parse()
308                .context("invalid environment id in license key")?;
309            Some(environment_id)
310        } else {
311            if mz.meets_minimum_version(&V161) {
312                return Err(Error::Anyhow(anyhow::anyhow!(
313                    "license_key is required when running in kubernetes",
314                )));
315            } else {
316                None
317            }
318        };
319
320        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
321            let mut mz = mz.clone();
322            if mz.spec.request_rollout.is_nil() {
323                mz.spec.request_rollout = Uuid::new_v4();
324            }
325            if mz.spec.environment_id.is_nil() {
326                if let Some(environment_id) = license_key_environment_id {
327                    if environment_id.is_nil() {
328                        // this makes it easier to use a license key in
329                        // development with no environment id set
330                        mz.spec.environment_id = Uuid::new_v4();
331                    } else {
332                        mz.spec.environment_id = environment_id;
333                    }
334                } else {
335                    if mz.meets_minimum_version(&V161) {
336                        return Err(Error::Anyhow(anyhow::anyhow!(
337                            "environmentId is not set in materialize resource {}/{} but no license key was given",
338                            mz.namespace(),
339                            mz.name_unchecked()
340                        )));
341                    } else {
342                        mz.spec.environment_id = Uuid::new_v4();
343                    }
344                }
345            }
346            mz_api
347                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
348                .await?;
349            // Updating the spec should also trigger a reconciliation.
350            // We can't do that as part of the above check because you can't
351            // update both the spec and the status in a single api call.
352            return Ok(None);
353        }
354
355        if let Some(environment_id) = license_key_environment_id {
356            // we still allow a nil environment id in the license key to be
357            // accepted for any provided environment id, to support cloud
358            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
359                return Err(Error::Anyhow(anyhow::anyhow!(
360                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
361                    mz.namespace(),
362                    mz.name_unchecked(),
363                    environment_id,
364                )));
365            }
366        }
367
368        self.check_environment_id_conflicts(&client, mz).await?;
369
370        global::Resources::new(&self.config, mz)?
371            .apply(&client, &mz.namespace())
372            .await?;
373
374        // we compare the hash against the environment resources generated
375        // for the current active generation, since that's what we expect to
376        // have been applied earlier, but we don't want to use these
377        // environment resources because when we apply them, we want to apply
378        // them with data that uses the new generation
379        let active_resources =
380            generation::Resources::new(&self.config, mz, status.active_generation);
381        let has_current_changes = status.resources_hash != active_resources.generate_hash();
382        let active_generation = status.active_generation;
383        let next_generation = active_generation + 1;
384        let desired_generation = if has_current_changes {
385            next_generation
386        } else {
387            active_generation
388        };
389
390        // here we regenerate the environment resources using the
391        // same inputs except with an updated generation
392        let resources = generation::Resources::new(&self.config, mz, desired_generation);
393        let resources_hash = resources.generate_hash();
394
395        let mut result = match (
396            mz.is_promoting(),
397            has_current_changes,
398            mz.rollout_requested(),
399        ) {
400            // If we're in status promoting, we MUST promote now.
401            // We don't know if we successfully promoted or not yet.
402            (true, _, _) => {
403                self.promote(
404                    &client,
405                    mz,
406                    resources,
407                    active_generation,
408                    desired_generation,
409                    resources_hash,
410                )
411                .await
412            }
413            // There are changes pending, and we want to apply them.
414            (false, true, true) => {
415                // If a rollout has been in progress for longer than the
416                // configured timeout, cancel it. While a rollout is in
417                // progress the new generation runs un-promoted and holds back
418                // compaction via read holds; promoting it after a long delay
419                // can cause incident-inducing load, so we abort instead and
420                // let the user retry by requesting a fresh rollout.
421                //
422                // We never cancel a force-promoting rollout (including the
423                // `ImmediatelyPromoteCausingDowntime` strategy), because by
424                // then the previously-active generation may already be torn
425                // down, leaving nothing to fall back to.
426                if !mz.should_force_promote() {
427                    if let Some(started) = mz.rollout_in_progress_since() {
428                        let timeout = mz.rollout_request_timeout();
429                        let elapsed = Timestamp::now().duration_since(started);
430                        let timed_out = SignedDuration::try_from(timeout)
431                            .is_ok_and(|timeout| elapsed >= timeout);
432                        if timed_out {
433                            warn!(
434                                "rollout to generation {desired_generation} exceeded timeout, cancelling"
435                            );
436                            // Tear down the un-promoted generation to release
437                            // its read holds.
438                            resources
439                                .teardown_generation(&client, mz, next_generation)
440                                .await?;
441                            self.update_status(
442                                &mz_api,
443                                mz,
444                                MaterializeStatus {
445                                    active_generation,
446                                    // Mark this rollout request as completed so
447                                    // that we don't immediately retry it; the
448                                    // user must request a new rollout to try
449                                    // again.
450                                    last_completed_rollout_request: mz
451                                        .requested_reconciliation_id(),
452                                    last_completed_rollout_environmentd_image_ref: status
453                                        .last_completed_rollout_environmentd_image_ref
454                                        .clone(),
455                                    resource_id: status.resource_id.clone(),
456                                    resources_hash: status.resources_hash.clone(),
457                                    last_completed_rollout_hash: None,
458                                    conditions: vec![Condition {
459                                        type_: "UpToDate".into(),
460                                        status: "False".into(),
461                                        last_transition_time: Time(Timestamp::now()),
462                                        message: format!(
463                                            "Cancelled rollout to generation \
464                                             {desired_generation} after it \
465                                             exceeded the rollout timeout of {}",
466                                            humantime::format_duration(timeout),
467                                        ),
468                                        observed_generation: mz.meta().generation,
469                                        reason: "RolloutTimeout".into(),
470                                    }],
471                                },
472                                active_generation != desired_generation,
473                            )
474                            .await?;
475                            return Ok(None);
476                        }
477                    }
478                }
479
480                if !mz.within_upgrade_window() {
481                    let last_completed_rollout_environmentd_image_ref =
482                        status.last_completed_rollout_environmentd_image_ref;
483
484                    self.update_status(
485                        &mz_api,
486                        mz,
487                        MaterializeStatus {
488                            active_generation,
489                            last_completed_rollout_request: status.last_completed_rollout_request,
490                            last_completed_rollout_environmentd_image_ref:
491                                last_completed_rollout_environmentd_image_ref.clone(),
492                            resource_id: status.resource_id,
493                            resources_hash: status.resources_hash,
494                            last_completed_rollout_hash: None,
495                            conditions: vec![Condition {
496                                type_: "UpToDate".into(),
497                                status: "False".into(),
498                                last_transition_time: Time(Timestamp::now()),
499                                message: format!(
500                                    "Refusing to upgrade from {} to {}. \
501                                     More than one major version from \
502                                     last successful rollout. If coming \
503                                     from Self Managed 25.2, upgrade to \
504                                     materialize/environmentd:v0.147.20 \
505                                     first.",
506                                    last_completed_rollout_environmentd_image_ref
507                                        .expect("should be set if upgrade window check fails"),
508                                    &mz.spec.environmentd_image_ref,
509                                ),
510                                observed_generation: mz.meta().generation,
511                                reason: "FailedDeploy".into(),
512                            }],
513                        },
514                        active_generation != desired_generation,
515                    )
516                    .await?;
517                    return Ok(None);
518                }
519
520                // we remove the environment resources hash annotation here
521                // because if we fail halfway through applying the resources,
522                // things will be in an inconsistent state, and we don't want
523                // to allow the possibility of the user making a second
524                // change which reverts to the original state and then
525                // skipping retrying this apply, since that would leave
526                // things in a permanently inconsistent state.
527                // note that environment.spec will be empty here after
528                // replace_status, but this is fine because we already
529                // extracted all of the information we want from the spec
530                // earlier.
531                let mz = if mz.is_ready_to_promote(&resources_hash) {
532                    mz
533                } else {
534                    &self
535                        .update_status(
536                            &mz_api,
537                            mz,
538                            MaterializeStatus {
539                                active_generation,
540                                // don't update the reconciliation id yet,
541                                // because the rollout hasn't yet completed. if
542                                // we fail later on, we want to ensure that the
543                                // rollout gets retried.
544                                last_completed_rollout_request: status
545                                    .last_completed_rollout_request,
546                                last_completed_rollout_environmentd_image_ref: status
547                                    .last_completed_rollout_environmentd_image_ref,
548                                resource_id: status.resource_id.clone(),
549                                resources_hash: String::new(),
550                                last_completed_rollout_hash: None,
551                                conditions: vec![Condition {
552                                    type_: "UpToDate".into(),
553                                    status: "Unknown".into(),
554                                    last_transition_time: Time(Timestamp::now()),
555                                    message: format!(
556                                        "Applying changes for generation {desired_generation}"
557                                    ),
558                                    observed_generation: mz.meta().generation,
559                                    reason: "Applying".into(),
560                                }],
561                            },
562                            active_generation != desired_generation,
563                        )
564                        .await?
565                };
566                let status = mz.status();
567
568                if mz.spec.rollout_strategy
569                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
570                {
571                    // The only reason someone would choose this strategy is if they didn't have
572                    // space for the two generations of pods.
573                    // Lets make room for the new ones by deleting the old generation.
574                    resources
575                        .teardown_generation(&client, mz, active_generation)
576                        .await?;
577                }
578
579                trace!("applying environment resources");
580                match resources
581                    .apply(&client, mz.should_force_promote(), &mz.namespace())
582                    .await
583                {
584                    Ok(Some(action)) => {
585                        trace!("new environment is not yet ready");
586                        Ok(Some(action))
587                    }
588                    Ok(None) => {
589                        if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
590                            && !mz.should_force_promote()
591                        {
592                            trace!(
593                                "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
594                            );
595                            self.update_status(
596                                &mz_api,
597                                mz,
598                                MaterializeStatus {
599                                    active_generation,
600                                    last_completed_rollout_request: status
601                                        .last_completed_rollout_request,
602                                    last_completed_rollout_environmentd_image_ref: status
603                                        .last_completed_rollout_environmentd_image_ref,
604                                    resource_id: status.resource_id,
605                                    resources_hash,
606                                    last_completed_rollout_hash: None,
607                                    conditions: vec![Condition {
608                                        type_: "UpToDate".into(),
609                                        status: "Unknown".into(),
610                                        // Carry the `Applying` phase's
611                                        // timestamp forward (both phases are
612                                        // `Unknown`) so the rollout timeout
613                                        // spans Applying + ReadyToPromote
614                                        // rather than resetting here.
615                                        last_transition_time: Time(mz.up_to_date_transition_time(
616                                            "Unknown",
617                                            Timestamp::now(),
618                                        )),
619                                        message: format!(
620                                            "Ready to promote generation {desired_generation}"
621                                        ),
622                                        observed_generation: mz.meta().generation,
623                                        reason: "ReadyToPromote".into(),
624                                    }],
625                                },
626                                active_generation != desired_generation,
627                            )
628                            .await?;
629                            return Ok(None);
630                        }
631                        // do this last, so that we keep traffic pointing at
632                        // the previous environmentd until the new one is
633                        // fully ready
634
635                        // Update the status before calling promote, so that we know
636                        // we've crossed the point of no return.
637                        // Once we see this status, we must promote without taking other actions.
638                        self.update_status(
639                            &mz_api,
640                            mz,
641                            MaterializeStatus {
642                                active_generation,
643                                // don't update the reconciliation id yet,
644                                // because the rollout hasn't yet completed. if
645                                // we fail later on, we want to ensure that the
646                                // rollout gets retried.
647                                last_completed_rollout_request: status
648                                    .last_completed_rollout_request,
649                                last_completed_rollout_environmentd_image_ref: status
650                                    .last_completed_rollout_environmentd_image_ref,
651                                resource_id: status.resource_id,
652                                resources_hash: resources_hash.clone(),
653                                last_completed_rollout_hash: None,
654                                conditions: vec![Condition {
655                                    type_: "UpToDate".into(),
656                                    status: "Unknown".into(),
657                                    last_transition_time: Time(Timestamp::now()),
658                                    message: format!(
659                                        "Attempting to promote generation {desired_generation}"
660                                    ),
661                                    observed_generation: mz.meta().generation,
662                                    reason: "Promoting".into(),
663                                }],
664                            },
665                            active_generation != desired_generation,
666                        )
667                        .await?;
668                        self.promote(
669                            &client,
670                            mz,
671                            resources,
672                            active_generation,
673                            desired_generation,
674                            resources_hash,
675                        )
676                        .await
677                    }
678                    Err(e) => {
679                        self.update_status(
680                            &mz_api,
681                            mz,
682                            MaterializeStatus {
683                                active_generation,
684                                // also don't update the reconciliation id
685                                // here, because there was an error during
686                                // the rollout and we want to ensure it gets
687                                // retried.
688                                last_completed_rollout_request: status
689                                    .last_completed_rollout_request,
690                                last_completed_rollout_environmentd_image_ref: status
691                                    .last_completed_rollout_environmentd_image_ref,
692                                resource_id: status.resource_id,
693                                resources_hash: status.resources_hash,
694                                last_completed_rollout_hash: None,
695                                conditions: vec![Condition {
696                                    type_: "UpToDate".into(),
697                                    status: "False".into(),
698                                    last_transition_time: Time(Timestamp::now()),
699                                    message: format!(
700                                        "Failed to apply changes for \
701                                         generation {desired_generation}: {e}"
702                                    ),
703                                    observed_generation: mz.meta().generation,
704                                    reason: "FailedDeploy".into(),
705                                }],
706                            },
707                            active_generation != desired_generation,
708                        )
709                        .await?;
710                        Err(e)
711                    }
712                }
713            }
714            // There are changes pending, but we don't want to apply them yet.
715            (false, true, false) => {
716                let mut needs_update = mz.conditions_need_update();
717                if mz.update_in_progress() {
718                    resources
719                        .teardown_generation(&client, mz, next_generation)
720                        .await?;
721                    needs_update = true;
722                }
723                if needs_update {
724                    self.update_status(
725                        &mz_api,
726                        mz,
727                        MaterializeStatus {
728                            active_generation,
729                            last_completed_rollout_request: mz.requested_reconciliation_id(),
730                            last_completed_rollout_environmentd_image_ref: status
731                                .last_completed_rollout_environmentd_image_ref,
732                            resource_id: status.resource_id.clone(),
733                            resources_hash: status.resources_hash,
734                            last_completed_rollout_hash: None,
735                            conditions: vec![Condition {
736                                type_: "UpToDate".into(),
737                                status: "False".into(),
738                                last_transition_time: Time(Timestamp::now()),
739                                message: format!(
740                                    "Changes detected, waiting for approval for generation {desired_generation}"
741                                ),
742                                observed_generation: mz.meta().generation,
743                                reason: "WaitingForApproval".into(),
744                            }],
745                        },
746                        active_generation != desired_generation,
747                    )
748                    .await?;
749                }
750                debug!("changes detected, waiting for approval");
751                Ok(None)
752            }
753            // No changes pending, but we might need to clean up a partially applied rollout.
754            (false, false, _) => {
755                // this can happen if we update the environment, but then revert
756                // that update before the update was deployed. in this case, we
757                // don't want the environment to still show up as
758                // WaitingForApproval.
759                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
760                if mz.update_in_progress() {
761                    resources
762                        .teardown_generation(&client, mz, next_generation)
763                        .await?;
764                    needs_update = true;
765                }
766                if needs_update {
767                    self.update_status(
768                        &mz_api,
769                        mz,
770                        MaterializeStatus {
771                            active_generation,
772                            last_completed_rollout_request: mz.requested_reconciliation_id(),
773                            last_completed_rollout_environmentd_image_ref: status
774                                .last_completed_rollout_environmentd_image_ref,
775                            resource_id: status.resource_id.clone(),
776                            resources_hash: status.resources_hash,
777                            last_completed_rollout_hash: None,
778                            conditions: vec![Condition {
779                                type_: "UpToDate".into(),
780                                status: "True".into(),
781                                last_transition_time: Time(Timestamp::now()),
782                                message: format!(
783                                    "No changes found from generation {active_generation}"
784                                ),
785                                observed_generation: mz.meta().generation,
786                                reason: "Applied".into(),
787                            }],
788                        },
789                        active_generation != desired_generation,
790                    )
791                    .await?;
792                }
793                debug!("no changes");
794                Ok(None)
795            }
796        }?;
797
798        if let Some(action) = result {
799            return Ok(Some(action));
800        }
801
802        // balancers rely on the environmentd service existing, which is
803        // enforced by the environmentd rollout process being able to call
804        // into the promotion endpoint
805
806        if self.config.create_balancers {
807            let balancer = Balancer {
808                metadata: mz.managed_resource_meta(mz.name_unchecked()),
809                spec: BalancerSpec {
810                    balancerd_image_ref: matching_image_from_environmentd_image_ref(
811                        mz.active_environmentd_image_ref(),
812                        "balancerd",
813                        None,
814                    ),
815                    resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
816                    replicas: Some(mz.balancerd_replicas()),
817                    external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
818                    internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
819                    pod_annotations: mz.spec.pod_annotations.clone(),
820                    pod_labels: mz.spec.pod_labels.clone(),
821                    static_routing: Some(
822                        mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
823                            environmentd_namespace: mz.namespace(),
824                            environmentd_service_name: mz.environmentd_service_name(),
825                        },
826                    ),
827                    frontegg_routing: None,
828                    resource_id: Some(status.resource_id.clone()),
829                },
830                status: None,
831            };
832            let balancer = apply_resource(&balancer_api, &balancer).await?;
833            result = wait_for_balancer(&balancer)?;
834        } else {
835            delete_resource(&balancer_api, &mz.name_unchecked()).await?;
836        }
837
838        if let Some(action) = result {
839            return Ok(Some(action));
840        }
841
842        // and the console relies on the balancer service existing, which is
843        // enforced by wait_for_balancer
844
845        if self.config.create_console {
846            let active_environmentd_image_ref = mz.active_environmentd_image_ref();
847            let environmentd_image_tag =
848                parse_image_tag(active_environmentd_image_ref).unwrap_or("latest");
849            let console_image_tag = self
850                .config
851                .console_image_tag_map
852                .iter()
853                .find(|kv| kv.key == environmentd_image_tag)
854                .map(|kv| kv.value.clone())
855                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
856            let console = Console {
857                metadata: mz.managed_resource_meta(mz.name_unchecked()),
858                spec: ConsoleSpec {
859                    console_image_ref: matching_image_from_environmentd_image_ref(
860                        active_environmentd_image_ref,
861                        "console",
862                        Some(&console_image_tag),
863                    ),
864                    resource_requirements: mz.spec.console_resource_requirements.clone(),
865                    replicas: Some(mz.console_replicas()),
866                    external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
867                    pod_annotations: mz.spec.pod_annotations.clone(),
868                    pod_labels: mz.spec.pod_labels.clone(),
869                    balancerd: BalancerdRef {
870                        service_name: mz.balancerd_service_name(),
871                        namespace: mz.namespace(),
872                        scheme: if issuer_ref_defined(
873                            &self.config.default_certificate_specs.balancerd_external,
874                            &mz.spec.balancerd_external_certificate_spec,
875                        ) {
876                            HttpConnectionScheme::Https
877                        } else {
878                            HttpConnectionScheme::Http
879                        },
880                        dns_names: resolved_dns_names(
881                            &self.config.default_certificate_specs.balancerd_external,
882                            &mz.spec.balancerd_external_certificate_spec,
883                        ),
884                    },
885                    authenticator_kind: mz.spec.authenticator_kind,
886                    resource_id: Some(status.resource_id),
887                },
888                status: None,
889            };
890            apply_resource(&console_api, &console).await?;
891        } else {
892            delete_resource(&console_api, &mz.name_unchecked()).await?;
893        }
894
895        Ok(result)
896    }
897
898    #[instrument(fields(organization_name=mz.name_unchecked()))]
899    async fn cleanup(
900        &self,
901        _client: Client,
902        mz: &Self::Resource,
903        _metadata: &mut TraceMetadata,
904    ) -> Result<Option<Action>, Self::Error> {
905        self.set_needs_update(mz, false);
906
907        Ok(None)
908    }
909}
910
911fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
912    if let Some(conditions) = balancer
913        .status
914        .as_ref()
915        .map(|status| status.conditions.as_slice())
916    {
917        if conditions
918            .iter()
919            .any(|condition| condition.type_ == "Ready" && condition.status == "True")
920        {
921            return Ok(None);
922        }
923    }
924
925    Ok(Some(Action::requeue(Duration::from_secs(1))))
926}