Skip to main content

mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use k8s_openapi::{
13    ByteString,
14    apiextensions_apiserver::pkg::apis::apiextensions::v1::{
15        CustomResourceColumnDefinition, CustomResourceConversion, ServiceReference,
16        WebhookClientConfig, WebhookConversion,
17    },
18};
19use kube::{
20    Api, Client, CustomResourceExt, Resource, ResourceExt,
21    api::{DeleteParams, Patch, PatchParams, PostParams},
22    core::Status,
23    core::response::StatusSummary,
24};
25use serde::{Serialize, de::DeserializeOwned};
26use tracing::info;
27
28use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
29
/// Field manager name under which this module issues its server-side apply
/// patches (see `apply_resource`) and which it hands to
/// `register_versioned_crds` when registering CRDs.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
31
32pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
33where
34    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
35    <K as Resource>::DynamicType: Default,
36{
37    match api.get(name).await {
38        Ok(k) => Ok(Some(k)),
39        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
40        Err(e) => Err(e.into()),
41    }
42}
43
44pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
45where
46    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
47    <K as Resource>::DynamicType: Default,
48{
49    Ok(api
50        .patch(
51            &resource.name_unchecked(),
52            &PatchParams::apply(FIELD_MANAGER).force(),
53            &Patch::Apply(resource),
54        )
55        .await?)
56}
57
58pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
59where
60    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
61    <K as Resource>::DynamicType: Default,
62{
63    if resource.meta().resource_version.is_none() {
64        return Err(kube::Error::Api(
65            Box::new(Status {
66                status: Some(StatusSummary::Failure),
67                message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
68                reason: "BadRequest".to_string(),
69                code: 400,
70                metadata: None,
71                details: None,
72            }),
73        )
74        .into());
75    }
76    Ok(api
77        .replace(&resource.name_unchecked(), &PostParams::default(), resource)
78        .await?)
79}
80
81pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
82where
83    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
84    <K as Resource>::DynamicType: Default,
85{
86    match kube::runtime::wait::delete::delete_and_finalize(
87        api.clone(),
88        name,
89        &DeleteParams::foreground(),
90    )
91    .await
92    {
93        Ok(_) => Ok(()),
94        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
95            // the resource already doesn't exist
96            Ok(())
97        }
98        Err(e) => Err(e.into()),
99    }
100}
101
/// Settings for the conversion webhook backing the v1 version of the
/// Materialize CRD.
///
/// Passing `Some` of this to `register_crds` registers both the v1 and
/// v1alpha1 versions with webhook conversion between them; passing `None`
/// registers v1alpha1 only.
#[derive(Clone, Debug)]
pub struct ConversionWebhookConfig {
    /// Name of the Kubernetes service the webhook client config points at.
    pub service_name: String,
    /// Namespace of that service.
    pub service_namespace: String,
    /// Port of that service used for conversion requests.
    pub service_port: u16,
    /// Filesystem path of the file whose bytes are used as the webhook's CA
    /// bundle.
    pub ca_cert_path: String,
}
113
114pub async fn register_crds(
115    client: Client,
116    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
117    conversion_webhook: Option<ConversionWebhookConfig>,
118) -> Result<(), anyhow::Error> {
119    let (mut mz_crds, mz_conversion) = match conversion_webhook {
120        Some(config) => {
121            let ca_bytes = tokio::fs::read(config.ca_cert_path).await?;
122            let conversion = CustomResourceConversion {
123                strategy: "Webhook".to_owned(),
124                webhook: Some(WebhookConversion {
125                    client_config: Some(WebhookClientConfig {
126                        ca_bundle: Some(ByteString(ca_bytes)),
127                        service: Some(ServiceReference {
128                            name: config.service_name,
129                            namespace: config.service_namespace,
130                            path: Some("/convert".to_owned()),
131                            port: Some(config.service_port.into()),
132                        }),
133                        url: None,
134                    }),
135                    conversion_review_versions: vec!["v1".to_owned()],
136                }),
137            };
138            (
139                vec![
140                    crd::materialize::v1::Materialize::crd(),
141                    crd::materialize::v1alpha1::Materialize::crd(),
142                ],
143                Some(conversion),
144            )
145        }
146        None => (vec![crd::materialize::v1alpha1::Materialize::crd()], None),
147    };
148    let default_columns = mz_crds[0].spec.versions[0]
149        .additional_printer_columns
150        .take()
151        .expect("should contain ImageRef and UpToDate columns");
152    mz_crds[0].spec.versions[0].additional_printer_columns = Some(
153        additional_crd_columns
154            .into_iter()
155            .chain(default_columns)
156            .collect(),
157    );
158    tokio::time::timeout(
159        Duration::from_secs(120),
160        register_versioned_crds(
161            client.clone(),
162            vec![
163                VersionedCrd {
164                    crds: mz_crds,
165                    stored_version: String::from("v1alpha1"),
166                    conversion: mz_conversion,
167                },
168                VersionedCrd {
169                    crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
170                    stored_version: String::from("v1alpha1"),
171                    conversion: None,
172                },
173                VersionedCrd {
174                    crds: vec![crd::console::v1alpha1::Console::crd()],
175                    stored_version: String::from("v1alpha1"),
176                    conversion: None,
177                },
178                VersionedCrd {
179                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
180                    stored_version: String::from("v1"),
181                    conversion: None,
182                },
183            ],
184            FIELD_MANAGER,
185        ),
186    )
187    .await??;
188
189    info!("Done rewriting CRDs");
190
191    Ok(())
192}