mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use kube::{
13    Api, Client, CustomResourceExt, Resource, ResourceExt,
14    api::{DeleteParams, Patch, PatchParams},
15};
16use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
17use serde::{Serialize, de::DeserializeOwned};
18use tracing::info;
19
/// Field manager name this module passes to the Kubernetes API, both when
/// applying resources and when registering CRDs.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
21
22pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
23where
24    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
25    <K as Resource>::DynamicType: Default,
26{
27    match api.get(name).await {
28        Ok(k) => Ok(Some(k)),
29        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
30        Err(e) => Err(e.into()),
31    }
32}
33
34pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
35where
36    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
37    <K as Resource>::DynamicType: Default,
38{
39    api.patch(
40        &resource.name_unchecked(),
41        &PatchParams::apply(FIELD_MANAGER).force(),
42        &Patch::Apply(resource),
43    )
44    .await?;
45    Ok(())
46}
47
48pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
49where
50    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
51    <K as Resource>::DynamicType: Default,
52{
53    match kube::runtime::wait::delete::delete_and_finalize(
54        api.clone(),
55        name,
56        &DeleteParams::foreground(),
57    )
58    .await
59    {
60        Ok(_) => Ok(()),
61        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
62            // the resource already doesn't exist
63            Ok(())
64        }
65        Err(e) => Err(e.into()),
66    }
67}
68
69pub async fn register_crds(client: Client) -> Result<(), anyhow::Error> {
70    tokio::time::timeout(
71        Duration::from_secs(120),
72        register_versioned_crds(
73            client.clone(),
74            vec![
75                VersionedCrd {
76                    crds: vec![crd::materialize::v1alpha1::Materialize::crd()],
77                    stored_version: String::from("v1alpha1"),
78                },
79                VersionedCrd {
80                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
81                    stored_version: String::from("v1"),
82                },
83            ],
84            FIELD_MANAGER,
85        ),
86    )
87    .await??;
88
89    info!("Done rewriting CRDs");
90
91    Ok(())
92}