1use std::{
11 collections::BTreeSet,
12 sync::{Arc, Mutex},
13 time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20 apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{
23 Api, Client, Resource, ResourceExt,
24 api::PostParams,
25 runtime::{controller::Action, reflector},
26};
27use tracing::{debug, trace};
28use uuid::Uuid;
29
30use crate::{
31 Error,
32 controller::materialize::environmentd::V161,
33 k8s::{apply_resource, delete_resource, make_reflector},
34 matching_image_from_environmentd_image_ref,
35 metrics::Metrics,
36 tls::{DefaultCertificateSpecs, issuer_ref_defined},
37};
38use mz_cloud_provider::CloudProvider;
39use mz_cloud_resources::crd::{
40 ManagedResource,
41 balancer::v1alpha1::{Balancer, BalancerSpec},
42 console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
43 materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
44};
45use mz_license_keys::validate;
46use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
47use mz_orchestrator_tracing::TracingCliArgs;
48use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
49
50pub mod environmentd;
51
52pub struct Config {
53 pub cloud_provider: CloudProvider,
54 pub region: String,
55 pub create_balancers: bool,
56 pub create_console: bool,
57 pub helm_chart_version: Option<String>,
58 pub secrets_controller: String,
59 pub collect_pod_metrics: bool,
60 pub enable_prometheus_scrape_annotations: bool,
61
62 pub segment_api_key: Option<String>,
63 pub segment_client_side: bool,
64
65 pub console_image_tag_default: String,
66 pub console_image_tag_map: Vec<KeyValueArg<String, String>>,
67
68 pub aws_account_id: Option<String>,
69 pub environmentd_iam_role_arn: Option<String>,
70 pub environmentd_connection_role_arn: Option<String>,
71 pub aws_secrets_controller_tags: Vec<String>,
72 pub environmentd_availability_zones: Option<Vec<String>>,
73
74 pub ephemeral_volume_class: Option<String>,
75 pub scheduler_name: Option<String>,
76 pub enable_security_context: bool,
77 pub enable_internal_statement_logging: bool,
78 pub disable_statement_logging: bool,
79
80 pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
81 pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
82 pub environmentd_affinity: Option<Affinity>,
83 pub environmentd_tolerations: Option<Vec<Toleration>>,
84 pub environmentd_default_resources: Option<ResourceRequirements>,
85 pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
86 pub clusterd_affinity: Option<Affinity>,
87 pub clusterd_tolerations: Option<Vec<Toleration>>,
88 pub image_pull_policy: KubernetesImagePullPolicy,
89 pub network_policies_internal_enabled: bool,
90 pub network_policies_ingress_enabled: bool,
91 pub network_policies_ingress_cidrs: Vec<String>,
92 pub network_policies_egress_enabled: bool,
93 pub network_policies_egress_cidrs: Vec<String>,
94
95 pub environmentd_cluster_replica_sizes: Option<String>,
96 pub bootstrap_default_cluster_replica_size: Option<String>,
97 pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
98 pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
99 pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
100 pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
101 pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
102 pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
103 pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
104 pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
105 pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
106
107 pub environmentd_allowed_origins: Vec<HeaderValue>,
108 pub internal_console_proxy_url: String,
109
110 pub environmentd_sql_port: u16,
111 pub environmentd_http_port: u16,
112 pub environmentd_internal_sql_port: u16,
113 pub environmentd_internal_http_port: u16,
114 pub environmentd_internal_persist_pubsub_port: u16,
115
116 pub default_certificate_specs: DefaultCertificateSpecs,
117
118 pub disable_license_key_checks: bool,
119
120 pub tracing: TracingCliArgs,
121 pub orchestratord_namespace: String,
122}
123
124pub struct Context {
125 config: Config,
126 metrics: Arc<Metrics>,
127 materializes: reflector::Store<Materialize>,
128 needs_update: Arc<Mutex<BTreeSet<String>>>,
129}
130
131impl Context {
132 pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
133 if config.cloud_provider == CloudProvider::Aws {
134 assert!(
135 config.aws_account_id.is_some(),
136 "--aws-account-id is required when using --cloud-provider=aws"
137 );
138 }
139
140 Self {
141 config,
142 metrics,
143 materializes: make_reflector(client.clone()).await,
144 needs_update: Default::default(),
145 }
146 }
147
148 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
149 let mut needs_update_set = self.needs_update.lock().unwrap();
150 if needs_update {
151 needs_update_set.insert(mz.name_unchecked());
152 } else {
153 needs_update_set.remove(&mz.name_unchecked());
154 }
155 self.metrics
156 .environmentd_needs_update
157 .set(u64::cast_from(needs_update_set.len()));
158 }
159
160 async fn update_status(
161 &self,
162 mz_api: &Api<Materialize>,
163 mz: &Materialize,
164 status: MaterializeStatus,
165 needs_update: bool,
166 ) -> Result<Materialize, kube::Error> {
167 self.set_needs_update(mz, needs_update);
168
169 let mut new_mz = mz.clone();
170 if !mz
171 .status
172 .as_ref()
173 .map_or(true, |mz_status| mz_status.needs_update(&status))
174 {
175 return Ok(new_mz);
176 }
177
178 new_mz.status = Some(status);
179 mz_api
180 .replace_status(
181 &mz.name_unchecked(),
182 &PostParams::default(),
183 serde_json::to_vec(&new_mz).unwrap(),
184 )
185 .await
186 }
187
188 async fn promote(
189 &self,
190 client: &Client,
191 mz: &Materialize,
192 resources: environmentd::Resources,
193 active_generation: u64,
194 desired_generation: u64,
195 resources_hash: String,
196 ) -> Result<Option<Action>, Error> {
197 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
198 return Ok(Some(action));
199 }
200 resources
201 .teardown_generation(client, mz, active_generation)
202 .await?;
203 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
204 self.update_status(
205 &mz_api,
206 mz,
207 MaterializeStatus {
208 active_generation: desired_generation,
209 last_completed_rollout_request: mz.requested_reconciliation_id(),
210 last_completed_rollout_environmentd_image_ref: Some(
211 mz.spec.environmentd_image_ref.clone(),
212 ),
213 resource_id: mz.status().resource_id,
214 resources_hash,
215 conditions: vec![Condition {
216 type_: "UpToDate".into(),
217 status: "True".into(),
218 last_transition_time: Time(chrono::offset::Utc::now()),
219 message: format!(
220 "Successfully applied changes for generation {desired_generation}"
221 ),
222 observed_generation: mz.meta().generation,
223 reason: "Applied".into(),
224 }],
225 },
226 false,
227 )
228 .await?;
229 Ok(None)
230 }
231
232 fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
233 if mz.spec.environment_id.is_nil() {
234 return Err(Error::Anyhow(anyhow::anyhow!(
238 "trying to reconcile a materialize resource with no environment id - this is a bug!"
239 )));
240 }
241
242 for existing_mz in self.materializes.state() {
243 if existing_mz.spec.environment_id == mz.spec.environment_id
244 && existing_mz.metadata.uid != mz.metadata.uid
245 {
246 return Err(Error::Anyhow(anyhow::anyhow!(
247 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
248 mz.namespace(),
249 mz.name_unchecked(),
250 existing_mz.namespace(),
251 existing_mz.name_unchecked(),
252 )));
253 }
254 }
255
256 Ok(())
257 }
258}
259
260#[async_trait::async_trait]
261impl k8s_controller::Context for Context {
262 type Resource = Materialize;
263 type Error = Error;
264
265 const FINALIZER_NAME: Option<&'static str> =
266 Some("orchestratord.materialize.cloud/materialize");
267
268 #[instrument(fields(organization_name=mz.name_unchecked()))]
269 async fn apply(
270 &self,
271 client: Client,
272 mz: &Self::Resource,
273 ) -> Result<Option<Action>, Self::Error> {
274 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
275 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
276 let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
277 let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
278
279 let status = mz.status();
280 if mz.status.is_none() {
281 self.update_status(&mz_api, mz, status, true).await?;
282 return Ok(None);
285 }
286
287 let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
288 let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
289 .data
290 .as_ref()
291 .and_then(|data| data.get("license_key"))
292 {
293 let license_key = validate(
294 str::from_utf8(&license_key.0)
295 .context("invalid utf8")?
296 .trim(),
297 )?;
298 let environment_id = license_key
299 .environment_id
300 .parse()
301 .context("invalid environment id in license key")?;
302 Some(environment_id)
303 } else {
304 if mz.meets_minimum_version(&V161) {
305 return Err(Error::Anyhow(anyhow::anyhow!(
306 "license_key is required when running in kubernetes",
307 )));
308 } else {
309 None
310 }
311 };
312
313 if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
314 let mut mz = mz.clone();
315 if mz.spec.request_rollout.is_nil() {
316 mz.spec.request_rollout = Uuid::new_v4();
317 }
318 if mz.spec.environment_id.is_nil() {
319 if let Some(environment_id) = license_key_environment_id {
320 if environment_id.is_nil() {
321 mz.spec.environment_id = Uuid::new_v4();
324 } else {
325 mz.spec.environment_id = environment_id;
326 }
327 } else {
328 if mz.meets_minimum_version(&V161) {
329 return Err(Error::Anyhow(anyhow::anyhow!(
330 "environmentId is not set in materialize resource {}/{} but no license key was given",
331 mz.namespace(),
332 mz.name_unchecked()
333 )));
334 } else {
335 mz.spec.environment_id = Uuid::new_v4();
336 }
337 }
338 }
339 mz_api
340 .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
341 .await?;
342 return Ok(None);
346 }
347
348 if let Some(environment_id) = license_key_environment_id {
349 if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
352 return Err(Error::Anyhow(anyhow::anyhow!(
353 "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
354 mz.namespace(),
355 mz.name_unchecked(),
356 environment_id,
357 )));
358 }
359 }
360
361 self.check_environment_id_conflicts(mz)?;
362
363 let active_resources =
369 environmentd::Resources::new(&self.config, mz, status.active_generation);
370 let has_current_changes = status.resources_hash != active_resources.generate_hash();
371 let active_generation = status.active_generation;
372 let next_generation = active_generation + 1;
373 let desired_generation = if has_current_changes {
374 next_generation
375 } else {
376 active_generation
377 };
378
379 let resources = environmentd::Resources::new(&self.config, mz, desired_generation);
382 let resources_hash = resources.generate_hash();
383
384 let mut result = match (
385 mz.is_promoting(),
386 has_current_changes,
387 mz.rollout_requested(),
388 ) {
389 (true, _, _) => {
392 self.promote(
393 &client,
394 mz,
395 resources,
396 active_generation,
397 desired_generation,
398 resources_hash,
399 )
400 .await
401 }
402 (false, true, true) => {
404 let mz = if mz.is_ready_to_promote(&resources_hash) {
416 mz
417 } else {
418 &self
419 .update_status(
420 &mz_api,
421 mz,
422 MaterializeStatus {
423 active_generation,
424 last_completed_rollout_request: status
429 .last_completed_rollout_request,
430 last_completed_rollout_environmentd_image_ref: status
431 .last_completed_rollout_environmentd_image_ref,
432 resource_id: status.resource_id.clone(),
433 resources_hash: String::new(),
434 conditions: vec![Condition {
435 type_: "UpToDate".into(),
436 status: "Unknown".into(),
437 last_transition_time: Time(chrono::offset::Utc::now()),
438 message: format!(
439 "Applying changes for generation {desired_generation}"
440 ),
441 observed_generation: mz.meta().generation,
442 reason: "Applying".into(),
443 }],
444 },
445 active_generation != desired_generation,
446 )
447 .await?
448 };
449 let status = mz.status();
450
451 if !mz.within_upgrade_window() {
452 let last_completed_rollout_environmentd_image_ref =
453 status.last_completed_rollout_environmentd_image_ref;
454
455 self.update_status(
456 &mz_api,
457 mz,
458 MaterializeStatus {
459 active_generation,
460 last_completed_rollout_request: status.last_completed_rollout_request,
461 last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
462 resource_id: status.resource_id,
463 resources_hash: status.resources_hash,
464 conditions: vec![Condition {
465 type_: "UpToDate".into(),
466 status: "False".into(),
467 last_transition_time: Time(chrono::offset::Utc::now()),
468 message: format!(
469 "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
470 last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
471 &mz.spec.environmentd_image_ref,
472 ),
473 observed_generation: mz.meta().generation,
474 reason: "FailedDeploy".into(),
475 }],
476 },
477 active_generation != desired_generation,
478 )
479 .await?;
480 return Ok(None);
481 }
482
483 if mz.spec.rollout_strategy
484 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
485 {
486 resources
490 .teardown_generation(&client, mz, active_generation)
491 .await?;
492 }
493
494 trace!("applying environment resources");
495 match resources
496 .apply(&client, mz.should_force_promote(), &mz.namespace())
497 .await
498 {
499 Ok(Some(action)) => {
500 trace!("new environment is not yet ready");
501 Ok(Some(action))
502 }
503 Ok(None) => {
504 if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
505 && !mz.should_force_promote()
506 {
507 trace!(
508 "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
509 );
510 self.update_status(
511 &mz_api,
512 mz,
513 MaterializeStatus {
514 active_generation,
515 last_completed_rollout_request: status
516 .last_completed_rollout_request,
517 last_completed_rollout_environmentd_image_ref: status
518 .last_completed_rollout_environmentd_image_ref,
519 resource_id: status.resource_id,
520 resources_hash,
521 conditions: vec![Condition {
522 type_: "UpToDate".into(),
523 status: "Unknown".into(),
524 last_transition_time: Time(chrono::offset::Utc::now()),
525 message: format!(
526 "Ready to promote generation {desired_generation}"
527 ),
528 observed_generation: mz.meta().generation,
529 reason: "ReadyToPromote".into(),
530 }],
531 },
532 active_generation != desired_generation,
533 )
534 .await?;
535 return Ok(None);
536 }
537 self.update_status(
545 &mz_api,
546 mz,
547 MaterializeStatus {
548 active_generation,
549 last_completed_rollout_request: status
554 .last_completed_rollout_request,
555 last_completed_rollout_environmentd_image_ref: status
556 .last_completed_rollout_environmentd_image_ref,
557 resource_id: status.resource_id,
558 resources_hash: resources_hash.clone(),
559 conditions: vec![Condition {
560 type_: "UpToDate".into(),
561 status: "Unknown".into(),
562 last_transition_time: Time(chrono::offset::Utc::now()),
563 message: format!(
564 "Attempting to promote generation {desired_generation}"
565 ),
566 observed_generation: mz.meta().generation,
567 reason: "Promoting".into(),
568 }],
569 },
570 active_generation != desired_generation,
571 )
572 .await?;
573 self.promote(
574 &client,
575 mz,
576 resources,
577 active_generation,
578 desired_generation,
579 resources_hash,
580 )
581 .await
582 }
583 Err(e) => {
584 self.update_status(
585 &mz_api,
586 mz,
587 MaterializeStatus {
588 active_generation,
589 last_completed_rollout_request: status.last_completed_rollout_request,
594 last_completed_rollout_environmentd_image_ref: status
595 .last_completed_rollout_environmentd_image_ref,
596 resource_id: status.resource_id,
597 resources_hash: status.resources_hash,
598 conditions: vec![Condition {
599 type_: "UpToDate".into(),
600 status: "False".into(),
601 last_transition_time: Time(chrono::offset::Utc::now()),
602 message: format!(
603 "Failed to apply changes for generation {desired_generation}: {e}"
604 ),
605 observed_generation: mz.meta().generation,
606 reason: "FailedDeploy".into(),
607 }],
608 },
609 active_generation != desired_generation,
610 )
611 .await?;
612 Err(e)
613 }
614 }
615 }
616 (false, true, false) => {
618 let mut needs_update = mz.conditions_need_update();
619 if mz.update_in_progress() {
620 resources
621 .teardown_generation(&client, mz, next_generation)
622 .await?;
623 needs_update = true;
624 }
625 if needs_update {
626 self.update_status(
627 &mz_api,
628 mz,
629 MaterializeStatus {
630 active_generation,
631 last_completed_rollout_request: mz.requested_reconciliation_id(),
632 last_completed_rollout_environmentd_image_ref: status
633 .last_completed_rollout_environmentd_image_ref,
634 resource_id: status.resource_id.clone(),
635 resources_hash: status.resources_hash,
636 conditions: vec![Condition {
637 type_: "UpToDate".into(),
638 status: "False".into(),
639 last_transition_time: Time(chrono::offset::Utc::now()),
640 message: format!(
641 "Changes detected, waiting for approval for generation {desired_generation}"
642 ),
643 observed_generation: mz.meta().generation,
644 reason: "WaitingForApproval".into(),
645 }],
646 },
647 active_generation != desired_generation,
648 )
649 .await?;
650 }
651 debug!("changes detected, waiting for approval");
652 Ok(None)
653 }
654 (false, false, _) => {
656 let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
661 if mz.update_in_progress() {
662 resources
663 .teardown_generation(&client, mz, next_generation)
664 .await?;
665 needs_update = true;
666 }
667 if needs_update {
668 self.update_status(
669 &mz_api,
670 mz,
671 MaterializeStatus {
672 active_generation,
673 last_completed_rollout_request: mz.requested_reconciliation_id(),
674 last_completed_rollout_environmentd_image_ref: status
675 .last_completed_rollout_environmentd_image_ref,
676 resource_id: status.resource_id.clone(),
677 resources_hash: status.resources_hash,
678 conditions: vec![Condition {
679 type_: "UpToDate".into(),
680 status: "True".into(),
681 last_transition_time: Time(chrono::offset::Utc::now()),
682 message: format!(
683 "No changes found from generation {active_generation}"
684 ),
685 observed_generation: mz.meta().generation,
686 reason: "Applied".into(),
687 }],
688 },
689 active_generation != desired_generation,
690 )
691 .await?;
692 }
693 debug!("no changes");
694 Ok(None)
695 }
696 }?;
697
698 if let Some(action) = result {
699 return Ok(Some(action));
700 }
701
702 if self.config.create_balancers {
707 let balancer = Balancer {
708 metadata: mz.managed_resource_meta(mz.name_unchecked()),
709 spec: BalancerSpec {
710 balancerd_image_ref: matching_image_from_environmentd_image_ref(
711 &mz.spec.environmentd_image_ref,
712 "balancerd",
713 None,
714 ),
715 resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
716 replicas: Some(mz.balancerd_replicas()),
717 external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
718 internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
719 pod_annotations: mz.spec.pod_annotations.clone(),
720 pod_labels: mz.spec.pod_labels.clone(),
721 static_routing: Some(
722 mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
723 environmentd_namespace: mz.namespace(),
724 environmentd_service_name: mz.environmentd_service_name(),
725 },
726 ),
727 frontegg_routing: None,
728 resource_id: Some(status.resource_id.clone()),
729 },
730 status: None,
731 };
732 let balancer = apply_resource(&balancer_api, &balancer).await?;
733 result = wait_for_balancer(&balancer)?;
734 } else {
735 delete_resource(&balancer_api, &mz.name_unchecked()).await?;
736 }
737
738 if let Some(action) = result {
739 return Ok(Some(action));
740 }
741
742 if self.config.create_console {
746 let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
747 else {
748 return Err(Error::Anyhow(anyhow::anyhow!(
749 "failed to parse environmentd image ref: {}",
750 mz.spec.environmentd_image_ref
751 )));
752 };
753 let console_image_tag = self
754 .config
755 .console_image_tag_map
756 .iter()
757 .find(|kv| kv.key == environmentd_image_tag)
758 .map(|kv| kv.value.clone())
759 .unwrap_or_else(|| self.config.console_image_tag_default.clone());
760 let console = Console {
761 metadata: mz.managed_resource_meta(mz.name_unchecked()),
762 spec: ConsoleSpec {
763 console_image_ref: matching_image_from_environmentd_image_ref(
764 &mz.spec.environmentd_image_ref,
765 "console",
766 Some(&console_image_tag),
767 ),
768 resource_requirements: mz.spec.console_resource_requirements.clone(),
769 replicas: Some(mz.console_replicas()),
770 external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
771 pod_annotations: mz.spec.pod_annotations.clone(),
772 pod_labels: mz.spec.pod_labels.clone(),
773 balancerd: BalancerdRef {
774 service_name: mz.balancerd_service_name(),
775 namespace: mz.namespace(),
776 scheme: if issuer_ref_defined(
777 &self.config.default_certificate_specs.balancerd_external,
778 &mz.spec.balancerd_external_certificate_spec,
779 ) {
780 HttpConnectionScheme::Https
781 } else {
782 HttpConnectionScheme::Http
783 },
784 },
785 authenticator_kind: mz.spec.authenticator_kind,
786 resource_id: Some(status.resource_id),
787 },
788 status: None,
789 };
790 apply_resource(&console_api, &console).await?;
791 } else {
792 delete_resource(&console_api, &mz.name_unchecked()).await?;
793 }
794
795 Ok(result)
796 }
797
798 #[instrument(fields(organization_name=mz.name_unchecked()))]
799 async fn cleanup(
800 &self,
801 _client: Client,
802 mz: &Self::Resource,
803 ) -> Result<Option<Action>, Self::Error> {
804 self.set_needs_update(mz, false);
805
806 Ok(None)
807 }
808}
809
810fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
811 if let Some(conditions) = balancer
812 .status
813 .as_ref()
814 .map(|status| status.conditions.as_slice())
815 {
816 if conditions
817 .iter()
818 .any(|condition| condition.type_ == "Ready" && condition.status == "True")
819 {
820 return Ok(None);
821 }
822 }
823
824 Ok(Some(Action::requeue(Duration::from_secs(1))))
825}