mz_orchestratord/
webhook.rs1use anyhow::anyhow;
11use axum::routing::{get, post};
12use axum::{Json, Router};
13use http::StatusCode;
14use kube::core::Status;
15use kube::core::conversion::{ConversionRequest, ConversionResponse, ConversionReview};
16use kube::core::response::reason;
17
18use mz_cloud_resources::crd::materialize::{v1, v1alpha1};
19use tracing::{debug, warn};
20
21pub fn router() -> Router {
22 Router::new()
23 .route("/convert", post(post_convert))
24 .route("/healthz", get(get_health))
25}
26
27#[derive(Clone, Copy)]
28enum SupportedVersion {
29 V1alpha1,
30 V1,
31}
32
33impl TryFrom<&str> for SupportedVersion {
34 type Error = anyhow::Error;
35
36 fn try_from(value: &str) -> Result<Self, Self::Error> {
37 match value {
38 "materialize.cloud/v1alpha1" => Ok(SupportedVersion::V1alpha1),
39 "materialize.cloud/v1" => Ok(SupportedVersion::V1),
40 _ => Err(anyhow!("unexpected version: {}", value)),
41 }
42 }
43}
44
45fn version_label(v: SupportedVersion) -> &'static str {
46 match v {
47 SupportedVersion::V1alpha1 => "v1alpha1",
48 SupportedVersion::V1 => "v1",
49 }
50}
51
52fn convert(
53 desired_version: SupportedVersion,
54 value: serde_json::Value,
55) -> Result<serde_json::Value, anyhow::Error> {
56 let from_version = SupportedVersion::try_from(
57 value
58 .get("apiVersion")
59 .and_then(|version| version.as_str())
60 .ok_or_else(|| anyhow!("missing version"))?,
61 )?;
62 debug!(
63 from = version_label(from_version),
64 to = version_label(desired_version),
65 input_spec = ?value.get("spec"),
66 input_status = ?value.get("status"),
67 "conversion webhook called",
68 );
69 let result = match (from_version, desired_version) {
70 (SupportedVersion::V1alpha1, SupportedVersion::V1alpha1) => Ok(value),
71 (SupportedVersion::V1alpha1, SupportedVersion::V1) => {
72 serde_json::from_value::<v1alpha1::Materialize>(value)
73 .and_then(|mz_v1alpha1| serde_json::to_value(v1::Materialize::from(mz_v1alpha1)))
74 .map_err(|e| e.into())
75 }
76 (SupportedVersion::V1, SupportedVersion::V1alpha1) => {
77 serde_json::from_value::<v1::Materialize>(value)
78 .and_then(|mz_v1| serde_json::to_value(v1alpha1::Materialize::from(mz_v1)))
79 .map_err(|e| e.into())
80 }
81 (SupportedVersion::V1, SupportedVersion::V1) => Ok(value),
82 };
83 match &result {
84 Ok(converted) => {
85 debug!(
86 from = version_label(from_version),
87 to = version_label(desired_version),
88 output_spec = ?converted.get("spec"),
89 output_status = ?converted.get("status"),
90 "conversion webhook succeeded",
91 );
92 }
93 Err(e) => {
94 warn!(
95 from = version_label(from_version),
96 to = version_label(desired_version),
97 error = ?e,
98 "conversion webhook failed",
99 );
100 }
101 }
102 result
103}
104
105async fn post_convert(
106 Json(conversion_review): Json<ConversionReview>,
107) -> (StatusCode, Json<ConversionReview>) {
108 let Ok(request) = ConversionRequest::from_review(conversion_review) else {
109 warn!("missing request");
110 return (
111 StatusCode::UNPROCESSABLE_ENTITY,
112 Json(
113 ConversionResponse::invalid(Status::failure("missing request", reason::INVALID))
114 .into_review(),
115 ),
116 );
117 };
118
119 let desired_version = match SupportedVersion::try_from(request.desired_api_version.as_str()) {
120 Ok(v) => v,
121 Err(e) => {
122 return (
123 StatusCode::INTERNAL_SERVER_ERROR,
124 Json(
125 ConversionResponse::for_request(request)
126 .failure(Status::failure(&e.to_string(), reason::BAD_REQUEST))
127 .into_review(),
128 ),
129 );
130 }
131 };
132
133 let converted_objects: Result<Vec<serde_json::Value>, anyhow::Error> = request
134 .objects
135 .iter()
136 .cloned()
137 .map(|value| convert(desired_version, value))
138 .collect();
139 match converted_objects {
140 Ok(converted_objects) => (
141 StatusCode::OK,
142 Json(
143 ConversionResponse::for_request(request)
144 .success(converted_objects)
145 .into_review(),
146 ),
147 ),
148 Err(e) => {
149 warn!("error when converting: {:?}\n{:?}", &e, request.objects);
150 (
151 StatusCode::INTERNAL_SERVER_ERROR,
152 Json(
153 ConversionResponse::for_request(request)
154 .failure(Status::failure(&e.to_string(), reason::UNKNOWN))
155 .into_review(),
156 ),
157 )
158 }
159 }
160}
161
162async fn get_health() -> StatusCode {
163 StatusCode::OK
164}