mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15    Api, Client, CustomResourceExt, Resource, ResourceExt,
16    api::{DeleteParams, Patch, PatchParams, PostParams},
17    core::ErrorResponse,
18    runtime::{reflector, watcher},
19};
20use serde::{Serialize, de::DeserializeOwned};
21use tracing::{info, warn};
22
23use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
24
/// Field manager identifier used by this module for server-side apply
/// patches and for CRD registration.
const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
26
27pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
28where
29    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
30    <K as Resource>::DynamicType: Default,
31{
32    match api.get(name).await {
33        Ok(k) => Ok(Some(k)),
34        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
35        Err(e) => Err(e.into()),
36    }
37}
38
39pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
40where
41    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
42    <K as Resource>::DynamicType: Default,
43{
44    Ok(api
45        .patch(
46            &resource.name_unchecked(),
47            &PatchParams::apply(FIELD_MANAGER).force(),
48            &Patch::Apply(resource),
49        )
50        .await?)
51}
52
53pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
54where
55    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
56    <K as Resource>::DynamicType: Default,
57{
58    if resource.meta().resource_version.is_none() {
59        return Err(kube::Error::Api(ErrorResponse {
60            status: "Failure".to_string(),
61            message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
62            reason: "BadRequest".to_string(),
63            code: 400,
64        }).into());
65    }
66    Ok(api
67        .replace(&resource.name_unchecked(), &PostParams::default(), resource)
68        .await?)
69}
70
71pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
72where
73    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
74    <K as Resource>::DynamicType: Default,
75{
76    match kube::runtime::wait::delete::delete_and_finalize(
77        api.clone(),
78        name,
79        &DeleteParams::foreground(),
80    )
81    .await
82    {
83        Ok(_) => Ok(()),
84        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
85            // the resource already doesn't exist
86            Ok(())
87        }
88        Err(e) => Err(e.into()),
89    }
90}
91
92pub async fn register_crds(
93    client: Client,
94    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
95) -> Result<(), anyhow::Error> {
96    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
97    let default_columns = mz_crd.spec.versions[0]
98        .additional_printer_columns
99        .take()
100        .expect("should contain ImageRef and UpToDate columns");
101    mz_crd.spec.versions[0].additional_printer_columns = Some(
102        additional_crd_columns
103            .into_iter()
104            .chain(default_columns)
105            .collect(),
106    );
107    tokio::time::timeout(
108        Duration::from_secs(120),
109        register_versioned_crds(
110            client.clone(),
111            vec![
112                VersionedCrd {
113                    crds: vec![mz_crd],
114                    stored_version: String::from("v1alpha1"),
115                },
116                VersionedCrd {
117                    crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
118                    stored_version: String::from("v1alpha1"),
119                },
120                VersionedCrd {
121                    crds: vec![crd::console::v1alpha1::Console::crd()],
122                    stored_version: String::from("v1alpha1"),
123                },
124                VersionedCrd {
125                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
126                    stored_version: String::from("v1"),
127                },
128            ],
129            FIELD_MANAGER,
130        ),
131    )
132    .await??;
133
134    info!("Done rewriting CRDs");
135
136    Ok(())
137}
138
139pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
140where
141    K: kube::Resource<DynamicType = ()>
142        + Clone
143        + Send
144        + Sync
145        + DeserializeOwned
146        + Serialize
147        + std::fmt::Debug
148        + 'static,
149{
150    let api = kube::Api::all(client);
151    let (store, writer) = reflector::store();
152    let reflector =
153        reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
154    mz_ore::task::spawn(
155        || format!("{} reflector", K::kind(&Default::default())),
156        async {
157            reflector
158                .for_each(|res| {
159                    if let Err(e) = res {
160                        warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
161                    }
162                    ready(())
163                })
164                .await
165        },
166    );
167    // the only way this can return an error is if we drop the writer,
168    // which we do not ever do, so unwrap is fine
169    store.wait_until_ready().await.expect("writer dropped");
170    store
171}