Skip to main content

mz_orchestratord/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::anyhow;
11use axum::routing::{get, post};
12use axum::{Json, Router};
13use http::StatusCode;
14use kube::core::Status;
15use kube::core::conversion::{ConversionRequest, ConversionResponse, ConversionReview};
16use kube::core::response::reason;
17
18use mz_cloud_resources::crd::materialize::{v1, v1alpha1};
19use tracing::{debug, warn};
20
21pub fn router() -> Router {
22    Router::new()
23        .route("/convert", post(post_convert))
24        .route("/healthz", get(get_health))
25}
26
27#[derive(Clone, Copy)]
28enum SupportedVersion {
29    V1alpha1,
30    V1,
31}
32
33impl TryFrom<&str> for SupportedVersion {
34    type Error = anyhow::Error;
35
36    fn try_from(value: &str) -> Result<Self, Self::Error> {
37        match value {
38            "materialize.cloud/v1alpha1" => Ok(SupportedVersion::V1alpha1),
39            "materialize.cloud/v1" => Ok(SupportedVersion::V1),
40            _ => Err(anyhow!("unexpected version: {}", value)),
41        }
42    }
43}
44
45fn version_label(v: SupportedVersion) -> &'static str {
46    match v {
47        SupportedVersion::V1alpha1 => "v1alpha1",
48        SupportedVersion::V1 => "v1",
49    }
50}
51
52fn convert(
53    desired_version: SupportedVersion,
54    value: serde_json::Value,
55) -> Result<serde_json::Value, anyhow::Error> {
56    let from_version = SupportedVersion::try_from(
57        value
58            .get("apiVersion")
59            .and_then(|version| version.as_str())
60            .ok_or_else(|| anyhow!("missing version"))?,
61    )?;
62    debug!(
63        from = version_label(from_version),
64        to = version_label(desired_version),
65        input_spec = ?value.get("spec"),
66        input_status = ?value.get("status"),
67        "conversion webhook called",
68    );
69    let result = match (from_version, desired_version) {
70        (SupportedVersion::V1alpha1, SupportedVersion::V1alpha1) => Ok(value),
71        (SupportedVersion::V1alpha1, SupportedVersion::V1) => {
72            serde_json::from_value::<v1alpha1::Materialize>(value)
73                .and_then(|mz_v1alpha1| serde_json::to_value(v1::Materialize::from(mz_v1alpha1)))
74                .map_err(|e| e.into())
75        }
76        (SupportedVersion::V1, SupportedVersion::V1alpha1) => {
77            serde_json::from_value::<v1::Materialize>(value)
78                .and_then(|mz_v1| serde_json::to_value(v1alpha1::Materialize::from(mz_v1)))
79                .map_err(|e| e.into())
80        }
81        (SupportedVersion::V1, SupportedVersion::V1) => Ok(value),
82    };
83    match &result {
84        Ok(converted) => {
85            debug!(
86                from = version_label(from_version),
87                to = version_label(desired_version),
88                output_spec = ?converted.get("spec"),
89                output_status = ?converted.get("status"),
90                "conversion webhook succeeded",
91            );
92        }
93        Err(e) => {
94            warn!(
95                from = version_label(from_version),
96                to = version_label(desired_version),
97                error = ?e,
98                "conversion webhook failed",
99            );
100        }
101    }
102    result
103}
104
105async fn post_convert(
106    Json(conversion_review): Json<ConversionReview>,
107) -> (StatusCode, Json<ConversionReview>) {
108    let Ok(request) = ConversionRequest::from_review(conversion_review) else {
109        warn!("missing request");
110        return (
111            StatusCode::UNPROCESSABLE_ENTITY,
112            Json(
113                ConversionResponse::invalid(Status::failure("missing request", reason::INVALID))
114                    .into_review(),
115            ),
116        );
117    };
118
119    let desired_version = match SupportedVersion::try_from(request.desired_api_version.as_str()) {
120        Ok(v) => v,
121        Err(e) => {
122            return (
123                StatusCode::INTERNAL_SERVER_ERROR,
124                Json(
125                    ConversionResponse::for_request(request)
126                        .failure(Status::failure(&e.to_string(), reason::BAD_REQUEST))
127                        .into_review(),
128                ),
129            );
130        }
131    };
132
133    let converted_objects: Result<Vec<serde_json::Value>, anyhow::Error> = request
134        .objects
135        .iter()
136        .cloned()
137        .map(|value| convert(desired_version, value))
138        .collect();
139    match converted_objects {
140        Ok(converted_objects) => (
141            StatusCode::OK,
142            Json(
143                ConversionResponse::for_request(request)
144                    .success(converted_objects)
145                    .into_review(),
146            ),
147        ),
148        Err(e) => {
149            warn!("error when converting: {:?}\n{:?}", &e, request.objects);
150            (
151                StatusCode::INTERNAL_SERVER_ERROR,
152                Json(
153                    ConversionResponse::for_request(request)
154                        .failure(Status::failure(&e.to_string(), reason::UNKNOWN))
155                        .into_review(),
156                ),
157            )
158        }
159    }
160}
161
162async fn get_health() -> StatusCode {
163    StatusCode::OK
164}