mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15    Api, Client, CustomResourceExt, Resource, ResourceExt,
16    api::{DeleteParams, Patch, PatchParams},
17    runtime::{reflector, watcher},
18};
19use serde::{Serialize, de::DeserializeOwned};
20use tracing::{info, warn};
21
22use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
23
/// Field manager name this module hands to the Kubernetes API: it is used for
/// the apply patches issued by `apply_resource` and is passed along when the
/// CRDs are registered in `register_crds`.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
25
26pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
27where
28    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
29    <K as Resource>::DynamicType: Default,
30{
31    match api.get(name).await {
32        Ok(k) => Ok(Some(k)),
33        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
34        Err(e) => Err(e.into()),
35    }
36}
37
38pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<(), anyhow::Error>
39where
40    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
41    <K as Resource>::DynamicType: Default,
42{
43    api.patch(
44        &resource.name_unchecked(),
45        &PatchParams::apply(FIELD_MANAGER).force(),
46        &Patch::Apply(resource),
47    )
48    .await?;
49    Ok(())
50}
51
52pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
53where
54    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
55    <K as Resource>::DynamicType: Default,
56{
57    match kube::runtime::wait::delete::delete_and_finalize(
58        api.clone(),
59        name,
60        &DeleteParams::foreground(),
61    )
62    .await
63    {
64        Ok(_) => Ok(()),
65        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
66            // the resource already doesn't exist
67            Ok(())
68        }
69        Err(e) => Err(e.into()),
70    }
71}
72
73pub async fn register_crds(
74    client: Client,
75    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
76) -> Result<(), anyhow::Error> {
77    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
78    let default_columns = mz_crd.spec.versions[0]
79        .additional_printer_columns
80        .take()
81        .expect("should contain ImageRef and UpToDate columns");
82    mz_crd.spec.versions[0].additional_printer_columns = Some(
83        additional_crd_columns
84            .into_iter()
85            .chain(default_columns)
86            .collect(),
87    );
88    tokio::time::timeout(
89        Duration::from_secs(120),
90        register_versioned_crds(
91            client.clone(),
92            vec![
93                VersionedCrd {
94                    crds: vec![mz_crd],
95                    stored_version: String::from("v1alpha1"),
96                },
97                VersionedCrd {
98                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
99                    stored_version: String::from("v1"),
100                },
101            ],
102            FIELD_MANAGER,
103        ),
104    )
105    .await??;
106
107    info!("Done rewriting CRDs");
108
109    Ok(())
110}
111
112pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
113where
114    K: kube::Resource<DynamicType = ()>
115        + Clone
116        + Send
117        + Sync
118        + DeserializeOwned
119        + Serialize
120        + std::fmt::Debug
121        + 'static,
122{
123    let api = kube::Api::all(client);
124    let (store, writer) = reflector::store();
125    let reflector =
126        reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
127    mz_ore::task::spawn(
128        || format!("{} reflector", K::kind(&Default::default())),
129        async {
130            reflector
131                .for_each(|res| {
132                    if let Err(e) = res {
133                        warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
134                    }
135                    ready(())
136                })
137                .await
138        },
139    );
140    // the only way this can return an error is if we drop the writer,
141    // which we do not ever do, so unwrap is fine
142    store.wait_until_ready().await.expect("writer dropped");
143    store
144}