mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
13use kube::{
14    Api, Client, CustomResourceExt, Resource, ResourceExt,
15    api::{DeleteParams, Patch, PatchParams},
16};
17use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
18use serde::{Serialize, de::DeserializeOwned};
19use tracing::info;
20
/// Field manager name this module passes to the Kubernetes API: it is used
/// for the apply patches issued by `apply_resource` and handed to
/// `register_versioned_crds` in `register_crds`.
21const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
22
23pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
24where
25    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
26    <K as Resource>::DynamicType: Default,
27{
28    match api.get(name).await {
29        Ok(k) => Ok(Some(k)),
30        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
31        Err(e) => Err(e.into()),
32    }
33}
34
35pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
36where
37    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
38    <K as Resource>::DynamicType: Default,
39{
40    api.patch(
41        &resource.name_unchecked(),
42        &PatchParams::apply(FIELD_MANAGER).force(),
43        &Patch::Apply(resource),
44    )
45    .await?;
46    Ok(())
47}
48
49pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
50where
51    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
52    <K as Resource>::DynamicType: Default,
53{
54    match kube::runtime::wait::delete::delete_and_finalize(
55        api.clone(),
56        name,
57        &DeleteParams::foreground(),
58    )
59    .await
60    {
61        Ok(_) => Ok(()),
62        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
63            // the resource already doesn't exist
64            Ok(())
65        }
66        Err(e) => Err(e.into()),
67    }
68}
69
70pub async fn register_crds(
71    client: Client,
72    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
73) -> Result<(), anyhow::Error> {
74    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
75    let default_columns = mz_crd.spec.versions[0]
76        .additional_printer_columns
77        .take()
78        .expect("should contain ImageRef and UpToDate columns");
79    mz_crd.spec.versions[0].additional_printer_columns = Some(
80        additional_crd_columns
81            .into_iter()
82            .chain(default_columns)
83            .collect(),
84    );
85    tokio::time::timeout(
86        Duration::from_secs(120),
87        register_versioned_crds(
88            client.clone(),
89            vec![
90                VersionedCrd {
91                    crds: vec![mz_crd],
92                    stored_version: String::from("v1alpha1"),
93                },
94                VersionedCrd {
95                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
96                    stored_version: String::from("v1"),
97                },
98            ],
99            FIELD_MANAGER,
100        ),
101    )
102    .await??;
103
104    info!("Done rewriting CRDs");
105
106    Ok(())
107}