1use std::{
11 collections::BTreeSet,
12 sync::{Arc, Mutex},
13 time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20 apimachinery::pkg::apis::meta::v1::{Condition, Time},
21 jiff::Timestamp,
22};
23use kube::{
24 Api, Client, Resource, ResourceExt,
25 api::PostParams,
26 runtime::{controller::Action, reflector},
27};
28use tracing::{debug, trace};
29use uuid::Uuid;
30
31use crate::{
32 Error,
33 controller::materialize::generation::V161,
34 k8s::{apply_resource, delete_resource, make_reflector},
35 matching_image_from_environmentd_image_ref,
36 metrics::Metrics,
37 tls::{DefaultCertificateSpecs, issuer_ref_defined},
38};
39use mz_cloud_provider::CloudProvider;
40use mz_cloud_resources::crd::{
41 ManagedResource,
42 balancer::v1alpha1::{Balancer, BalancerSpec},
43 console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
44 materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
45};
46use mz_license_keys::validate;
47use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
48use mz_orchestrator_tracing::TracingCliArgs;
49use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
50
51pub mod generation;
52pub mod global;
53
54pub struct Config {
55 pub cloud_provider: CloudProvider,
56 pub region: String,
57 pub create_balancers: bool,
58 pub create_console: bool,
59 pub helm_chart_version: Option<String>,
60 pub secrets_controller: String,
61 pub collect_pod_metrics: bool,
62 pub enable_prometheus_scrape_annotations: bool,
63
64 pub segment_api_key: Option<String>,
65 pub segment_client_side: bool,
66
67 pub console_image_tag_default: String,
68 pub console_image_tag_map: Vec<KeyValueArg<String, String>>,
69
70 pub aws_account_id: Option<String>,
71 pub environmentd_iam_role_arn: Option<String>,
72 pub environmentd_connection_role_arn: Option<String>,
73 pub aws_secrets_controller_tags: Vec<String>,
74 pub environmentd_availability_zones: Option<Vec<String>>,
75
76 pub ephemeral_volume_class: Option<String>,
77 pub scheduler_name: Option<String>,
78 pub enable_security_context: bool,
79 pub enable_internal_statement_logging: bool,
80 pub disable_statement_logging: bool,
81
82 pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
83 pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
84 pub environmentd_affinity: Option<Affinity>,
85 pub environmentd_tolerations: Option<Vec<Toleration>>,
86 pub environmentd_default_resources: Option<ResourceRequirements>,
87 pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
88 pub clusterd_affinity: Option<Affinity>,
89 pub clusterd_tolerations: Option<Vec<Toleration>>,
90 pub image_pull_policy: KubernetesImagePullPolicy,
91 pub network_policies_internal_enabled: bool,
92 pub network_policies_ingress_enabled: bool,
93 pub network_policies_ingress_cidrs: Vec<String>,
94 pub network_policies_egress_enabled: bool,
95 pub network_policies_egress_cidrs: Vec<String>,
96
97 pub environmentd_cluster_replica_sizes: Option<String>,
98 pub bootstrap_default_cluster_replica_size: Option<String>,
99 pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
100 pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
101 pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
102 pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
103 pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
104 pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
105 pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
106 pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
107 pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
108
109 pub environmentd_allowed_origins: Vec<HeaderValue>,
110 pub internal_console_proxy_url: String,
111
112 pub environmentd_sql_port: u16,
113 pub environmentd_http_port: u16,
114 pub environmentd_internal_sql_port: u16,
115 pub environmentd_internal_http_port: u16,
116 pub environmentd_internal_persist_pubsub_port: u16,
117
118 pub default_certificate_specs: DefaultCertificateSpecs,
119
120 pub disable_license_key_checks: bool,
121
122 pub tracing: TracingCliArgs,
123 pub orchestratord_namespace: String,
124}
125
126pub struct Context {
127 config: Config,
128 metrics: Arc<Metrics>,
129 materializes: reflector::Store<Materialize>,
130 needs_update: Arc<Mutex<BTreeSet<String>>>,
131}
132
133impl Context {
134 pub async fn new(config: Config, metrics: Arc<Metrics>, client: kube::Client) -> Self {
135 if config.cloud_provider == CloudProvider::Aws {
136 assert!(
137 config.aws_account_id.is_some(),
138 "--aws-account-id is required when using --cloud-provider=aws"
139 );
140 }
141
142 Self {
143 config,
144 metrics,
145 materializes: make_reflector(client.clone()).await,
146 needs_update: Default::default(),
147 }
148 }
149
150 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
151 let mut needs_update_set = self.needs_update.lock().unwrap();
152 if needs_update {
153 needs_update_set.insert(mz.name_unchecked());
154 } else {
155 needs_update_set.remove(&mz.name_unchecked());
156 }
157 self.metrics
158 .environmentd_needs_update
159 .set(u64::cast_from(needs_update_set.len()));
160 }
161
162 async fn update_status(
163 &self,
164 mz_api: &Api<Materialize>,
165 mz: &Materialize,
166 status: MaterializeStatus,
167 needs_update: bool,
168 ) -> Result<Materialize, kube::Error> {
169 self.set_needs_update(mz, needs_update);
170
171 let mut new_mz = mz.clone();
172 if !mz
173 .status
174 .as_ref()
175 .map_or(true, |mz_status| mz_status.needs_update(&status))
176 {
177 return Ok(new_mz);
178 }
179
180 new_mz.status = Some(status);
181 mz_api
182 .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
183 .await
184 }
185
186 async fn promote(
187 &self,
188 client: &Client,
189 mz: &Materialize,
190 resources: generation::Resources,
191 active_generation: u64,
192 desired_generation: u64,
193 resources_hash: String,
194 ) -> Result<Option<Action>, Error> {
195 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
196 return Ok(Some(action));
197 }
198 resources
199 .teardown_generation(client, mz, active_generation)
200 .await?;
201 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
202 self.update_status(
203 &mz_api,
204 mz,
205 MaterializeStatus {
206 active_generation: desired_generation,
207 last_completed_rollout_request: mz.requested_reconciliation_id(),
208 last_completed_rollout_environmentd_image_ref: Some(
209 mz.spec.environmentd_image_ref.clone(),
210 ),
211 resource_id: mz.status().resource_id,
212 resources_hash,
213 conditions: vec![Condition {
214 type_: "UpToDate".into(),
215 status: "True".into(),
216 last_transition_time: Time(Timestamp::now()),
217 message: format!(
218 "Successfully applied changes for generation {desired_generation}"
219 ),
220 observed_generation: mz.meta().generation,
221 reason: "Applied".into(),
222 }],
223 },
224 false,
225 )
226 .await?;
227 Ok(None)
228 }
229
230 fn check_environment_id_conflicts(&self, mz: &Materialize) -> Result<(), Error> {
231 if mz.spec.environment_id.is_nil() {
232 return Err(Error::Anyhow(anyhow::anyhow!(
236 "trying to reconcile a materialize resource with no environment id - this is a bug!"
237 )));
238 }
239
240 for existing_mz in self.materializes.state() {
241 if existing_mz.spec.environment_id == mz.spec.environment_id
242 && existing_mz.metadata.uid != mz.metadata.uid
243 {
244 return Err(Error::Anyhow(anyhow::anyhow!(
245 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
246 mz.namespace(),
247 mz.name_unchecked(),
248 existing_mz.namespace(),
249 existing_mz.name_unchecked(),
250 )));
251 }
252 }
253
254 Ok(())
255 }
256}
257
258#[async_trait::async_trait]
259impl k8s_controller::Context for Context {
260 type Resource = Materialize;
261 type Error = Error;
262
263 const FINALIZER_NAME: Option<&'static str> =
264 Some("orchestratord.materialize.cloud/materialize");
265
266 #[instrument(fields(organization_name=mz.name_unchecked()))]
267 async fn apply(
268 &self,
269 client: Client,
270 mz: &Self::Resource,
271 ) -> Result<Option<Action>, Self::Error> {
272 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
273 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
274 let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
275 let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
276
277 let status = mz.status();
278 if mz.status.is_none() {
279 self.update_status(&mz_api, mz, status, true).await?;
280 return Ok(None);
283 }
284
285 let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
286 let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
287 .data
288 .as_ref()
289 .and_then(|data| data.get("license_key"))
290 {
291 let license_key = validate(
292 str::from_utf8(&license_key.0)
293 .context("invalid utf8")?
294 .trim(),
295 )?;
296 let environment_id = license_key
297 .environment_id
298 .parse()
299 .context("invalid environment id in license key")?;
300 Some(environment_id)
301 } else {
302 if mz.meets_minimum_version(&V161) {
303 return Err(Error::Anyhow(anyhow::anyhow!(
304 "license_key is required when running in kubernetes",
305 )));
306 } else {
307 None
308 }
309 };
310
311 if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
312 let mut mz = mz.clone();
313 if mz.spec.request_rollout.is_nil() {
314 mz.spec.request_rollout = Uuid::new_v4();
315 }
316 if mz.spec.environment_id.is_nil() {
317 if let Some(environment_id) = license_key_environment_id {
318 if environment_id.is_nil() {
319 mz.spec.environment_id = Uuid::new_v4();
322 } else {
323 mz.spec.environment_id = environment_id;
324 }
325 } else {
326 if mz.meets_minimum_version(&V161) {
327 return Err(Error::Anyhow(anyhow::anyhow!(
328 "environmentId is not set in materialize resource {}/{} but no license key was given",
329 mz.namespace(),
330 mz.name_unchecked()
331 )));
332 } else {
333 mz.spec.environment_id = Uuid::new_v4();
334 }
335 }
336 }
337 mz_api
338 .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
339 .await?;
340 return Ok(None);
344 }
345
346 if let Some(environment_id) = license_key_environment_id {
347 if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
350 return Err(Error::Anyhow(anyhow::anyhow!(
351 "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
352 mz.namespace(),
353 mz.name_unchecked(),
354 environment_id,
355 )));
356 }
357 }
358
359 self.check_environment_id_conflicts(mz)?;
360
361 global::Resources::new(&self.config, mz)
362 .apply(&client, &mz.namespace())
363 .await?;
364
365 let active_resources =
371 generation::Resources::new(&self.config, mz, status.active_generation);
372 let has_current_changes = status.resources_hash != active_resources.generate_hash();
373 let active_generation = status.active_generation;
374 let next_generation = active_generation + 1;
375 let desired_generation = if has_current_changes {
376 next_generation
377 } else {
378 active_generation
379 };
380
381 let resources = generation::Resources::new(&self.config, mz, desired_generation);
384 let resources_hash = resources.generate_hash();
385
386 let mut result = match (
387 mz.is_promoting(),
388 has_current_changes,
389 mz.rollout_requested(),
390 ) {
391 (true, _, _) => {
394 self.promote(
395 &client,
396 mz,
397 resources,
398 active_generation,
399 desired_generation,
400 resources_hash,
401 )
402 .await
403 }
404 (false, true, true) => {
406 let mz = if mz.is_ready_to_promote(&resources_hash) {
418 mz
419 } else {
420 &self
421 .update_status(
422 &mz_api,
423 mz,
424 MaterializeStatus {
425 active_generation,
426 last_completed_rollout_request: status
431 .last_completed_rollout_request,
432 last_completed_rollout_environmentd_image_ref: status
433 .last_completed_rollout_environmentd_image_ref,
434 resource_id: status.resource_id.clone(),
435 resources_hash: String::new(),
436 conditions: vec![Condition {
437 type_: "UpToDate".into(),
438 status: "Unknown".into(),
439 last_transition_time: Time(Timestamp::now()),
440 message: format!(
441 "Applying changes for generation {desired_generation}"
442 ),
443 observed_generation: mz.meta().generation,
444 reason: "Applying".into(),
445 }],
446 },
447 active_generation != desired_generation,
448 )
449 .await?
450 };
451 let status = mz.status();
452
453 if !mz.within_upgrade_window() {
454 let last_completed_rollout_environmentd_image_ref =
455 status.last_completed_rollout_environmentd_image_ref;
456
457 self.update_status(
458 &mz_api,
459 mz,
460 MaterializeStatus {
461 active_generation,
462 last_completed_rollout_request: status.last_completed_rollout_request,
463 last_completed_rollout_environmentd_image_ref: last_completed_rollout_environmentd_image_ref.clone(),
464 resource_id: status.resource_id,
465 resources_hash: status.resources_hash,
466 conditions: vec![Condition {
467 type_: "UpToDate".into(),
468 status: "False".into(),
469 last_transition_time: Time(Timestamp::now()),
470 message: format!(
471 "Refusing to upgrade from {} to {}. More than one major version from last successful rollout. If coming from Self Managed 25.2, upgrade to materialize/environmentd:v0.147.20 first.",
472 last_completed_rollout_environmentd_image_ref.expect("should be set if upgrade window check fails"),
473 &mz.spec.environmentd_image_ref,
474 ),
475 observed_generation: mz.meta().generation,
476 reason: "FailedDeploy".into(),
477 }],
478 },
479 active_generation != desired_generation,
480 )
481 .await?;
482 return Ok(None);
483 }
484
485 if mz.spec.rollout_strategy
486 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
487 {
488 resources
492 .teardown_generation(&client, mz, active_generation)
493 .await?;
494 }
495
496 trace!("applying environment resources");
497 match resources
498 .apply(&client, mz.should_force_promote(), &mz.namespace())
499 .await
500 {
501 Ok(Some(action)) => {
502 trace!("new environment is not yet ready");
503 Ok(Some(action))
504 }
505 Ok(None) => {
506 if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
507 && !mz.should_force_promote()
508 {
509 trace!(
510 "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
511 );
512 self.update_status(
513 &mz_api,
514 mz,
515 MaterializeStatus {
516 active_generation,
517 last_completed_rollout_request: status
518 .last_completed_rollout_request,
519 last_completed_rollout_environmentd_image_ref: status
520 .last_completed_rollout_environmentd_image_ref,
521 resource_id: status.resource_id,
522 resources_hash,
523 conditions: vec![Condition {
524 type_: "UpToDate".into(),
525 status: "Unknown".into(),
526 last_transition_time: Time(Timestamp::now()),
527 message: format!(
528 "Ready to promote generation {desired_generation}"
529 ),
530 observed_generation: mz.meta().generation,
531 reason: "ReadyToPromote".into(),
532 }],
533 },
534 active_generation != desired_generation,
535 )
536 .await?;
537 return Ok(None);
538 }
539 self.update_status(
547 &mz_api,
548 mz,
549 MaterializeStatus {
550 active_generation,
551 last_completed_rollout_request: status
556 .last_completed_rollout_request,
557 last_completed_rollout_environmentd_image_ref: status
558 .last_completed_rollout_environmentd_image_ref,
559 resource_id: status.resource_id,
560 resources_hash: resources_hash.clone(),
561 conditions: vec![Condition {
562 type_: "UpToDate".into(),
563 status: "Unknown".into(),
564 last_transition_time: Time(Timestamp::now()),
565 message: format!(
566 "Attempting to promote generation {desired_generation}"
567 ),
568 observed_generation: mz.meta().generation,
569 reason: "Promoting".into(),
570 }],
571 },
572 active_generation != desired_generation,
573 )
574 .await?;
575 self.promote(
576 &client,
577 mz,
578 resources,
579 active_generation,
580 desired_generation,
581 resources_hash,
582 )
583 .await
584 }
585 Err(e) => {
586 self.update_status(
587 &mz_api,
588 mz,
589 MaterializeStatus {
590 active_generation,
591 last_completed_rollout_request: status.last_completed_rollout_request,
596 last_completed_rollout_environmentd_image_ref: status
597 .last_completed_rollout_environmentd_image_ref,
598 resource_id: status.resource_id,
599 resources_hash: status.resources_hash,
600 conditions: vec![Condition {
601 type_: "UpToDate".into(),
602 status: "False".into(),
603 last_transition_time: Time(Timestamp::now()),
604 message: format!(
605 "Failed to apply changes for generation {desired_generation}: {e}"
606 ),
607 observed_generation: mz.meta().generation,
608 reason: "FailedDeploy".into(),
609 }],
610 },
611 active_generation != desired_generation,
612 )
613 .await?;
614 Err(e)
615 }
616 }
617 }
618 (false, true, false) => {
620 let mut needs_update = mz.conditions_need_update();
621 if mz.update_in_progress() {
622 resources
623 .teardown_generation(&client, mz, next_generation)
624 .await?;
625 needs_update = true;
626 }
627 if needs_update {
628 self.update_status(
629 &mz_api,
630 mz,
631 MaterializeStatus {
632 active_generation,
633 last_completed_rollout_request: mz.requested_reconciliation_id(),
634 last_completed_rollout_environmentd_image_ref: status
635 .last_completed_rollout_environmentd_image_ref,
636 resource_id: status.resource_id.clone(),
637 resources_hash: status.resources_hash,
638 conditions: vec![Condition {
639 type_: "UpToDate".into(),
640 status: "False".into(),
641 last_transition_time: Time(Timestamp::now()),
642 message: format!(
643 "Changes detected, waiting for approval for generation {desired_generation}"
644 ),
645 observed_generation: mz.meta().generation,
646 reason: "WaitingForApproval".into(),
647 }],
648 },
649 active_generation != desired_generation,
650 )
651 .await?;
652 }
653 debug!("changes detected, waiting for approval");
654 Ok(None)
655 }
656 (false, false, _) => {
658 let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
663 if mz.update_in_progress() {
664 resources
665 .teardown_generation(&client, mz, next_generation)
666 .await?;
667 needs_update = true;
668 }
669 if needs_update {
670 self.update_status(
671 &mz_api,
672 mz,
673 MaterializeStatus {
674 active_generation,
675 last_completed_rollout_request: mz.requested_reconciliation_id(),
676 last_completed_rollout_environmentd_image_ref: status
677 .last_completed_rollout_environmentd_image_ref,
678 resource_id: status.resource_id.clone(),
679 resources_hash: status.resources_hash,
680 conditions: vec![Condition {
681 type_: "UpToDate".into(),
682 status: "True".into(),
683 last_transition_time: Time(Timestamp::now()),
684 message: format!(
685 "No changes found from generation {active_generation}"
686 ),
687 observed_generation: mz.meta().generation,
688 reason: "Applied".into(),
689 }],
690 },
691 active_generation != desired_generation,
692 )
693 .await?;
694 }
695 debug!("no changes");
696 Ok(None)
697 }
698 }?;
699
700 if let Some(action) = result {
701 return Ok(Some(action));
702 }
703
704 if self.config.create_balancers {
709 let balancer = Balancer {
710 metadata: mz.managed_resource_meta(mz.name_unchecked()),
711 spec: BalancerSpec {
712 balancerd_image_ref: matching_image_from_environmentd_image_ref(
713 &mz.spec.environmentd_image_ref,
714 "balancerd",
715 None,
716 ),
717 resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
718 replicas: Some(mz.balancerd_replicas()),
719 external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
720 internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
721 pod_annotations: mz.spec.pod_annotations.clone(),
722 pod_labels: mz.spec.pod_labels.clone(),
723 static_routing: Some(
724 mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
725 environmentd_namespace: mz.namespace(),
726 environmentd_service_name: mz.environmentd_service_name(),
727 },
728 ),
729 frontegg_routing: None,
730 resource_id: Some(status.resource_id.clone()),
731 },
732 status: None,
733 };
734 let balancer = apply_resource(&balancer_api, &balancer).await?;
735 result = wait_for_balancer(&balancer)?;
736 } else {
737 delete_resource(&balancer_api, &mz.name_unchecked()).await?;
738 }
739
740 if let Some(action) = result {
741 return Ok(Some(action));
742 }
743
744 if self.config.create_console {
748 let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
749 else {
750 return Err(Error::Anyhow(anyhow::anyhow!(
751 "failed to parse environmentd image ref: {}",
752 mz.spec.environmentd_image_ref
753 )));
754 };
755 let console_image_tag = self
756 .config
757 .console_image_tag_map
758 .iter()
759 .find(|kv| kv.key == environmentd_image_tag)
760 .map(|kv| kv.value.clone())
761 .unwrap_or_else(|| self.config.console_image_tag_default.clone());
762 let console = Console {
763 metadata: mz.managed_resource_meta(mz.name_unchecked()),
764 spec: ConsoleSpec {
765 console_image_ref: matching_image_from_environmentd_image_ref(
766 &mz.spec.environmentd_image_ref,
767 "console",
768 Some(&console_image_tag),
769 ),
770 resource_requirements: mz.spec.console_resource_requirements.clone(),
771 replicas: Some(mz.console_replicas()),
772 external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
773 pod_annotations: mz.spec.pod_annotations.clone(),
774 pod_labels: mz.spec.pod_labels.clone(),
775 balancerd: BalancerdRef {
776 service_name: mz.balancerd_service_name(),
777 namespace: mz.namespace(),
778 scheme: if issuer_ref_defined(
779 &self.config.default_certificate_specs.balancerd_external,
780 &mz.spec.balancerd_external_certificate_spec,
781 ) {
782 HttpConnectionScheme::Https
783 } else {
784 HttpConnectionScheme::Http
785 },
786 },
787 authenticator_kind: mz.spec.authenticator_kind,
788 resource_id: Some(status.resource_id),
789 },
790 status: None,
791 };
792 apply_resource(&console_api, &console).await?;
793 } else {
794 delete_resource(&console_api, &mz.name_unchecked()).await?;
795 }
796
797 Ok(result)
798 }
799
800 #[instrument(fields(organization_name=mz.name_unchecked()))]
801 async fn cleanup(
802 &self,
803 _client: Client,
804 mz: &Self::Resource,
805 ) -> Result<Option<Action>, Self::Error> {
806 self.set_needs_update(mz, false);
807
808 Ok(None)
809 }
810}
811
812fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
813 if let Some(conditions) = balancer
814 .status
815 .as_ref()
816 .map(|status| status.conditions.as_slice())
817 {
818 if conditions
819 .iter()
820 .any(|condition| condition.type_ == "Ready" && condition.status == "True")
821 {
822 return Ok(None);
823 }
824 }
825
826 Ok(Some(Action::requeue(Duration::from_secs(1))))
827}