1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 future::ready,
14 str::FromStr,
15 sync::{Arc, Mutex},
16};
17
18use anyhow::Context as _;
19use futures::StreamExt;
20use http::HeaderValue;
21use k8s_openapi::{
22 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
23 apimachinery::pkg::apis::meta::v1::{Condition, Time},
24};
25use kube::{
26 Api, Client, Resource, ResourceExt,
27 api::PostParams,
28 runtime::{controller::Action, reflector, watcher},
29};
30use serde::{Deserialize, Serialize, de::DeserializeOwned};
31use tracing::{debug, trace, warn};
32use uuid::Uuid;
33
34use crate::{controller::materialize::environmentd::V161, metrics::Metrics};
35use mz_cloud_provider::CloudProvider;
36use mz_cloud_resources::crd::materialize::v1alpha1::{
37 Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
38};
39use mz_license_keys::validate;
40use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
41use mz_orchestrator_tracing::TracingCliArgs;
42use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
43
44pub mod balancer;
45pub mod console;
46pub mod environmentd;
47pub mod tls;
48
// Command-line arguments configuring the Materialize controller.
//
// NOTE: comments in this struct are deliberately plain `//` comments rather than `///` doc
// comments, because clap's derive macro turns doc comments into `--help` text.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // Cloud provider and region. `Context::new` asserts that `--aws-account-id` is set when the
    // provider is AWS.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` creates balancer / console resources for each Materialize resource;
    // when unset, `apply` runs the corresponding cleanup instead.
    #[clap(long)]
    create_balancers: bool,
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    // Defaults to "kubernetes" when the flag is omitted.
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    // Segment-related settings; consumed outside this section of the file.
    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: `apply` looks up the environmentd image tag in
    // `console_image_tag_map` and falls back to `console_image_tag_default` when there is no
    // matching key.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this argument set (see `AwsInfo`).
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Scheduling and resource settings per component. Affinity, toleration and resource values
    // are given as JSON and decoded by `parse_affinity`, `parse_tolerations` and
    // `parse_resources`. The toleration flags use the singular long name
    // (e.g. `--environmentd-toleration`), and each value is decoded as a single `Toleration`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    // Defaults to "always" when the flag is omitted.
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // Network policy flags, flattened into this argument set (see `NetworkPolicyConfig`).
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica size and replication factor overrides. All are optional and are consumed
    // outside this section of the file (NOTE(review): presumably forwarded to environmentd's
    // bootstrap flags of the same names - confirm).
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Allowed origins; the defaults below apply when the flag is not given at all.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd ports (defaults 6875-6879).
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // balancerd ports; the SQL and HTTP defaults match the environmentd SQL and HTTP defaults.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object decoded through `DefaultCertificateSpecs::from_str`; the default `{}` leaves
    // every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help` output.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
189
190fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
191 Ok(serde_json::from_str(s)?)
192}
193
194fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
195 Ok(serde_json::from_str(s)?)
196}
197
198fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
199 Ok(serde_json::from_str(s)?)
200}
201
/// Default certificate specs, supplied as a JSON object through `--default-certificate-specs`.
///
/// JSON keys are camelCase (`balancerdExternal`, `consoleExternal`, `internal`). Every field is
/// optional, so the flag's default value of `{}` leaves all of them `None`.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    // NOTE(review): presumably the fallback spec for balancerd's externally facing
    // certificate - confirm against the code that reads this field.
    balancerd_external: Option<MaterializeCertSpec>,
    // NOTE(review): presumably the fallback spec for the console's externally facing
    // certificate - confirm.
    console_external: Option<MaterializeCertSpec>,
    // NOTE(review): presumably the fallback spec for internal certificates - confirm.
    internal: Option<MaterializeCertSpec>,
}
209
210impl FromStr for DefaultCertificateSpecs {
211 type Err = serde_json::Error;
212
213 fn from_str(s: &str) -> Result<Self, Self::Err> {
214 serde_json::from_str(s)
215 }
216}
217
// AWS-specific command-line arguments, flattened into `MaterializeControllerArgs`.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro turns `///` doc comments
// into `--help` text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // `Context::new` asserts this is set when `--cloud-provider=aws`.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
231
// Network policy command-line arguments, flattened into `MaterializeControllerArgs`.
//
// Each flag has an explicit `network-policies-` prefixed long name, so the field names here
// are shorter than the flags users pass.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro turns `///` doc comments
// into `--help` text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // NOTE(review): presumably only consulted when ingress policies are enabled - confirm.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // NOTE(review): presumably only consulted when egress policies are enabled - confirm.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
249
/// Error type returned by the Materialize controller's reconcile functions.
///
/// Each variant wraps an underlying error and has a `#[from]` conversion, so `?` can be used
/// directly on `anyhow`, `kube` and `reqwest` results. No `#[error(...)]` messages are given
/// here; `Display` is implemented by hand elsewhere in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Ad-hoc error raised by the controller logic itself.
    Anyhow(#[from] anyhow::Error),
    /// Error returned by the `kube` client.
    Kube(#[from] kube::Error),
    /// Error returned by `reqwest`.
    Reqwest(#[from] reqwest::Error),
}
256
257impl Display for Error {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 match self {
260 Self::Anyhow(e) => write!(f, "{e}"),
261 Self::Kube(e) => write!(f, "{e}"),
262 Self::Reqwest(e) => write!(f, "{e}"),
263 }
264 }
265}
266
/// Shared state for the Materialize controller's reconcile loop.
pub struct Context {
    /// Parsed command-line configuration.
    config: MaterializeControllerArgs,
    /// Tracing arguments, passed on when building environmentd resources.
    tracing: TracingCliArgs,
    /// Namespace name passed on when building environmentd resources
    /// (NOTE(review): presumably the namespace orchestratord itself runs in - confirm).
    orchestratord_namespace: String,
    /// Metrics handle; `set_needs_update` updates its `environmentd_needs_update` gauge.
    metrics: Arc<Metrics>,
    /// Reflector store of `Materialize` resources, used to detect duplicate environment ids.
    materializes: reflector::Store<Materialize>,
    /// Names of `Materialize` resources currently flagged as needing an update.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
275
276impl Context {
277 pub async fn new(
278 config: MaterializeControllerArgs,
279 tracing: TracingCliArgs,
280 orchestratord_namespace: String,
281 metrics: Arc<Metrics>,
282 client: kube::Client,
283 ) -> Self {
284 if config.cloud_provider == CloudProvider::Aws {
285 assert!(
286 config.aws_info.aws_account_id.is_some(),
287 "--aws-account-id is required when using --cloud-provider=aws"
288 );
289 }
290
291 Self {
292 config,
293 tracing,
294 orchestratord_namespace,
295 metrics,
296 materializes: Self::make_reflector(client.clone()).await,
297 needs_update: Default::default(),
298 }
299 }
300
301 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
302 let mut needs_update_set = self.needs_update.lock().unwrap();
303 if needs_update {
304 needs_update_set.insert(mz.name_unchecked());
305 } else {
306 needs_update_set.remove(&mz.name_unchecked());
307 }
308 self.metrics
309 .environmentd_needs_update
310 .set(u64::cast_from(needs_update_set.len()));
311 }
312
313 async fn update_status(
314 &self,
315 mz_api: &Api<Materialize>,
316 mz: &Materialize,
317 status: MaterializeStatus,
318 needs_update: bool,
319 ) -> Result<Materialize, kube::Error> {
320 self.set_needs_update(mz, needs_update);
321
322 let mut new_mz = mz.clone();
323 if !mz
324 .status
325 .as_ref()
326 .map_or(true, |mz_status| mz_status.needs_update(&status))
327 {
328 return Ok(new_mz);
329 }
330
331 new_mz.status = Some(status);
332 mz_api
333 .replace_status(
334 &mz.name_unchecked(),
335 &PostParams::default(),
336 serde_json::to_vec(&new_mz).unwrap(),
337 )
338 .await
339 }
340
341 async fn promote(
342 &self,
343 client: &Client,
344 mz: &Materialize,
345 resources: environmentd::Resources,
346 active_generation: u64,
347 desired_generation: u64,
348 resources_hash: String,
349 ) -> Result<Option<Action>, Error> {
350 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
351 return Ok(Some(action));
352 }
353 resources
354 .teardown_generation(client, mz, active_generation)
355 .await?;
356 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
357 self.update_status(
358 &mz_api,
359 mz,
360 MaterializeStatus {
361 active_generation: desired_generation,
362 last_completed_rollout_request: mz.requested_reconciliation_id(),
363 resource_id: mz.status().resource_id,
364 resources_hash,
365 conditions: vec![Condition {
366 type_: "UpToDate".into(),
367 status: "True".into(),
368 last_transition_time: Time(chrono::offset::Utc::now()),
369 message: format!(
370 "Successfully applied changes for generation {desired_generation}"
371 ),
372 observed_generation: mz.meta().generation,
373 reason: "Applied".into(),
374 }],
375 },
376 false,
377 )
378 .await?;
379 Ok(None)
380 }
381
382 fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
383 if mz.spec.environment_id.is_nil() {
384 return Err(Error::Anyhow(anyhow::anyhow!(
388 "trying to reconcile a materialize resource with no environment id - this is a bug!"
389 )));
390 }
391
392 for existing_mz in self.materializes.state() {
393 if existing_mz.spec.environment_id == mz.spec.environment_id
394 && existing_mz.metadata.uid != mz.metadata.uid
395 {
396 return Err(Error::Anyhow(anyhow::anyhow!(
397 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
398 mz.namespace(),
399 mz.name_unchecked(),
400 existing_mz.namespace(),
401 existing_mz.name_unchecked(),
402 )));
403 }
404 }
405
406 Ok(())
407 }
408
409 async fn make_reflector<K>(client: Client) -> reflector::Store<K>
410 where
411 K: kube::Resource<DynamicType = ()>
412 + Clone
413 + Send
414 + Sync
415 + DeserializeOwned
416 + Serialize
417 + std::fmt::Debug
418 + 'static,
419 {
420 let api = kube::Api::all(client);
421 let (store, writer) = reflector::store();
422 let reflector =
423 reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
424 mz_ore::task::spawn(
425 || format!("{} reflector", K::kind(&Default::default())),
426 async {
427 reflector
428 .for_each(|res| {
429 if let Err(e) = res {
430 warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
431 }
432 ready(())
433 })
434 .await
435 },
436 );
437 store.wait_until_ready().await.unwrap();
440 store
441 }
442}
443
444#[async_trait::async_trait]
445impl k8s_controller::Context for Context {
    /// The custom resource type this controller reconciles.
    type Resource = Materialize;
    /// Error type returned by `apply` and `cleanup`.
    type Error = Error;

    /// Finalizer name for this controller.
    /// NOTE(review): presumably attached by the `k8s_controller` framework so that `cleanup`
    /// runs before a resource is deleted - confirm against that crate.
    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";
450
    /// Reconciles one `Materialize` resource.
    ///
    /// In order, this:
    /// 1. Writes an initial status if the resource has none, then returns.
    /// 2. Reads the backend secret and, if it has a `license_key` entry, validates it and
    ///    extracts its environment id.
    /// 3. Fills in a missing `request_rollout` / `environment_id` on the spec, then returns.
    /// 4. Rejects an environment id that disagrees with the license key or collides with
    ///    another resource.
    /// 5. Chooses one of four rollout branches from `(is_promoting, has_current_changes,
    ///    rollout_requested)` and runs it.
    /// 6. Applies or cleans up the balancer resources, then the console resources.
    ///
    /// Returns `Ok(Some(action))` as soon as any step yields an `Action`, and `Ok(None)` when
    /// everything ran to completion.
    ///
    /// # Errors
    ///
    /// Returns an error on any Kubernetes API failure, on an invalid or (for versions meeting
    /// `V161`) missing license key, on environment id mismatches or conflicts, and when the
    /// environmentd image ref contains no `:`.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());

        let status = mz.status();
        if mz.status.is_none() {
            // First reconcile of this resource: persist the status from `mz.status()`, flag
            // the resource as needing an update, and do nothing else in this pass.
            // NOTE(review): presumably the status write triggers another reconcile - confirm.
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // The license key, if present, lives under the `license_key` entry of the backend
        // secret. Surrounding whitespace is trimmed before validation.
        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
            .data
            .as_ref()
            .and_then(|data| data.get("license_key"))
        {
            let license_key = validate(
                str::from_utf8(&license_key.0)
                    .context("invalid utf8")?
                    .trim(),
            )?;
            let environment_id = license_key
                .environment_id
                .parse()
                .context("invalid environment id in license key")?;
            Some(environment_id)
        } else {
            // No license key: this is an error for versions meeting `V161`, and tolerated
            // for older ones.
            if mz.meets_minimum_version(&V161) {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "license_key is required when running in kubernetes",
                )));
            } else {
                None
            }
        };

        // Fill in any nil ids on a copy of the resource, write the whole resource back, and
        // stop this pass.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                if let Some(environment_id) = license_key_environment_id {
                    // A nil environment id in the license key does not pin the id, so a
                    // random one is generated; otherwise the license key's id is adopted.
                    if environment_id.is_nil() {
                        mz.spec.environment_id = Uuid::new_v4();
                    } else {
                        mz.spec.environment_id = environment_id;
                    }
                } else {
                    if mz.meets_minimum_version(&V161) {
                        return Err(Error::Anyhow(anyhow::anyhow!(
                            "environmentId is not set in materialize resource {}/{} but no license key was given",
                            mz.namespace(),
                            mz.name_unchecked()
                        )));
                    } else {
                        mz.spec.environment_id = Uuid::new_v4();
                    }
                }
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            return Ok(None);
        }

        // A non-nil environment id in the license key must match the one on the resource.
        if let Some(environment_id) = license_key_environment_id {
            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
                    mz.namespace(),
                    mz.name_unchecked(),
                    environment_id,
                )));
            }
        }

        self.check_environment_id_conflicts(mz)?;

        // Build the resources for the currently active generation only to compute their hash;
        // a difference from the hash stored in the status means there are pending changes.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        let desired_generation = if has_current_changes {
            next_generation
        } else {
            active_generation
        };

        // Resources for the generation we actually want to end up on.
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        let mut result = match (
            mz.is_promoting(),
            has_current_changes,
            mz.rollout_requested(),
        ) {
            // A promotion is already in progress: continue it regardless of the other flags.
            (true, _, _) => {
                self.promote(
                    &client,
                    mz,
                    resources,
                    active_generation,
                    desired_generation,
                    resources_hash,
                )
                .await
            }
            // Changes exist and a rollout was requested: roll out the new generation.
            (false, true, true) => {
                // Record that the rollout has started. The stored hash is set to the empty
                // string here.
                // NOTE(review): presumably so an interrupted rollout is still seen as having
                // pending changes on the next pass - confirm.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, work with the resource returned by the status update.
                let mz = &mz;
                let status = mz.status();

                // With this strategy the active generation is torn down before the new one
                // is applied.
                if mz.spec.rollout_strategy
                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
                {
                    resources
                        .teardown_generation(&client, mz, active_generation)
                        .await?;
                }

                trace!("applying environment resources");
                match resources
                    .apply(&client, mz.should_force_promote(), &mz.namespace())
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // Apply finished: store the new hash with a "Promoting" condition,
                        // then promote.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: resources_hash.clone(),
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Attempting to promote generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Promoting".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        self.promote(
                            &client,
                            mz,
                            resources,
                            active_generation,
                            desired_generation,
                            resources_hash,
                        )
                        .await
                    }
                    Err(e) => {
                        // Record the failure in the status, then return the original error.
                        // If this status write itself fails, its error is returned instead.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            }
            // Changes exist but no rollout was requested: wait for approval.
            (false, true, false) => {
                let mut needs_update = mz.conditions_need_update();
                // If an update is in progress, tear down the next generation's resources and
                // force a status write.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
            // No changes relative to the active generation.
            (false, false, _) => {
                // A requested rollout with nothing to roll out still forces a status write,
                // which records the request as completed.
                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
                // If an update is in progress, tear down the next generation's resources and
                // force a status write.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "True".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "No changes found from generation {active_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applied".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("no changes");
                Ok(None)
            }
        }?;

        if let Some(action) = result {
            return Ok(Some(action));
        }

        // Balancer resources are created when `--create-balancers` is set and cleaned up
        // otherwise.
        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await?;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await?;
        }

        if let Some(action) = result {
            return Ok(Some(action));
        }

        // The environmentd image tag is everything after the last `:` in the image ref. It
        // selects the console image tag from the configured map, falling back to the default.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        // Console resources are created when `--create-console` is set and cleaned up
        // otherwise. Their return value is discarded.
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // `result` is necessarily `None` here: every `Some` was returned early above.
        Ok(result)
    }
865
866 #[instrument(fields(organization_name=mz.name_unchecked()))]
867 async fn cleanup(
868 &self,
869 _client: Client,
870 mz: &Self::Resource,
871 ) -> Result<Option<Action>, Self::Error> {
872 self.set_needs_update(mz, false);
873
874 Ok(None)
875 }
876}
877
/// Derives the image reference for a sibling image (e.g. `console`) that lives next to the
/// environmentd image.
///
/// * The namespace is everything before the last `/` of `environmentd_image_ref`, or
///   `materialize` if the reference contains no `/`.
/// * The tag is `image_tag` when given. Otherwise it is the tag of the environmentd image,
///   or `unstable` if that image has no tag.
///
/// The environmentd tag is looked up only in the last path component of the reference, so a
/// registry port (as in `localhost:5000/materialize/environmentd`) is never mistaken for a
/// tag.
///
/// Returns `{namespace}/{image_name}:{tag}`.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, environmentd_image) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        environmentd_image
            .rsplit_once(':')
            .map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}