// mz_orchestratord/controller/materialize.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    future::ready,
14    str::FromStr,
15    sync::{Arc, Mutex},
16};
17
18use anyhow::Context as _;
19use futures::StreamExt;
20use http::HeaderValue;
21use k8s_openapi::{
22    api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
23    apimachinery::pkg::apis::meta::v1::{Condition, Time},
24};
25use kube::{
26    Api, Client, Resource, ResourceExt,
27    api::PostParams,
28    runtime::{controller::Action, reflector, watcher},
29};
30use serde::{Deserialize, Serialize, de::DeserializeOwned};
31use tracing::{debug, trace, warn};
32use uuid::Uuid;
33
34use crate::{controller::materialize::environmentd::V161, metrics::Metrics};
35use mz_cloud_provider::CloudProvider;
36use mz_cloud_resources::crd::materialize::v1alpha1::{
37    Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
38};
39use mz_license_keys::validate;
40use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
41use mz_orchestrator_tracing::TracingCliArgs;
42use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
43
44pub mod balancer;
45pub mod console;
46pub mod environmentd;
47pub mod tls;
48
// Command-line configuration for the Materialize controller.
//
// Plain `//` comments are used throughout this struct on purpose: with
// clap's derive, `///` doc comments become `--help` text, so adding them
// would change the CLI's user-visible output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // Cloud provider the controller runs in. `Context::new` asserts that
    // `--aws-account-id` is also given when this is AWS.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    #[clap(long)]
    create_balancers: bool,
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    // Segment settings.
    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag fallback plus a repeatable map of overrides.
    // NOTE(review): the map's keys are presumably matched against the
    // environmentd version elsewhere — confirm.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags (`--aws-account-id`, IAM role ARNs, ...).
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    // NOTE(review): `default_value = "false"` matches clap's implicit
    // default for a `bool` flag and looks redundant.
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Scheduling knobs per component. The selector flags are repeatable
    // key/value pairs. Affinity and resource values are JSON documents
    // parsed by `parse_affinity` / `parse_resources`. The toleration flags
    // are singular (`--<component>-toleration`) and repeatable; each
    // occurrence is one JSON-encoded toleration parsed by
    // `parse_tolerations`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // `--network-policies-*` flags.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica sizing. The sizes map is taken as a raw string here
    // (not parsed in this file); the bootstrap overrides are optional.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Allowed origins for environmentd. The defaults cover local console
    // development hosts and the staging console.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd listen ports.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // balancerd listen ports. The public SQL/HTTP defaults mirror
    // environmentd's.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object parsed via `DefaultCertificateSpecs::from_str`. The
    // default `{}` leaves every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help`.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
189
190fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
191    Ok(serde_json::from_str(s)?)
192}
193
194fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
195    Ok(serde_json::from_str(s)?)
196}
197
198fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
199    Ok(serde_json::from_str(s)?)
200}
201
/// Default certificate specs, given on the command line as a JSON object
/// with camelCase keys (`balancerdExternal`, `consoleExternal`, `internal`).
///
/// Every field is optional. The CLI default `{}` leaves all of them unset.
/// NOTE(review): presumably these are used when a `Materialize` resource
/// does not set its own cert spec — confirm in the consumers.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// Default spec for balancerd's external certificate.
    balancerd_external: Option<MaterializeCertSpec>,
    /// Default spec for the console's external certificate.
    console_external: Option<MaterializeCertSpec>,
    /// Default spec for internal certificates.
    internal: Option<MaterializeCertSpec>,
}
209
210impl FromStr for DefaultCertificateSpecs {
211    type Err = serde_json::Error;
212
213    fn from_str(s: &str) -> Result<Self, Self::Err> {
214        serde_json::from_str(s)
215    }
216}
217
// AWS-specific settings, flattened into `MaterializeControllerArgs`.
//
// Plain `//` comments are used on purpose: clap turns `///` doc comments
// into `--help` text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Must be set when `--cloud-provider=aws`; `Context::new` asserts this.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    // Repeatable flag; every occurrence is collected in order.
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    // `None` when the flag is never passed; otherwise one entry per
    // occurrence.
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
231
// Network policy switches, flattened into `MaterializeControllerArgs`.
//
// Every flag has an explicit `network-policies-` prefixed name. Without it,
// clap would derive generic names such as `--internal-enabled` from the
// field names. Plain `//` comments are used because clap turns `///` into
// `--help` text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // Repeatable. NOTE(review): presumably only consulted when
    // `ingress_enabled` is set — confirm.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // Repeatable. NOTE(review): presumably only consulted when
    // `egress_enabled` is set — confirm.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
249
// Errors returned by the Materialize controller.
//
// Each variant wraps another library's error, and `#[from]` lets `?`
// convert into this type. No `#[error(...)]` attributes are given, so
// `Display` is implemented by hand for this type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    // Ad-hoc errors, e.g. the environment-id checks built with `anyhow!`.
    Anyhow(#[from] anyhow::Error),
    // Errors from Kubernetes API calls made through `kube`.
    Kube(#[from] kube::Error),
    // NOTE(review): no reqwest call is visible in this file chunk; this
    // presumably comes from a submodule — confirm.
    Reqwest(#[from] reqwest::Error),
}
256
257impl Display for Error {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        match self {
260            Self::Anyhow(e) => write!(f, "{e}"),
261            Self::Kube(e) => write!(f, "{e}"),
262            Self::Reqwest(e) => write!(f, "{e}"),
263        }
264    }
265}
266
/// Shared state for reconciling `Materialize` resources.
pub struct Context {
    /// Parsed controller command-line configuration.
    config: MaterializeControllerArgs,
    /// Tracing CLI args, passed through to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Namespace passed through to `environmentd::Resources::new`.
    /// NOTE(review): presumably the namespace orchestratord runs in — confirm.
    orchestratord_namespace: String,
    /// Controller metrics. `environmentd_needs_update` is published through it.
    metrics: Arc<Metrics>,
    /// Cluster-wide cache of `Materialize` resources, kept up to date by the
    /// reflector task spawned in `make_reflector`. It is used to detect
    /// duplicate environment ids.
    materializes: reflector::Store<Materialize>,
    /// Identifiers of resources currently flagged as having pending changes.
    /// Its size is exported as the `environmentd_needs_update` metric.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
275
276impl Context {
277    pub async fn new(
278        config: MaterializeControllerArgs,
279        tracing: TracingCliArgs,
280        orchestratord_namespace: String,
281        metrics: Arc<Metrics>,
282        client: kube::Client,
283    ) -> Self {
284        if config.cloud_provider == CloudProvider::Aws {
285            assert!(
286                config.aws_info.aws_account_id.is_some(),
287                "--aws-account-id is required when using --cloud-provider=aws"
288            );
289        }
290
291        Self {
292            config,
293            tracing,
294            orchestratord_namespace,
295            metrics,
296            materializes: Self::make_reflector(client.clone()).await,
297            needs_update: Default::default(),
298        }
299    }
300
301    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
302        let mut needs_update_set = self.needs_update.lock().unwrap();
303        if needs_update {
304            needs_update_set.insert(mz.name_unchecked());
305        } else {
306            needs_update_set.remove(&mz.name_unchecked());
307        }
308        self.metrics
309            .environmentd_needs_update
310            .set(u64::cast_from(needs_update_set.len()));
311    }
312
313    async fn update_status(
314        &self,
315        mz_api: &Api<Materialize>,
316        mz: &Materialize,
317        status: MaterializeStatus,
318        needs_update: bool,
319    ) -> Result<Materialize, kube::Error> {
320        self.set_needs_update(mz, needs_update);
321
322        let mut new_mz = mz.clone();
323        if !mz
324            .status
325            .as_ref()
326            .map_or(true, |mz_status| mz_status.needs_update(&status))
327        {
328            return Ok(new_mz);
329        }
330
331        new_mz.status = Some(status);
332        mz_api
333            .replace_status(
334                &mz.name_unchecked(),
335                &PostParams::default(),
336                serde_json::to_vec(&new_mz).unwrap(),
337            )
338            .await
339    }
340
341    async fn promote(
342        &self,
343        client: &Client,
344        mz: &Materialize,
345        resources: environmentd::Resources,
346        active_generation: u64,
347        desired_generation: u64,
348        resources_hash: String,
349    ) -> Result<Option<Action>, Error> {
350        if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
351            return Ok(Some(action));
352        }
353        resources
354            .teardown_generation(client, mz, active_generation)
355            .await?;
356        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
357        self.update_status(
358            &mz_api,
359            mz,
360            MaterializeStatus {
361                active_generation: desired_generation,
362                last_completed_rollout_request: mz.requested_reconciliation_id(),
363                resource_id: mz.status().resource_id,
364                resources_hash,
365                conditions: vec![Condition {
366                    type_: "UpToDate".into(),
367                    status: "True".into(),
368                    last_transition_time: Time(chrono::offset::Utc::now()),
369                    message: format!(
370                        "Successfully applied changes for generation {desired_generation}"
371                    ),
372                    observed_generation: mz.meta().generation,
373                    reason: "Applied".into(),
374                }],
375            },
376            false,
377        )
378        .await?;
379        Ok(None)
380    }
381
382    fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
383        if mz.spec.environment_id.is_nil() {
384            // this is always a bug - we delay doing this check until the
385            // resource should have an environment id set, either from the
386            // license key, or explicitly given, or randomly defaulted.
387            return Err(Error::Anyhow(anyhow::anyhow!(
388                "trying to reconcile a materialize resource with no environment id - this is a bug!"
389            )));
390        }
391
392        for existing_mz in self.materializes.state() {
393            if existing_mz.spec.environment_id == mz.spec.environment_id
394                && existing_mz.metadata.uid != mz.metadata.uid
395            {
396                return Err(Error::Anyhow(anyhow::anyhow!(
397                    "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
398                    mz.namespace(),
399                    mz.name_unchecked(),
400                    existing_mz.namespace(),
401                    existing_mz.name_unchecked(),
402                )));
403            }
404        }
405
406        Ok(())
407    }
408
409    async fn make_reflector<K>(client: Client) -> reflector::Store<K>
410    where
411        K: kube::Resource<DynamicType = ()>
412            + Clone
413            + Send
414            + Sync
415            + DeserializeOwned
416            + Serialize
417            + std::fmt::Debug
418            + 'static,
419    {
420        let api = kube::Api::all(client);
421        let (store, writer) = reflector::store();
422        let reflector =
423            reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
424        mz_ore::task::spawn(
425            || format!("{} reflector", K::kind(&Default::default())),
426            async {
427                reflector
428                    .for_each(|res| {
429                        if let Err(e) = res {
430                            warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
431                        }
432                        ready(())
433                    })
434                    .await
435            },
436        );
437        // the only way this can return an error is if we drop the writer,
438        // which we do not ever do, so unwrap is fine
439        store.wait_until_ready().await.unwrap();
440        store
441    }
442}
443
444#[async_trait::async_trait]
445impl k8s_controller::Context for Context {
446    type Resource = Materialize;
447    type Error = Error;
448
449    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";
450
451    #[instrument(fields(organization_name=mz.name_unchecked()))]
452    async fn apply(
453        &self,
454        client: Client,
455        mz: &Self::Resource,
456    ) -> Result<Option<Action>, Self::Error> {
457        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
458        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
459
460        let status = mz.status();
461        if mz.status.is_none() {
462            self.update_status(&mz_api, mz, status, true).await?;
463            // Updating the status should trigger a reconciliation
464            // which will include a status this time.
465            return Ok(None);
466        }
467
468        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
469        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
470            .data
471            .as_ref()
472            .and_then(|data| data.get("license_key"))
473        {
474            let license_key = validate(
475                str::from_utf8(&license_key.0)
476                    .context("invalid utf8")?
477                    .trim(),
478            )?;
479            let environment_id = license_key
480                .environment_id
481                .parse()
482                .context("invalid environment id in license key")?;
483            Some(environment_id)
484        } else {
485            if mz.meets_minimum_version(&V161) {
486                return Err(Error::Anyhow(anyhow::anyhow!(
487                    "license_key is required when running in kubernetes",
488                )));
489            } else {
490                None
491            }
492        };
493
494        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
495            let mut mz = mz.clone();
496            if mz.spec.request_rollout.is_nil() {
497                mz.spec.request_rollout = Uuid::new_v4();
498            }
499            if mz.spec.environment_id.is_nil() {
500                if let Some(environment_id) = license_key_environment_id {
501                    if environment_id.is_nil() {
502                        // this makes it easier to use a license key in
503                        // development with no environment id set
504                        mz.spec.environment_id = Uuid::new_v4();
505                    } else {
506                        mz.spec.environment_id = environment_id;
507                    }
508                } else {
509                    if mz.meets_minimum_version(&V161) {
510                        return Err(Error::Anyhow(anyhow::anyhow!(
511                            "environmentId is not set in materialize resource {}/{} but no license key was given",
512                            mz.namespace(),
513                            mz.name_unchecked()
514                        )));
515                    } else {
516                        mz.spec.environment_id = Uuid::new_v4();
517                    }
518                }
519            }
520            mz_api
521                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
522                .await?;
523            // Updating the spec should also trigger a reconciliation.
524            // We can't do that as part of the above check because you can't
525            // update both the spec and the status in a single api call.
526            return Ok(None);
527        }
528
529        if let Some(environment_id) = license_key_environment_id {
530            // we still allow a nil environment id in the license key to be
531            // accepted for any provided environment id, to support cloud
532            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
533                return Err(Error::Anyhow(anyhow::anyhow!(
534                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
535                    mz.namespace(),
536                    mz.name_unchecked(),
537                    environment_id,
538                )));
539            }
540        }
541
542        self.check_environment_id_conflicts(mz)?;
543
544        // we compare the hash against the environment resources generated
545        // for the current active generation, since that's what we expect to
546        // have been applied earlier, but we don't want to use these
547        // environment resources because when we apply them, we want to apply
548        // them with data that uses the new generation
549        let active_resources = environmentd::Resources::new(
550            &self.config,
551            &self.tracing,
552            &self.orchestratord_namespace,
553            mz,
554            status.active_generation,
555        );
556        let has_current_changes = status.resources_hash != active_resources.generate_hash();
557        let active_generation = status.active_generation;
558        let next_generation = active_generation + 1;
559        let desired_generation = if has_current_changes {
560            next_generation
561        } else {
562            active_generation
563        };
564
565        // here we regenerate the environment resources using the
566        // same inputs except with an updated generation
567        let resources = environmentd::Resources::new(
568            &self.config,
569            &self.tracing,
570            &self.orchestratord_namespace,
571            mz,
572            desired_generation,
573        );
574        let resources_hash = resources.generate_hash();
575
576        let mut result = match (
577            mz.is_promoting(),
578            has_current_changes,
579            mz.rollout_requested(),
580        ) {
581            // If we're in status promoting, we MUST promote now.
582            // We don't know if we successfully promoted or not yet.
583            (true, _, _) => {
584                self.promote(
585                    &client,
586                    mz,
587                    resources,
588                    active_generation,
589                    desired_generation,
590                    resources_hash,
591                )
592                .await
593            }
594            // There are changes pending, and we want to appy them.
595            (false, true, true) => {
596                // we remove the environment resources hash annotation here
597                // because if we fail halfway through applying the resources,
598                // things will be in an inconsistent state, and we don't want
599                // to allow the possibility of the user making a second
600                // change which reverts to the original state and then
601                // skipping retrying this apply, since that would leave
602                // things in a permanently inconsistent state.
603                // note that environment.spec will be empty here after
604                // replace_status, but this is fine because we already
605                // extracted all of the information we want from the spec
606                // earlier.
607                let mz = self
608                    .update_status(
609                        &mz_api,
610                        mz,
611                        MaterializeStatus {
612                            active_generation,
613                            // don't update the reconciliation id yet,
614                            // because the rollout hasn't yet completed. if
615                            // we fail later on, we want to ensure that the
616                            // rollout gets retried.
617                            last_completed_rollout_request: status.last_completed_rollout_request,
618                            resource_id: status.resource_id,
619                            resources_hash: String::new(),
620                            conditions: vec![Condition {
621                                type_: "UpToDate".into(),
622                                status: "Unknown".into(),
623                                last_transition_time: Time(chrono::offset::Utc::now()),
624                                message: format!(
625                                    "Applying changes for generation {desired_generation}"
626                                ),
627                                observed_generation: mz.meta().generation,
628                                reason: "Applying".into(),
629                            }],
630                        },
631                        active_generation != desired_generation,
632                    )
633                    .await?;
634                let mz = &mz;
635                let status = mz.status();
636
637                if mz.spec.rollout_strategy
638                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
639                {
640                    // The only reason someone would choose this strategy is if they didn't have
641                    // space for the two generations of pods.
642                    // Lets make room for the new ones by deleting the old generation.
643                    resources
644                        .teardown_generation(&client, mz, active_generation)
645                        .await?;
646                }
647
648                trace!("applying environment resources");
649                match resources
650                    .apply(&client, mz.should_force_promote(), &mz.namespace())
651                    .await
652                {
653                    Ok(Some(action)) => {
654                        trace!("new environment is not yet ready");
655                        Ok(Some(action))
656                    }
657                    Ok(None) => {
658                        // do this last, so that we keep traffic pointing at
659                        // the previous environmentd until the new one is
660                        // fully ready
661
662                        // Update the status before calling promote, so that we know
663                        // we've crossed the point of no return.
664                        // Once we see this status, we must promote without taking other actions.
665                        self.update_status(
666                            &mz_api,
667                            mz,
668                            MaterializeStatus {
669                                active_generation,
670                                // don't update the reconciliation id yet,
671                                // because the rollout hasn't yet completed. if
672                                // we fail later on, we want to ensure that the
673                                // rollout gets retried.
674                                last_completed_rollout_request: status
675                                    .last_completed_rollout_request,
676                                resource_id: status.resource_id,
677                                resources_hash: resources_hash.clone(),
678                                conditions: vec![Condition {
679                                    type_: "UpToDate".into(),
680                                    status: "Unknown".into(),
681                                    last_transition_time: Time(chrono::offset::Utc::now()),
682                                    message: format!(
683                                        "Attempting to promote generation {desired_generation}"
684                                    ),
685                                    observed_generation: mz.meta().generation,
686                                    reason: "Promoting".into(),
687                                }],
688                            },
689                            active_generation != desired_generation,
690                        )
691                        .await?;
692                        self.promote(
693                            &client,
694                            mz,
695                            resources,
696                            active_generation,
697                            desired_generation,
698                            resources_hash,
699                        )
700                        .await
701                    }
702                    Err(e) => {
703                        self.update_status(
704                            &mz_api,
705                            mz,
706                            MaterializeStatus {
707                                active_generation,
708                                // also don't update the reconciliation id
709                                // here, because there was an error during
710                                // the rollout and we want to ensure it gets
711                                // retried.
712                                last_completed_rollout_request: status.last_completed_rollout_request,
713                                resource_id: status.resource_id,
714                                resources_hash: status.resources_hash,
715                                conditions: vec![Condition {
716                                    type_: "UpToDate".into(),
717                                    status: "False".into(),
718                                    last_transition_time: Time(chrono::offset::Utc::now()),
719                                    message: format!(
720                                        "Failed to apply changes for generation {desired_generation}: {e}"
721                                    ),
722                                    observed_generation: mz.meta().generation,
723                                    reason: "FailedDeploy".into(),
724                                }],
725                            },
726                            active_generation != desired_generation,
727                        )
728                        .await?;
729                        Err(e)
730                    }
731                }
732            }
733            // There are changes pending, but we don't want to apply them yet.
734            (false, true, false) => {
735                let mut needs_update = mz.conditions_need_update();
736                if mz.update_in_progress() {
737                    resources
738                        .teardown_generation(&client, mz, next_generation)
739                        .await?;
740                    needs_update = true;
741                }
742                if needs_update {
743                    self.update_status(
744                        &mz_api,
745                        mz,
746                        MaterializeStatus {
747                            active_generation,
748                            last_completed_rollout_request: mz.requested_reconciliation_id(),
749                            resource_id: status.resource_id,
750                            resources_hash: status.resources_hash,
751                            conditions: vec![Condition {
752                                type_: "UpToDate".into(),
753                                status: "False".into(),
754                                last_transition_time: Time(chrono::offset::Utc::now()),
755                                message: format!(
756                                    "Changes detected, waiting for approval for generation {desired_generation}"
757                                ),
758                                observed_generation: mz.meta().generation,
759                                reason: "WaitingForApproval".into(),
760                            }],
761                        },
762                        active_generation != desired_generation,
763                    )
764                    .await?;
765                }
766                debug!("changes detected, waiting for approval");
767                Ok(None)
768            }
769            // No changes pending, but we might need to clean up a partially applied rollout.
770            (false, false, _) => {
771                // this can happen if we update the environment, but then revert
772                // that update before the update was deployed. in this case, we
773                // don't want the environment to still show up as
774                // WaitingForApproval.
775                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
776                if mz.update_in_progress() {
777                    resources
778                        .teardown_generation(&client, mz, next_generation)
779                        .await?;
780                    needs_update = true;
781                }
782                if needs_update {
783                    self.update_status(
784                        &mz_api,
785                        mz,
786                        MaterializeStatus {
787                            active_generation,
788                            last_completed_rollout_request: mz.requested_reconciliation_id(),
789                            resource_id: status.resource_id,
790                            resources_hash: status.resources_hash,
791                            conditions: vec![Condition {
792                                type_: "UpToDate".into(),
793                                status: "True".into(),
794                                last_transition_time: Time(chrono::offset::Utc::now()),
795                                message: format!(
796                                    "No changes found from generation {active_generation}"
797                                ),
798                                observed_generation: mz.meta().generation,
799                                reason: "Applied".into(),
800                            }],
801                        },
802                        active_generation != desired_generation,
803                    )
804                    .await?;
805                }
806                debug!("no changes");
807                Ok(None)
808            }
809        }?;
810
811        if let Some(action) = result {
812            return Ok(Some(action));
813        }
814
815        // balancers rely on the environmentd service existing, which is
816        // enforced by the environmentd rollout process being able to call
817        // into the promotion endpoint
818
819        let balancer = balancer::Resources::new(&self.config, mz);
820        if self.config.create_balancers {
821            result = balancer.apply(&client, &mz.namespace()).await?;
822        } else {
823            result = balancer.cleanup(&client, &mz.namespace()).await?;
824        }
825
826        if let Some(action) = result {
827            return Ok(Some(action));
828        }
829
830        // and the console relies on the balancer service existing, which is
831        // enforced by balancer::Resources::apply having a check for its pods
832        // being up, and not returning successfully until they are
833
834        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
835        else {
836            return Err(Error::Anyhow(anyhow::anyhow!(
837                "failed to parse environmentd image ref: {}",
838                mz.spec.environmentd_image_ref
839            )));
840        };
841        let console_image_tag = self
842            .config
843            .console_image_tag_map
844            .iter()
845            .find(|kv| kv.key == environmentd_image_tag)
846            .map(|kv| kv.value.clone())
847            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
848        let console = console::Resources::new(
849            &self.config,
850            mz,
851            &matching_image_from_environmentd_image_ref(
852                &mz.spec.environmentd_image_ref,
853                "console",
854                Some(&console_image_tag),
855            ),
856        );
857        if self.config.create_console {
858            console.apply(&client, &mz.namespace()).await?;
859        } else {
860            console.cleanup(&client, &mz.namespace()).await?;
861        }
862
863        Ok(result)
864    }
865
866    #[instrument(fields(organization_name=mz.name_unchecked()))]
867    async fn cleanup(
868        &self,
869        _client: Client,
870        mz: &Self::Resource,
871    ) -> Result<Option<Action>, Self::Error> {
872        self.set_needs_update(mz, false);
873
874        Ok(None)
875    }
876}
877
/// Builds the reference for a sibling image (named `image_name`) that lives in
/// the same registry namespace as the `environmentd` image.
///
/// * The namespace is everything before the final `/` of
///   `environmentd_image_ref`, or `materialize` when it contains no `/`.
/// * The tag is `image_tag` when provided. Otherwise it is the tag of
///   `environmentd_image_ref`, or `unstable` when that reference has no tag.
///
/// Only the last path component is searched for the `:` tag separator, so a
/// registry port (`host:5000/...`) is never mistaken for a tag. Any
/// `@<digest>` suffix is ignored: it pins the `environmentd` image only and
/// says nothing about the sibling image.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    // Drop an `@sha256:...` digest, if any, before looking for the tag.
    let without_digest = environmentd_image_ref
        .split_once('@')
        .map_or(environmentd_image_ref, |(name, _digest)| name);
    // Split off the final path component; the tag (if any) can only live there.
    let (namespace, last_component) = without_digest
        .rsplit_once('/')
        .unwrap_or(("materialize", without_digest));
    let tag = image_tag.unwrap_or_else(|| {
        last_component
            .rsplit_once(':')
            .map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}