Skip to main content

mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{future::ready, time::Duration};
11
12use apiextensions::v1::CustomResourceColumnDefinition;
13use futures::StreamExt;
14use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions;
15use kube::{
16    Api, Client, CustomResourceExt, Resource, ResourceExt,
17    api::{DeleteParams, Patch, PatchParams, PostParams},
18    core::Status,
19    core::response::StatusSummary,
20    runtime::{reflector, watcher},
21};
22use serde::{Serialize, de::DeserializeOwned};
23use tracing::{info, warn};
24
25use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
26
27const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
28
29pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
30where
31    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
32    <K as Resource>::DynamicType: Default,
33{
34    match api.get(name).await {
35        Ok(k) => Ok(Some(k)),
36        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
37        Err(e) => Err(e.into()),
38    }
39}
40
41pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
42where
43    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
44    <K as Resource>::DynamicType: Default,
45{
46    Ok(api
47        .patch(
48            &resource.name_unchecked(),
49            &PatchParams::apply(FIELD_MANAGER).force(),
50            &Patch::Apply(resource),
51        )
52        .await?)
53}
54
55pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
56where
57    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
58    <K as Resource>::DynamicType: Default,
59{
60    if resource.meta().resource_version.is_none() {
61        return Err(kube::Error::Api(
62            Box::new(Status {
63                status: Some(StatusSummary::Failure),
64                message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
65                reason: "BadRequest".to_string(),
66                code: 400,
67                metadata: None,
68                details: None,
69            }),
70        )
71        .into());
72    }
73    Ok(api
74        .replace(&resource.name_unchecked(), &PostParams::default(), resource)
75        .await?)
76}
77
78pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
79where
80    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
81    <K as Resource>::DynamicType: Default,
82{
83    match kube::runtime::wait::delete::delete_and_finalize(
84        api.clone(),
85        name,
86        &DeleteParams::foreground(),
87    )
88    .await
89    {
90        Ok(_) => Ok(()),
91        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
92            // the resource already doesn't exist
93            Ok(())
94        }
95        Err(e) => Err(e.into()),
96    }
97}
98
99pub async fn register_crds(
100    client: Client,
101    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
102) -> Result<(), anyhow::Error> {
103    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
104    let default_columns = mz_crd.spec.versions[0]
105        .additional_printer_columns
106        .take()
107        .expect("should contain ImageRef and UpToDate columns");
108    mz_crd.spec.versions[0].additional_printer_columns = Some(
109        additional_crd_columns
110            .into_iter()
111            .chain(default_columns)
112            .collect(),
113    );
114    tokio::time::timeout(
115        Duration::from_secs(120),
116        register_versioned_crds(
117            client.clone(),
118            vec![
119                VersionedCrd {
120                    crds: vec![mz_crd],
121                    stored_version: String::from("v1alpha1"),
122                },
123                VersionedCrd {
124                    crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
125                    stored_version: String::from("v1alpha1"),
126                },
127                VersionedCrd {
128                    crds: vec![crd::console::v1alpha1::Console::crd()],
129                    stored_version: String::from("v1alpha1"),
130                },
131                VersionedCrd {
132                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
133                    stored_version: String::from("v1"),
134                },
135            ],
136            FIELD_MANAGER,
137        ),
138    )
139    .await??;
140
141    info!("Done rewriting CRDs");
142
143    Ok(())
144}
145
146pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
147where
148    K: kube::Resource<DynamicType = ()>
149        + Clone
150        + Send
151        + Sync
152        + DeserializeOwned
153        + Serialize
154        + std::fmt::Debug
155        + 'static,
156{
157    let api = kube::Api::all(client);
158    let (store, writer) = reflector::store();
159    let reflector =
160        reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
161    mz_ore::task::spawn(
162        || format!("{} reflector", K::kind(&Default::default())),
163        async {
164            reflector
165                .for_each(|res| {
166                    if let Err(e) = res {
167                        warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
168                    }
169                    ready(())
170                })
171                .await
172        },
173    );
174    // the only way this can return an error is if we drop the writer,
175    // which we do not ever do, so unwrap is fine
176    store.wait_until_ready().await.expect("writer dropped");
177    store
178}