1use std::{
11 collections::BTreeSet,
12 sync::{Arc, Mutex},
13 time::Duration,
14};
15
16use anyhow::Context as _;
17use http::HeaderValue;
18use k8s_openapi::{
19 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
20 apimachinery::pkg::apis::meta::v1::{Condition, Time},
21 jiff::Timestamp,
22};
23use kube::{
24 Api, Client, Resource, ResourceExt,
25 api::{ListParams, PostParams},
26 runtime::controller::Action,
27};
28use tracing::{debug, trace};
29use uuid::Uuid;
30
31use crate::{
32 Error,
33 controller::materialize::generation::V161,
34 k8s::{apply_resource, delete_resource},
35 matching_image_from_environmentd_image_ref,
36 metrics::Metrics,
37 parse_image_tag,
38 tls::{DefaultCertificateSpecs, issuer_ref_defined},
39};
40use mz_cloud_provider::CloudProvider;
41use mz_cloud_resources::crd::{
42 ManagedResource,
43 balancer::v1alpha1::{Balancer, BalancerSpec},
44 console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme},
45 materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus},
46};
47use mz_license_keys::validate;
48use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
49use mz_orchestrator_tracing::TracingCliArgs;
50use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
51
52pub mod generation;
53pub mod global;
54
55pub struct Config {
56 pub cloud_provider: CloudProvider,
57 pub region: String,
58 pub create_balancers: bool,
59 pub create_console: bool,
60 pub helm_chart_version: Option<String>,
61 pub secrets_controller: String,
62 pub collect_pod_metrics: bool,
63 pub enable_prometheus_scrape_annotations: bool,
64
65 pub segment_api_key: Option<String>,
66 pub segment_client_side: bool,
67
68 pub console_image_tag_default: String,
69 pub console_image_tag_map: Vec<KeyValueArg<String, String>>,
70
71 pub aws_account_id: Option<String>,
72 pub environmentd_iam_role_arn: Option<String>,
73 pub environmentd_connection_role_arn: Option<String>,
74 pub aws_secrets_controller_tags: Vec<String>,
75 pub environmentd_availability_zones: Option<Vec<String>>,
76
77 pub ephemeral_volume_class: Option<String>,
78 pub scheduler_name: Option<String>,
79 pub enable_security_context: bool,
80 pub enable_internal_statement_logging: bool,
81 pub disable_statement_logging: bool,
82
83 pub orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
84 pub environmentd_node_selector: Vec<KeyValueArg<String, String>>,
85 pub environmentd_affinity: Option<Affinity>,
86 pub environmentd_tolerations: Option<Vec<Toleration>>,
87 pub environmentd_default_resources: Option<ResourceRequirements>,
88 pub clusterd_node_selector: Vec<KeyValueArg<String, String>>,
89 pub clusterd_affinity: Option<Affinity>,
90 pub clusterd_tolerations: Option<Vec<Toleration>>,
91 pub image_pull_policy: KubernetesImagePullPolicy,
92 pub network_policies_internal_enabled: bool,
93 pub network_policies_ingress_enabled: bool,
94 pub network_policies_ingress_cidrs: Vec<String>,
95 pub network_policies_egress_enabled: bool,
96 pub network_policies_egress_cidrs: Vec<String>,
97
98 pub environmentd_cluster_replica_sizes: Option<String>,
99 pub bootstrap_default_cluster_replica_size: Option<String>,
100 pub bootstrap_builtin_system_cluster_replica_size: Option<String>,
101 pub bootstrap_builtin_probe_cluster_replica_size: Option<String>,
102 pub bootstrap_builtin_support_cluster_replica_size: Option<String>,
103 pub bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
104 pub bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
105 pub bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
106 pub bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
107 pub bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
108 pub bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
109
110 pub environmentd_allowed_origins: Vec<HeaderValue>,
111 pub internal_console_proxy_url: String,
112
113 pub environmentd_sql_port: u16,
114 pub environmentd_http_port: u16,
115 pub environmentd_internal_sql_port: u16,
116 pub environmentd_internal_http_port: u16,
117 pub environmentd_internal_persist_pubsub_port: u16,
118
119 pub default_certificate_specs: DefaultCertificateSpecs,
120
121 pub disable_license_key_checks: bool,
122
123 pub tracing: TracingCliArgs,
124 pub orchestratord_namespace: String,
125}
126
127pub struct Context {
128 config: Config,
129 metrics: Arc<Metrics>,
130 needs_update: Arc<Mutex<BTreeSet<String>>>,
131}
132
133impl Context {
134 pub fn new(config: Config, metrics: Arc<Metrics>) -> Self {
135 if config.cloud_provider == CloudProvider::Aws {
136 assert!(
137 config.aws_account_id.is_some(),
138 "--aws-account-id is required when using --cloud-provider=aws"
139 );
140 }
141
142 Self {
143 config,
144 metrics,
145 needs_update: Default::default(),
146 }
147 }
148
149 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
150 let mut needs_update_set = self.needs_update.lock().unwrap();
151 if needs_update {
152 needs_update_set.insert(mz.name_unchecked());
153 } else {
154 needs_update_set.remove(&mz.name_unchecked());
155 }
156 self.metrics
157 .environmentd_needs_update
158 .set(u64::cast_from(needs_update_set.len()));
159 }
160
161 async fn update_status(
162 &self,
163 mz_api: &Api<Materialize>,
164 mz: &Materialize,
165 status: MaterializeStatus,
166 needs_update: bool,
167 ) -> Result<Materialize, kube::Error> {
168 self.set_needs_update(mz, needs_update);
169
170 let mut new_mz = mz.clone();
171 if !mz
172 .status
173 .as_ref()
174 .map_or(true, |mz_status| mz_status.needs_update(&status))
175 {
176 return Ok(new_mz);
177 }
178
179 new_mz.status = Some(status);
180 mz_api
181 .replace_status(&mz.name_unchecked(), &PostParams::default(), &new_mz)
182 .await
183 }
184
185 async fn promote(
186 &self,
187 client: &Client,
188 mz: &Materialize,
189 resources: generation::Resources,
190 active_generation: u64,
191 desired_generation: u64,
192 resources_hash: String,
193 ) -> Result<Option<Action>, Error> {
194 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
195 return Ok(Some(action));
196 }
197 resources
198 .teardown_generation(client, mz, active_generation)
199 .await?;
200 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
201 self.update_status(
202 &mz_api,
203 mz,
204 MaterializeStatus {
205 active_generation: desired_generation,
206 last_completed_rollout_request: mz.requested_reconciliation_id(),
207 last_completed_rollout_environmentd_image_ref: Some(
208 mz.spec.environmentd_image_ref.clone(),
209 ),
210 resource_id: mz.status().resource_id,
211 resources_hash,
212 conditions: vec![Condition {
213 type_: "UpToDate".into(),
214 status: "True".into(),
215 last_transition_time: Time(Timestamp::now()),
216 message: format!(
217 "Successfully applied changes for generation {desired_generation}"
218 ),
219 observed_generation: mz.meta().generation,
220 reason: "Applied".into(),
221 }],
222 },
223 false,
224 )
225 .await?;
226 Ok(None)
227 }
228
229 async fn check_environment_id_conflicts(
230 &self,
231 client: &Client,
232 mz: &Materialize,
233 ) -> Result<(), Error> {
234 if mz.spec.environment_id.is_nil() {
235 return Err(Error::Anyhow(anyhow::anyhow!(
239 "trying to reconcile a materialize resource with no environment id - this is a bug!"
240 )));
241 }
242
243 let mz_api: Api<Materialize> = Api::all(client.clone());
244 let all_mz = mz_api.list(&ListParams::default()).await?;
245 for existing_mz in &all_mz.items {
246 if existing_mz.spec.environment_id == mz.spec.environment_id
247 && existing_mz.metadata.uid != mz.metadata.uid
248 {
249 return Err(Error::Anyhow(anyhow::anyhow!(
250 "Materialize resources {}/{} and {}/{} have the environmentId field set to the same value. This field must be unique across environments.",
251 mz.namespace(),
252 mz.name_unchecked(),
253 existing_mz.namespace(),
254 existing_mz.name_unchecked(),
255 )));
256 }
257 }
258
259 Ok(())
260 }
261}
262
263#[async_trait::async_trait]
264impl k8s_controller::Context for Context {
265 type Resource = Materialize;
266 type Error = Error;
267
268 const FINALIZER_NAME: Option<&'static str> =
269 Some("orchestratord.materialize.cloud/materialize");
270
271 #[instrument(fields(organization_name=mz.name_unchecked()))]
272 async fn apply(
273 &self,
274 client: Client,
275 mz: &Self::Resource,
276 ) -> Result<Option<Action>, Self::Error> {
277 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
278 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &mz.namespace());
279 let console_api: Api<Console> = Api::namespaced(client.clone(), &mz.namespace());
280 let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());
281
282 let status = mz.status();
283 if mz.status.is_none() {
284 self.update_status(&mz_api, mz, status, true).await?;
285 return Ok(None);
288 }
289
290 let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
291 let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
292 .data
293 .as_ref()
294 .and_then(|data| data.get("license_key"))
295 {
296 let license_key = validate(
297 str::from_utf8(&license_key.0)
298 .context("invalid utf8")?
299 .trim(),
300 )?;
301 let environment_id = license_key
302 .environment_id
303 .parse()
304 .context("invalid environment id in license key")?;
305 Some(environment_id)
306 } else {
307 if mz.meets_minimum_version(&V161) {
308 return Err(Error::Anyhow(anyhow::anyhow!(
309 "license_key is required when running in kubernetes",
310 )));
311 } else {
312 None
313 }
314 };
315
316 if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
317 let mut mz = mz.clone();
318 if mz.spec.request_rollout.is_nil() {
319 mz.spec.request_rollout = Uuid::new_v4();
320 }
321 if mz.spec.environment_id.is_nil() {
322 if let Some(environment_id) = license_key_environment_id {
323 if environment_id.is_nil() {
324 mz.spec.environment_id = Uuid::new_v4();
327 } else {
328 mz.spec.environment_id = environment_id;
329 }
330 } else {
331 if mz.meets_minimum_version(&V161) {
332 return Err(Error::Anyhow(anyhow::anyhow!(
333 "environmentId is not set in materialize resource {}/{} but no license key was given",
334 mz.namespace(),
335 mz.name_unchecked()
336 )));
337 } else {
338 mz.spec.environment_id = Uuid::new_v4();
339 }
340 }
341 }
342 mz_api
343 .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
344 .await?;
345 return Ok(None);
349 }
350
351 if let Some(environment_id) = license_key_environment_id {
352 if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
355 return Err(Error::Anyhow(anyhow::anyhow!(
356 "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
357 mz.namespace(),
358 mz.name_unchecked(),
359 environment_id,
360 )));
361 }
362 }
363
364 self.check_environment_id_conflicts(&client, mz).await?;
365
366 global::Resources::new(&self.config, mz)?
367 .apply(&client, &mz.namespace())
368 .await?;
369
370 let active_resources =
376 generation::Resources::new(&self.config, mz, status.active_generation);
377 let has_current_changes = status.resources_hash != active_resources.generate_hash();
378 let active_generation = status.active_generation;
379 let next_generation = active_generation + 1;
380 let desired_generation = if has_current_changes {
381 next_generation
382 } else {
383 active_generation
384 };
385
386 let resources = generation::Resources::new(&self.config, mz, desired_generation);
389 let resources_hash = resources.generate_hash();
390
391 let mut result = match (
392 mz.is_promoting(),
393 has_current_changes,
394 mz.rollout_requested(),
395 ) {
396 (true, _, _) => {
399 self.promote(
400 &client,
401 mz,
402 resources,
403 active_generation,
404 desired_generation,
405 resources_hash,
406 )
407 .await
408 }
409 (false, true, true) => {
411 let mz = if mz.is_ready_to_promote(&resources_hash) {
423 mz
424 } else {
425 &self
426 .update_status(
427 &mz_api,
428 mz,
429 MaterializeStatus {
430 active_generation,
431 last_completed_rollout_request: status
436 .last_completed_rollout_request,
437 last_completed_rollout_environmentd_image_ref: status
438 .last_completed_rollout_environmentd_image_ref,
439 resource_id: status.resource_id.clone(),
440 resources_hash: String::new(),
441 conditions: vec![Condition {
442 type_: "UpToDate".into(),
443 status: "Unknown".into(),
444 last_transition_time: Time(Timestamp::now()),
445 message: format!(
446 "Applying changes for generation {desired_generation}"
447 ),
448 observed_generation: mz.meta().generation,
449 reason: "Applying".into(),
450 }],
451 },
452 active_generation != desired_generation,
453 )
454 .await?
455 };
456 let status = mz.status();
457
458 if !mz.within_upgrade_window() {
459 let last_completed_rollout_environmentd_image_ref =
460 status.last_completed_rollout_environmentd_image_ref;
461
462 self.update_status(
463 &mz_api,
464 mz,
465 MaterializeStatus {
466 active_generation,
467 last_completed_rollout_request: status.last_completed_rollout_request,
468 last_completed_rollout_environmentd_image_ref:
469 last_completed_rollout_environmentd_image_ref.clone(),
470 resource_id: status.resource_id,
471 resources_hash: status.resources_hash,
472 conditions: vec![Condition {
473 type_: "UpToDate".into(),
474 status: "False".into(),
475 last_transition_time: Time(Timestamp::now()),
476 message: format!(
477 "Refusing to upgrade from {} to {}. \
478 More than one major version from \
479 last successful rollout. If coming \
480 from Self Managed 25.2, upgrade to \
481 materialize/environmentd:v0.147.20 \
482 first.",
483 last_completed_rollout_environmentd_image_ref
484 .expect("should be set if upgrade window check fails"),
485 &mz.spec.environmentd_image_ref,
486 ),
487 observed_generation: mz.meta().generation,
488 reason: "FailedDeploy".into(),
489 }],
490 },
491 active_generation != desired_generation,
492 )
493 .await?;
494 return Ok(None);
495 }
496
497 if mz.spec.rollout_strategy
498 == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
499 {
500 resources
504 .teardown_generation(&client, mz, active_generation)
505 .await?;
506 }
507
508 trace!("applying environment resources");
509 match resources
510 .apply(&client, mz.should_force_promote(), &mz.namespace())
511 .await
512 {
513 Ok(Some(action)) => {
514 trace!("new environment is not yet ready");
515 Ok(Some(action))
516 }
517 Ok(None) => {
518 if mz.spec.rollout_strategy == MaterializeRolloutStrategy::ManuallyPromote
519 && !mz.should_force_promote()
520 {
521 trace!(
522 "Ready to promote, but not promoting because the instance is configured with ManuallyPromote rollout strategy."
523 );
524 self.update_status(
525 &mz_api,
526 mz,
527 MaterializeStatus {
528 active_generation,
529 last_completed_rollout_request: status
530 .last_completed_rollout_request,
531 last_completed_rollout_environmentd_image_ref: status
532 .last_completed_rollout_environmentd_image_ref,
533 resource_id: status.resource_id,
534 resources_hash,
535 conditions: vec![Condition {
536 type_: "UpToDate".into(),
537 status: "Unknown".into(),
538 last_transition_time: Time(Timestamp::now()),
539 message: format!(
540 "Ready to promote generation {desired_generation}"
541 ),
542 observed_generation: mz.meta().generation,
543 reason: "ReadyToPromote".into(),
544 }],
545 },
546 active_generation != desired_generation,
547 )
548 .await?;
549 return Ok(None);
550 }
551 self.update_status(
559 &mz_api,
560 mz,
561 MaterializeStatus {
562 active_generation,
563 last_completed_rollout_request: status
568 .last_completed_rollout_request,
569 last_completed_rollout_environmentd_image_ref: status
570 .last_completed_rollout_environmentd_image_ref,
571 resource_id: status.resource_id,
572 resources_hash: resources_hash.clone(),
573 conditions: vec![Condition {
574 type_: "UpToDate".into(),
575 status: "Unknown".into(),
576 last_transition_time: Time(Timestamp::now()),
577 message: format!(
578 "Attempting to promote generation {desired_generation}"
579 ),
580 observed_generation: mz.meta().generation,
581 reason: "Promoting".into(),
582 }],
583 },
584 active_generation != desired_generation,
585 )
586 .await?;
587 self.promote(
588 &client,
589 mz,
590 resources,
591 active_generation,
592 desired_generation,
593 resources_hash,
594 )
595 .await
596 }
597 Err(e) => {
598 self.update_status(
599 &mz_api,
600 mz,
601 MaterializeStatus {
602 active_generation,
603 last_completed_rollout_request: status
608 .last_completed_rollout_request,
609 last_completed_rollout_environmentd_image_ref: status
610 .last_completed_rollout_environmentd_image_ref,
611 resource_id: status.resource_id,
612 resources_hash: status.resources_hash,
613 conditions: vec![Condition {
614 type_: "UpToDate".into(),
615 status: "False".into(),
616 last_transition_time: Time(Timestamp::now()),
617 message: format!(
618 "Failed to apply changes for \
619 generation {desired_generation}: {e}"
620 ),
621 observed_generation: mz.meta().generation,
622 reason: "FailedDeploy".into(),
623 }],
624 },
625 active_generation != desired_generation,
626 )
627 .await?;
628 Err(e)
629 }
630 }
631 }
632 (false, true, false) => {
634 let mut needs_update = mz.conditions_need_update();
635 if mz.update_in_progress() {
636 resources
637 .teardown_generation(&client, mz, next_generation)
638 .await?;
639 needs_update = true;
640 }
641 if needs_update {
642 self.update_status(
643 &mz_api,
644 mz,
645 MaterializeStatus {
646 active_generation,
647 last_completed_rollout_request: mz.requested_reconciliation_id(),
648 last_completed_rollout_environmentd_image_ref: status
649 .last_completed_rollout_environmentd_image_ref,
650 resource_id: status.resource_id.clone(),
651 resources_hash: status.resources_hash,
652 conditions: vec![Condition {
653 type_: "UpToDate".into(),
654 status: "False".into(),
655 last_transition_time: Time(Timestamp::now()),
656 message: format!(
657 "Changes detected, waiting for approval for generation {desired_generation}"
658 ),
659 observed_generation: mz.meta().generation,
660 reason: "WaitingForApproval".into(),
661 }],
662 },
663 active_generation != desired_generation,
664 )
665 .await?;
666 }
667 debug!("changes detected, waiting for approval");
668 Ok(None)
669 }
670 (false, false, _) => {
672 let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
677 if mz.update_in_progress() {
678 resources
679 .teardown_generation(&client, mz, next_generation)
680 .await?;
681 needs_update = true;
682 }
683 if needs_update {
684 self.update_status(
685 &mz_api,
686 mz,
687 MaterializeStatus {
688 active_generation,
689 last_completed_rollout_request: mz.requested_reconciliation_id(),
690 last_completed_rollout_environmentd_image_ref: status
691 .last_completed_rollout_environmentd_image_ref,
692 resource_id: status.resource_id.clone(),
693 resources_hash: status.resources_hash,
694 conditions: vec![Condition {
695 type_: "UpToDate".into(),
696 status: "True".into(),
697 last_transition_time: Time(Timestamp::now()),
698 message: format!(
699 "No changes found from generation {active_generation}"
700 ),
701 observed_generation: mz.meta().generation,
702 reason: "Applied".into(),
703 }],
704 },
705 active_generation != desired_generation,
706 )
707 .await?;
708 }
709 debug!("no changes");
710 Ok(None)
711 }
712 }?;
713
714 if let Some(action) = result {
715 return Ok(Some(action));
716 }
717
718 if self.config.create_balancers {
723 let balancer = Balancer {
724 metadata: mz.managed_resource_meta(mz.name_unchecked()),
725 spec: BalancerSpec {
726 balancerd_image_ref: matching_image_from_environmentd_image_ref(
727 &mz.spec.environmentd_image_ref,
728 "balancerd",
729 None,
730 ),
731 resource_requirements: mz.spec.balancerd_resource_requirements.clone(),
732 replicas: Some(mz.balancerd_replicas()),
733 external_certificate_spec: mz.spec.balancerd_external_certificate_spec.clone(),
734 internal_certificate_spec: mz.spec.internal_certificate_spec.clone(),
735 pod_annotations: mz.spec.pod_annotations.clone(),
736 pod_labels: mz.spec.pod_labels.clone(),
737 static_routing: Some(
738 mz_cloud_resources::crd::balancer::v1alpha1::StaticRoutingConfig {
739 environmentd_namespace: mz.namespace(),
740 environmentd_service_name: mz.environmentd_service_name(),
741 },
742 ),
743 frontegg_routing: None,
744 resource_id: Some(status.resource_id.clone()),
745 },
746 status: None,
747 };
748 let balancer = apply_resource(&balancer_api, &balancer).await?;
749 result = wait_for_balancer(&balancer)?;
750 } else {
751 delete_resource(&balancer_api, &mz.name_unchecked()).await?;
752 }
753
754 if let Some(action) = result {
755 return Ok(Some(action));
756 }
757
758 if self.config.create_console {
762 let environmentd_image_tag =
763 parse_image_tag(&mz.spec.environmentd_image_ref).unwrap_or("latest");
764 let console_image_tag = self
765 .config
766 .console_image_tag_map
767 .iter()
768 .find(|kv| kv.key == environmentd_image_tag)
769 .map(|kv| kv.value.clone())
770 .unwrap_or_else(|| self.config.console_image_tag_default.clone());
771 let console = Console {
772 metadata: mz.managed_resource_meta(mz.name_unchecked()),
773 spec: ConsoleSpec {
774 console_image_ref: matching_image_from_environmentd_image_ref(
775 &mz.spec.environmentd_image_ref,
776 "console",
777 Some(&console_image_tag),
778 ),
779 resource_requirements: mz.spec.console_resource_requirements.clone(),
780 replicas: Some(mz.console_replicas()),
781 external_certificate_spec: mz.spec.console_external_certificate_spec.clone(),
782 pod_annotations: mz.spec.pod_annotations.clone(),
783 pod_labels: mz.spec.pod_labels.clone(),
784 balancerd: BalancerdRef {
785 service_name: mz.balancerd_service_name(),
786 namespace: mz.namespace(),
787 scheme: if issuer_ref_defined(
788 &self.config.default_certificate_specs.balancerd_external,
789 &mz.spec.balancerd_external_certificate_spec,
790 ) {
791 HttpConnectionScheme::Https
792 } else {
793 HttpConnectionScheme::Http
794 },
795 },
796 authenticator_kind: mz.spec.authenticator_kind,
797 resource_id: Some(status.resource_id),
798 },
799 status: None,
800 };
801 apply_resource(&console_api, &console).await?;
802 } else {
803 delete_resource(&console_api, &mz.name_unchecked()).await?;
804 }
805
806 Ok(result)
807 }
808
809 #[instrument(fields(organization_name=mz.name_unchecked()))]
810 async fn cleanup(
811 &self,
812 _client: Client,
813 mz: &Self::Resource,
814 ) -> Result<Option<Action>, Self::Error> {
815 self.set_needs_update(mz, false);
816
817 Ok(None)
818 }
819}
820
821fn wait_for_balancer(balancer: &Balancer) -> Result<Option<Action>, Error> {
822 if let Some(conditions) = balancer
823 .status
824 .as_ref()
825 .map(|status| status.conditions.as_slice())
826 {
827 if conditions
828 .iter()
829 .any(|condition| condition.type_ == "Ready" && condition.status == "True")
830 {
831 return Ok(None);
832 }
833 }
834
835 Ok(Some(Action::requeue(Duration::from_secs(1))))
836}