Skip to main content

mz_aws_glue_schema_registry/
client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! The Glue Schema Registry client.
11
12use aws_sdk_glue::error::{DisplayErrorContext, SdkError};
13use aws_sdk_glue::operation::get_registry::GetRegistryError as SdkGetRegistryError;
14use aws_sdk_glue::operation::get_schema_version::{
15    GetSchemaVersionError as SdkGetSchemaVersionError, GetSchemaVersionOutput,
16};
17use aws_sdk_glue::types::{
18    DataFormat as SdkDataFormat, RegistryId, RegistryStatus as SdkRegistryStatus, SchemaId,
19    SchemaVersionNumber, SchemaVersionStatus as SdkSchemaVersionStatus,
20};
21use aws_types::SdkConfig;
22use thiserror::Error;
23use uuid::Uuid;
24
25/// An API client for the AWS Glue Schema Registry.
26///
27/// `Client` is cheap to clone — internally it wraps an [`aws_sdk_glue::Client`],
28/// which is itself a clone-friendly handle backed by a shared connection pool.
29#[derive(Clone, Debug)]
30pub struct Client {
31    inner: aws_sdk_glue::Client,
32}
33
34impl Client {
35    pub(crate) fn from_sdk_config(sdk_config: SdkConfig) -> Self {
36        Client {
37            inner: aws_sdk_glue::Client::new(&sdk_config),
38        }
39    }
40
41    /// Look up a registry by name.
42    ///
43    /// Returns [`GetRegistryError::NotFound`] if the registry does not exist
44    /// in the configured account and region. Other errors (auth failures,
45    /// throttling, transport) surface as [`GetRegistryError::Other`].
46    pub async fn get_registry(&self, name: &str) -> Result<Registry, GetRegistryError> {
47        let id = RegistryId::builder().registry_name(name).build();
48        let output = self
49            .inner
50            .get_registry()
51            .registry_id(id)
52            .send()
53            .await
54            .map_err(classify_get_registry_error)?;
55        Ok(Registry {
56            name: output.registry_name,
57            arn: output.registry_arn,
58            description: output.description,
59            lifecycle_status: output.status.map(RegistryLifecycleStatus::from_sdk),
60        })
61    }
62
63    /// Fetch a schema version by its UUID.
64    ///
65    /// This is the source-decode path: the UUID is read from the Glue
66    /// wire-format header, and the returned `SchemaVersion::definition`
67    /// carries the writer schema (Avro JSON, for our usage).
68    ///
69    /// Glue schema-version UUIDs are globally unique within an AWS account
70    /// and this call does **not** scope to any registry — it returns the
71    /// matching version from anywhere the configured credentials can read.
72    /// Returns [`GetSchemaVersionError::NotFound`] only when no schema
73    /// version with this UUID exists in any visible registry. Callers that
74    /// need to enforce a specific registry must check the returned
75    /// `SchemaArn` themselves.
76    ///
77    /// The AWS `GetSchemaVersion` API accepts *either* a `SchemaVersionId`
78    /// (the UUID, registry-agnostic) *or* `SchemaId + SchemaVersionNumber`,
79    /// never both — which is why looking up by name is a separate method,
80    /// [`Client::get_schema_version_latest_by_name`].
81    pub async fn get_schema_version_by_id(
82        &self,
83        id: Uuid,
84    ) -> Result<SchemaVersion, GetSchemaVersionError> {
85        let output = self
86            .inner
87            .get_schema_version()
88            .schema_version_id(id.to_string())
89            .send()
90            .await
91            .map_err(classify_get_schema_version_error)?;
92        Ok(SchemaVersion::from_sdk(output))
93    }
94
95    /// Fetch the latest version of a schema by `(registry_name, schema_name)`.
96    ///
97    /// This is the DDL-planning path: at `CREATE SOURCE` time we don't yet
98    /// know any per-record UUIDs, so we pin the reader schema to whatever
99    /// the registry currently calls "latest". Runtime schema resolution
100    /// for individual records still goes through
101    /// [`Client::get_schema_version_by_id`] using the UUID in each Kafka
102    /// payload's Glue header.
103    pub async fn get_schema_version_latest_by_name(
104        &self,
105        registry_name: &str,
106        schema_name: &str,
107    ) -> Result<SchemaVersion, GetSchemaVersionError> {
108        let schema_id = SchemaId::builder()
109            .registry_name(registry_name)
110            .schema_name(schema_name)
111            .build();
112        let version_number = SchemaVersionNumber::builder().latest_version(true).build();
113        let output = self
114            .inner
115            .get_schema_version()
116            .schema_id(schema_id)
117            .schema_version_number(version_number)
118            .send()
119            .await
120            .map_err(classify_get_schema_version_error)?;
121        Ok(SchemaVersion::from_sdk(output))
122    }
123}
124
125/// A Glue Schema Registry, as returned by [`Client::get_registry`].
126///
127/// Only the fields Materialize currently cares about are surfaced; the full
128/// SDK type carries a few additional timestamps that we ignore.
129#[derive(Debug, Clone, PartialEq, Eq)]
130pub struct Registry {
131    pub name: Option<String>,
132    pub arn: Option<String>,
133    pub description: Option<String>,
134    pub lifecycle_status: Option<RegistryLifecycleStatus>,
135}
136
/// Lifecycle state of a Glue registry.
///
/// A crate-local mirror of `aws_sdk_glue::types::RegistryStatus`. Owning the
/// enum gives callers exhaustive matching without a direct dependency on the
/// SDK type. `Unknown(String)` keeps the raw value of any variant AWS adds
/// after this crate was written.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RegistryLifecycleStatus {
    /// Corresponds to the SDK's `RegistryStatus::Available`.
    Available,
    /// Corresponds to the SDK's `RegistryStatus::Deleting`.
    Deleting,
    /// A value the SDK reported that this crate does not yet know about.
    Unknown(String),
}
150
151impl RegistryLifecycleStatus {
152    fn from_sdk(status: SdkRegistryStatus) -> Self {
153        match &status {
154            SdkRegistryStatus::Available => RegistryLifecycleStatus::Available,
155            SdkRegistryStatus::Deleting => RegistryLifecycleStatus::Deleting,
156            _ => RegistryLifecycleStatus::Unknown(status.as_str().to_string()),
157        }
158    }
159}
160
161/// Errors returned by [`Client::get_registry`].
162#[derive(Debug, Error)]
163pub enum GetRegistryError {
164    /// The named registry does not exist in the configured account/region.
165    /// Maps from Glue's `EntityNotFoundException`.
166    #[error("registry not found")]
167    NotFound,
168    /// Anything else: auth failure, throttling, transport error, etc.
169    /// The wrapped message preserves the upstream SDK's diagnostic.
170    #[error("AWS Glue error: {0}")]
171    Other(String),
172}
173
174fn classify_get_registry_error(err: SdkError<SdkGetRegistryError>) -> GetRegistryError {
175    if let SdkError::ServiceError(service_err) = &err
176        && matches!(
177            service_err.err(),
178            SdkGetRegistryError::EntityNotFoundException(_)
179        )
180    {
181        return GetRegistryError::NotFound;
182    }
183    GetRegistryError::Other(DisplayErrorContext(&err).to_string())
184}
185
186/// A Glue schema version, as returned by [`Client::get_schema_version_by_id`]
187/// and [`Client::get_schema_version_latest_by_name`].
188///
189/// `definition` is the format-specific schema text; for Avro it is a JSON
190/// document the Avro parser can ingest directly. The remaining fields are
191/// informational and exist for debug logging.
192#[derive(Debug, Clone, PartialEq, Eq)]
193pub struct SchemaVersion {
194    pub schema_version_id: Option<String>,
195    pub schema_arn: Option<String>,
196    /// The format-specific schema text (Avro JSON, JSON Schema, etc.).
197    pub definition: Option<String>,
198    /// Glue's data-format tag — `AVRO`, `JSON`, or `PROTOBUF`. Mirrors the
199    /// SDK enum so callers get exhaustive matching without depending on the
200    /// SDK type directly.
201    pub data_format: Option<DataFormat>,
202    pub version_number: Option<i64>,
203    pub lifecycle_status: Option<SchemaVersionLifecycleStatus>,
204}
205
206impl SchemaVersion {
207    fn from_sdk(output: GetSchemaVersionOutput) -> Self {
208        SchemaVersion {
209            schema_version_id: output.schema_version_id,
210            schema_arn: output.schema_arn,
211            definition: output.schema_definition,
212            data_format: output.data_format.map(DataFormat::from_sdk),
213            version_number: output.version_number,
214            lifecycle_status: output.status.map(SchemaVersionLifecycleStatus::from_sdk),
215        }
216    }
217}
218
/// Data format of a Glue schema.
///
/// A crate-local mirror of `aws_sdk_glue::types::DataFormat`. It exists for
/// the same reason as [`RegistryLifecycleStatus`]: exhaustive matching without
/// depending on the SDK type. `Unknown(String)` preserves variants AWS may add
/// later.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataFormat {
    /// Corresponds to the SDK's `DataFormat::Avro`.
    Avro,
    /// Corresponds to the SDK's `DataFormat::Json`.
    Json,
    /// Corresponds to the SDK's `DataFormat::Protobuf`.
    Protobuf,
    /// A value the SDK reported that this crate does not yet know about.
    Unknown(String),
}
232
233impl DataFormat {
234    fn from_sdk(format: SdkDataFormat) -> Self {
235        match &format {
236            SdkDataFormat::Avro => DataFormat::Avro,
237            SdkDataFormat::Json => DataFormat::Json,
238            SdkDataFormat::Protobuf => DataFormat::Protobuf,
239            _ => DataFormat::Unknown(format.as_str().to_string()),
240        }
241    }
242}
243
/// Lifecycle state of a Glue schema version.
///
/// A crate-local mirror of `aws_sdk_glue::types::SchemaVersionStatus`. It
/// exists for the same reason as [`RegistryLifecycleStatus`]: exhaustive
/// matching without depending on the SDK type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SchemaVersionLifecycleStatus {
    /// Corresponds to the SDK's `SchemaVersionStatus::Available`.
    Available,
    /// Corresponds to the SDK's `SchemaVersionStatus::Pending`.
    Pending,
    /// Corresponds to the SDK's `SchemaVersionStatus::Failure`.
    Failure,
    /// Corresponds to the SDK's `SchemaVersionStatus::Deleting`.
    Deleting,
    /// A value the SDK reported that this crate does not yet know about.
    Unknown(String),
}
257
258impl SchemaVersionLifecycleStatus {
259    fn from_sdk(status: SdkSchemaVersionStatus) -> Self {
260        match &status {
261            SdkSchemaVersionStatus::Available => SchemaVersionLifecycleStatus::Available,
262            SdkSchemaVersionStatus::Pending => SchemaVersionLifecycleStatus::Pending,
263            SdkSchemaVersionStatus::Failure => SchemaVersionLifecycleStatus::Failure,
264            SdkSchemaVersionStatus::Deleting => SchemaVersionLifecycleStatus::Deleting,
265            _ => SchemaVersionLifecycleStatus::Unknown(status.as_str().to_string()),
266        }
267    }
268}
269
270/// Errors returned by [`Client::get_schema_version_by_id`] and
271/// [`Client::get_schema_version_latest_by_name`].
272#[derive(Debug, Error)]
273pub enum GetSchemaVersionError {
274    /// No matching schema version exists in the configured account/region.
275    /// Maps from Glue's `EntityNotFoundException`.
276    #[error("schema version not found")]
277    NotFound,
278    /// Anything else: auth failure, throttling, transport error, etc.
279    #[error("AWS Glue error: {0}")]
280    Other(String),
281}
282
283fn classify_get_schema_version_error(
284    err: SdkError<SdkGetSchemaVersionError>,
285) -> GetSchemaVersionError {
286    if let SdkError::ServiceError(service_err) = &err
287        && matches!(
288            service_err.err(),
289            SdkGetSchemaVersionError::EntityNotFoundException(_)
290        )
291    {
292        return GetSchemaVersionError::NotFound;
293    }
294    GetSchemaVersionError::Other(DisplayErrorContext(&err).to_string())
295}