mz_aws_glue_schema_registry/
client.rs1use aws_sdk_glue::error::{DisplayErrorContext, SdkError};
13use aws_sdk_glue::operation::get_registry::GetRegistryError as SdkGetRegistryError;
14use aws_sdk_glue::operation::get_schema_version::{
15 GetSchemaVersionError as SdkGetSchemaVersionError, GetSchemaVersionOutput,
16};
17use aws_sdk_glue::types::{
18 DataFormat as SdkDataFormat, RegistryId, RegistryStatus as SdkRegistryStatus, SchemaId,
19 SchemaVersionNumber, SchemaVersionStatus as SdkSchemaVersionStatus,
20};
21use aws_types::SdkConfig;
22use thiserror::Error;
23use uuid::Uuid;
24
25#[derive(Clone, Debug)]
30pub struct Client {
31 inner: aws_sdk_glue::Client,
32}
33
34impl Client {
35 pub(crate) fn from_sdk_config(sdk_config: SdkConfig) -> Self {
36 Client {
37 inner: aws_sdk_glue::Client::new(&sdk_config),
38 }
39 }
40
41 pub async fn get_registry(&self, name: &str) -> Result<Registry, GetRegistryError> {
47 let id = RegistryId::builder().registry_name(name).build();
48 let output = self
49 .inner
50 .get_registry()
51 .registry_id(id)
52 .send()
53 .await
54 .map_err(classify_get_registry_error)?;
55 Ok(Registry {
56 name: output.registry_name,
57 arn: output.registry_arn,
58 description: output.description,
59 lifecycle_status: output.status.map(RegistryLifecycleStatus::from_sdk),
60 })
61 }
62
63 pub async fn get_schema_version_by_id(
82 &self,
83 id: Uuid,
84 ) -> Result<SchemaVersion, GetSchemaVersionError> {
85 let output = self
86 .inner
87 .get_schema_version()
88 .schema_version_id(id.to_string())
89 .send()
90 .await
91 .map_err(classify_get_schema_version_error)?;
92 Ok(SchemaVersion::from_sdk(output))
93 }
94
95 pub async fn get_schema_version_latest_by_name(
104 &self,
105 registry_name: &str,
106 schema_name: &str,
107 ) -> Result<SchemaVersion, GetSchemaVersionError> {
108 let schema_id = SchemaId::builder()
109 .registry_name(registry_name)
110 .schema_name(schema_name)
111 .build();
112 let version_number = SchemaVersionNumber::builder().latest_version(true).build();
113 let output = self
114 .inner
115 .get_schema_version()
116 .schema_id(schema_id)
117 .schema_version_number(version_number)
118 .send()
119 .await
120 .map_err(classify_get_schema_version_error)?;
121 Ok(SchemaVersion::from_sdk(output))
122 }
123}
124
125#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct Registry {
131 pub name: Option<String>,
132 pub arn: Option<String>,
133 pub description: Option<String>,
134 pub lifecycle_status: Option<RegistryLifecycleStatus>,
135}
136
137#[derive(Debug, Clone, PartialEq, Eq)]
144pub enum RegistryLifecycleStatus {
145 Available,
146 Deleting,
147 Unknown(String),
149}
150
151impl RegistryLifecycleStatus {
152 fn from_sdk(status: SdkRegistryStatus) -> Self {
153 match &status {
154 SdkRegistryStatus::Available => RegistryLifecycleStatus::Available,
155 SdkRegistryStatus::Deleting => RegistryLifecycleStatus::Deleting,
156 _ => RegistryLifecycleStatus::Unknown(status.as_str().to_string()),
157 }
158 }
159}
160
161#[derive(Debug, Error)]
163pub enum GetRegistryError {
164 #[error("registry not found")]
167 NotFound,
168 #[error("AWS Glue error: {0}")]
171 Other(String),
172}
173
174fn classify_get_registry_error(err: SdkError<SdkGetRegistryError>) -> GetRegistryError {
175 if let SdkError::ServiceError(service_err) = &err
176 && matches!(
177 service_err.err(),
178 SdkGetRegistryError::EntityNotFoundException(_)
179 )
180 {
181 return GetRegistryError::NotFound;
182 }
183 GetRegistryError::Other(DisplayErrorContext(&err).to_string())
184}
185
186#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct SchemaVersion {
194 pub schema_version_id: Option<String>,
195 pub schema_arn: Option<String>,
196 pub definition: Option<String>,
198 pub data_format: Option<DataFormat>,
202 pub version_number: Option<i64>,
203 pub lifecycle_status: Option<SchemaVersionLifecycleStatus>,
204}
205
206impl SchemaVersion {
207 fn from_sdk(output: GetSchemaVersionOutput) -> Self {
208 SchemaVersion {
209 schema_version_id: output.schema_version_id,
210 schema_arn: output.schema_arn,
211 definition: output.schema_definition,
212 data_format: output.data_format.map(DataFormat::from_sdk),
213 version_number: output.version_number,
214 lifecycle_status: output.status.map(SchemaVersionLifecycleStatus::from_sdk),
215 }
216 }
217}
218
219#[derive(Debug, Clone, PartialEq, Eq)]
225pub enum DataFormat {
226 Avro,
227 Json,
228 Protobuf,
229 Unknown(String),
231}
232
233impl DataFormat {
234 fn from_sdk(format: SdkDataFormat) -> Self {
235 match &format {
236 SdkDataFormat::Avro => DataFormat::Avro,
237 SdkDataFormat::Json => DataFormat::Json,
238 SdkDataFormat::Protobuf => DataFormat::Protobuf,
239 _ => DataFormat::Unknown(format.as_str().to_string()),
240 }
241 }
242}
243
244#[derive(Debug, Clone, PartialEq, Eq)]
249pub enum SchemaVersionLifecycleStatus {
250 Available,
251 Pending,
252 Failure,
253 Deleting,
254 Unknown(String),
256}
257
258impl SchemaVersionLifecycleStatus {
259 fn from_sdk(status: SdkSchemaVersionStatus) -> Self {
260 match &status {
261 SdkSchemaVersionStatus::Available => SchemaVersionLifecycleStatus::Available,
262 SdkSchemaVersionStatus::Pending => SchemaVersionLifecycleStatus::Pending,
263 SdkSchemaVersionStatus::Failure => SchemaVersionLifecycleStatus::Failure,
264 SdkSchemaVersionStatus::Deleting => SchemaVersionLifecycleStatus::Deleting,
265 _ => SchemaVersionLifecycleStatus::Unknown(status.as_str().to_string()),
266 }
267 }
268}
269
270#[derive(Debug, Error)]
273pub enum GetSchemaVersionError {
274 #[error("schema version not found")]
277 NotFound,
278 #[error("AWS Glue error: {0}")]
280 Other(String),
281}
282
283fn classify_get_schema_version_error(
284 err: SdkError<SdkGetSchemaVersionError>,
285) -> GetSchemaVersionError {
286 if let SdkError::ServiceError(service_err) = &err
287 && matches!(
288 service_err.err(),
289 SdkGetSchemaVersionError::EntityNotFoundException(_)
290 )
291 {
292 return GetSchemaVersionError::NotFound;
293 }
294 GetSchemaVersionError::Other(DisplayErrorContext(&err).to_string())
295}