Skip to main content

mz_orchestratord/
k8s.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{future::ready, time::Duration};
11
12use futures::StreamExt;
13use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition;
14use kube::{
15    Api, Client, CustomResourceExt, Resource, ResourceExt,
16    api::{DeleteParams, Patch, PatchParams, PostParams},
17    core::Status,
18    core::response::StatusSummary,
19    runtime::{reflector, watcher},
20};
21use serde::{Serialize, de::DeserializeOwned};
22use tracing::{info, warn};
23
24use mz_cloud_resources::crd::{self, VersionedCrd, register_versioned_crds};
25
26const FIELD_MANAGER: &str = "orchestratord.materialize.cloud";
27
28pub async fn get_resource<K>(api: &Api<K>, name: &str) -> Result<Option<K>, anyhow::Error>
29where
30    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
31    <K as Resource>::DynamicType: Default,
32{
33    match api.get(name).await {
34        Ok(k) => Ok(Some(k)),
35        Err(kube::Error::Api(e)) if e.code == 404 => Ok(None),
36        Err(e) => Err(e.into()),
37    }
38}
39
40pub async fn apply_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
41where
42    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
43    <K as Resource>::DynamicType: Default,
44{
45    Ok(api
46        .patch(
47            &resource.name_unchecked(),
48            &PatchParams::apply(FIELD_MANAGER).force(),
49            &Patch::Apply(resource),
50        )
51        .await?)
52}
53
54pub async fn replace_resource<K>(api: &Api<K>, resource: &K) -> Result<K, anyhow::Error>
55where
56    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
57    <K as Resource>::DynamicType: Default,
58{
59    if resource.meta().resource_version.is_none() {
60        return Err(kube::Error::Api(
61            Box::new(Status {
62                status: Some(StatusSummary::Failure),
63                message: "Must use apply_resource instead of replace_resource to apply fully created resources.".to_string(),
64                reason: "BadRequest".to_string(),
65                code: 400,
66                metadata: None,
67                details: None,
68            }),
69        )
70        .into());
71    }
72    Ok(api
73        .replace(&resource.name_unchecked(), &PostParams::default(), resource)
74        .await?)
75}
76
77pub async fn delete_resource<K>(api: &Api<K>, name: &str) -> Result<(), anyhow::Error>
78where
79    K: Resource + Clone + Send + DeserializeOwned + Serialize + std::fmt::Debug + 'static,
80    <K as Resource>::DynamicType: Default,
81{
82    match kube::runtime::wait::delete::delete_and_finalize(
83        api.clone(),
84        name,
85        &DeleteParams::foreground(),
86    )
87    .await
88    {
89        Ok(_) => Ok(()),
90        Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e))) if e.code == 404 => {
91            // the resource already doesn't exist
92            Ok(())
93        }
94        Err(e) => Err(e.into()),
95    }
96}
97
98pub async fn register_crds(
99    client: Client,
100    additional_crd_columns: Vec<CustomResourceColumnDefinition>,
101) -> Result<(), anyhow::Error> {
102    let mut mz_crd = crd::materialize::v1alpha1::Materialize::crd();
103    let default_columns = mz_crd.spec.versions[0]
104        .additional_printer_columns
105        .take()
106        .expect("should contain ImageRef and UpToDate columns");
107    mz_crd.spec.versions[0].additional_printer_columns = Some(
108        additional_crd_columns
109            .into_iter()
110            .chain(default_columns)
111            .collect(),
112    );
113    tokio::time::timeout(
114        Duration::from_secs(120),
115        register_versioned_crds(
116            client.clone(),
117            vec![
118                VersionedCrd {
119                    crds: vec![mz_crd],
120                    stored_version: String::from("v1alpha1"),
121                },
122                VersionedCrd {
123                    crds: vec![crd::balancer::v1alpha1::Balancer::crd()],
124                    stored_version: String::from("v1alpha1"),
125                },
126                VersionedCrd {
127                    crds: vec![crd::console::v1alpha1::Console::crd()],
128                    stored_version: String::from("v1alpha1"),
129                },
130                VersionedCrd {
131                    crds: vec![crd::vpc_endpoint::v1::VpcEndpoint::crd()],
132                    stored_version: String::from("v1"),
133                },
134            ],
135            FIELD_MANAGER,
136        ),
137    )
138    .await??;
139
140    info!("Done rewriting CRDs");
141
142    Ok(())
143}
144
145pub async fn make_reflector<K>(client: Client) -> reflector::Store<K>
146where
147    K: kube::Resource<DynamicType = ()>
148        + Clone
149        + Send
150        + Sync
151        + DeserializeOwned
152        + Serialize
153        + std::fmt::Debug
154        + 'static,
155{
156    let api = kube::Api::all(client);
157    let (store, writer) = reflector::store();
158    let reflector =
159        reflector::reflector(writer, watcher(api, watcher::Config::default().timeout(29)));
160    mz_ore::task::spawn(
161        || format!("{} reflector", K::kind(&Default::default())),
162        async {
163            reflector
164                .for_each(|res| {
165                    if let Err(e) = res {
166                        warn!("error in {} reflector: {}", K::kind(&Default::default()), e);
167                    }
168                    ready(())
169                })
170                .await
171        },
172    );
173    // the only way this can return an error is if we drop the writer,
174    // which we do not ever do, so unwrap is fine
175    store.wait_until_ready().await.expect("writer dropped");
176    store
177}