Skip to main content

mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use apiextensions::v1::CustomResourceColumnDefinition;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions;
14use kube::{
15    Api, Client, CustomResourceExt, Resource, ResourceExt,
16    api::{DeleteParams, Patch, PatchParams, PostParams},
17    core::Status,
18    core::response::StatusSummary,
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::info;
22
23use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
24
25const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
26
27pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
28where
29    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
30    <K as Resource>::DynamicType: Default,
31{
32    match api.get(name).await {
33        Ok(k) => Ok(Some(k)),
34        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
35        Err(e) => Err(e.into()),
36    }
37}
38
39pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
40where
41    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
42    <K as Resource>::DynamicType: Default,
43{
44    Ok(api
45        .patch(
46            &resource.name_unchecked(),
47            &PatchParams::apply(FIELD_MANAGER).force(),
48            &Patch::Apply(resource),
49        )
50        .await?)
51}
52
53pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
54where
55    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
56    <K as Resource>::DynamicType: Default,
57{
58    if resource.meta().resource_version.is_none() {
59        return Err(kube::Error::Api(
60            Box::new(Status {
61                status: Some(StatusSummary::Failure),
62                message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
63                reason: "BadRequest".to_string(),
64                code: 400,
65                metadata: None,
66                details: None,
67            }),
68        )
69        .into());
70    }
71    Ok(api
72        .replace(&resource.name_unchecked(), &PostParams::default(), resource)
73        .await?)
74}
75
76pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
77where
78    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
79    <K as Resource>::DynamicType: Default,
80{
81    match kube::runtime::wait::delete::delete_and_finalize(
82        api.clone(),
83        name,
84        &DeleteParams::foreground(),
85    )
86    .await
87    {
88        Ok(_) => Ok(()),
89        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
90            // the resource already doesn't exist
91            Ok(())
92        }
93        Err(e) => Err(e.into()),
94    }
95}
96
97pub async fn register_crds(
98    client: Client,
99    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
100) -> Result<(), anyhow::Error> {
101    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
102    let default_columns = mz_crd.spec.versions[0]
103        .additional_printer_columns
104        .take()
105        .expect("should contain ImageRef and UpToDate columns");
106    mz_crd.spec.versions[0].additional_printer_columns = Some(
107        additional_crd_columns
108            .into_iter()
109            .chain(default_columns)
110            .collect(),
111    );
112    tokio::time::timeout(
113        Duration::from_secs(120),
114        register_versioned_crds(
115            client.clone(),
116            vec![
117                VersionedCrd {
118                    crds: vec![mz_crd],
119                    stored_version: String::from("v1alpha1"),
120                },
121                VersionedCrd {
122                    crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
123                    stored_version: String::from("v1alpha1"),
124                },
125                VersionedCrd {
126                    crds: vec![crd::console::v1alpha1::Console::crd()],
127                    stored_version: String::from("v1alpha1"),
128                },
129                VersionedCrd {
130                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
131                    stored_version: String::from("v1"),
132                },
133            ],
134            FIELD_MANAGER,
135        ),
136    )
137    .await??;
138
139    info!("Done rewriting CRDs");
140
141    Ok(())
142}