Skip to main content

mz_interchange/avro/
schema.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Conversion from Avro schemas to Materialize `RelationDesc`s.
11//!
12//! A few notes for posterity on how this conversion happens are in order.
13//!
14//! If the schema is an Avro record, we flatten it to its fields, which become the columns
15//! of the relation.
16//!
17//! Each individual field is then converted to its SQL equivalent. For most types, this
18//! conversion is the obvious one. The only non-trivial counterexample is Avro unions.
19//!
20//! Since Avro types are not nullable by default, the typical way normal (i.e., nullable)
21//! SQL fields are represented in Avro is by a union of the underlying type with the
22//! singleton type { Null }; in Avro schema notation, this is `["null", "TheType"]`.
23//! We shall call union types following this pattern _Nullability-Pattern Unions_.
24//! We shall call all other union types (e.g. `["MyType1", "MyType2"]` or `["null", "MyType1", "MyType2"]`) _Essential Unions_.
25//! Since there is an obvious way to represent Nullability-Pattern Unions, but not Essential Unions, in the SQL type system,
26//! we must handle Essential Unions with a bit of a hack (at least until Materialize supports union or sum types, which may be never).
27//!
//! When an Essential Union appears as one of the fields of a record, we expand
//! it to _n_ columns in SQL, where _n_ is the number of non-null variants in the union. These
//! columns will be given names created by pasting their 1-based index (counting only the
//! non-null variants, in declaration order) at the end of the overall name
//! of the field. For example, if an Essential Union in a field named `"Foo"` has schema `[int, bool]`, it will expand to the columns `"Foo1": int, "Foo2": bool`. There is an implicit constraint upheld by the source pipeline that only one such column will be non-`null` at a time.
32//!
33//! When an Essential Union appears _elsewhere_ than as one of the fields of a record,
34//! there is nothing we can do, because we expect to be able to turn it into exactly one
35//! SQL type, not a series of them. Thus, in these cases, we just bail. For example, it's
36//! not possible to ingest an array or map whose element type is an Essential Union.
37
38use std::collections::btree_map::Entry;
39use std::collections::{BTreeMap, BTreeSet};
40use std::fmt;
41use std::sync::Arc;
42
43use anyhow::{Context, anyhow, bail};
44use mz_avro::error::Error as AvroError;
45use mz_avro::schema::{
46    ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
47};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_ore::future::OreFutureExt;
51use mz_ore::retry::Retry;
52use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
53use mz_repr::adt::timestamp::TimestampPrecision;
54use mz_repr::{ColumnName, RelationDesc, SqlColumnType, SqlScalarType, UNKNOWN_COLUMN_NAME};
55use tracing::warn;
56use uuid::Uuid;
57
58use crate::avro::is_null;
59
60pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
61    let schema: serde_json::Value = serde_json::from_str(schema)?;
62    // Parse reference schemas incrementally: each reference may depend on previous ones.
63    // References must be provided in dependency order (dependencies first).
64    let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
65    for reference in references {
66        let ref_json: serde_json::Value = serde_json::from_str(reference)?;
67        let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
68        parsed_refs.push(parsed);
69    }
70    Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
71}
72
73/// Converts an Apache Avro schema into a list of column names and types.
74// TODO(petrosagg): find a way to make this a TryFrom impl somewhere
75pub fn schema_to_relationdesc(schema: Schema) -> Result<RelationDesc, anyhow::Error> {
76    // TODO(petrosagg): call directly into validate_schema_2 and do the Record flattening once
77    // we're in RelationDesc land
78    Ok(RelationDesc::from_names_and_types(validate_schema_1(
79        schema.top_node(),
80    )?))
81}
82
83/// Convert an Avro schema to a series of columns and names, flattening the top-level record,
84/// if the top node is indeed a record.
85fn validate_schema_1(schema: SchemaNode) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
86    let mut columns = vec![];
87    let mut seen_avro_nodes = Default::default();
88    match schema.inner {
89        SchemaPiece::Record { fields, .. } => {
90            for f in fields {
91                columns.extend(get_named_columns(
92                    &mut seen_avro_nodes,
93                    schema.step(&f.schema),
94                    Some(&f.name),
95                )?);
96            }
97        }
98        _ => {
99            columns.extend(get_named_columns(&mut seen_avro_nodes, schema, None)?);
100        }
101    }
102    Ok(columns)
103}
104
105/// Get the series of (one or more) SQL columns corresponding to an Avro union.
106/// See module comments for details.
107fn get_union_columns<'a>(
108    seen_avro_nodes: &mut BTreeSet<usize>,
109    schema: SchemaNode<'a>,
110    base_name: Option<&str>,
111) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
112    let us = match schema.inner {
113        SchemaPiece::Union(us) => us,
114        _ => panic!("This function should only be called on unions."),
115    };
116    let mut columns = vec![];
117    let vs = us.variants();
118    if vs.is_empty() || (vs.len() == 1 && is_null(&vs[0])) {
119        bail!(anyhow!("Empty or null-only unions are not supported"));
120    } else {
121        for (i, v) in vs.iter().filter(|v| !is_null(v)).enumerate() {
122            with_recursion_guard(seen_avro_nodes, schema.root, v, |seen| {
123                let node = schema.step(v);
124                if let SchemaPiece::Union(_) = node.inner {
125                    unreachable!("Internal error: directly nested avro union!");
126                }
127
128                let name = if vs.len() == 1 || (vs.len() == 2 && vs.iter().any(is_null)) {
129                    // There is only one non-null variant in the
130                    // union, so we can use the field name directly.
131                    base_name
132                        .map(|n| n.to_owned())
133                        .or_else(|| {
134                            v.get_piece_and_name(schema.root)
135                                .1
136                                .map(|full_name| full_name.base_name().to_owned())
137                        })
138                        .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
139                } else {
140                    // There are multiple non-null variants in the
141                    // union, so we need to invent field names for
142                    // each variant.
143                    base_name
144                        .map(|n| format!("{}{}", n, i + 1))
145                        .or_else(|| {
146                            v.get_piece_and_name(schema.root)
147                                .1
148                                .map(|full_name| full_name.base_name().to_owned())
149                        })
150                        .unwrap_or_else(|| UNKNOWN_COLUMN_NAME.into())
151                };
152
153                // If there is more than one variant in the union,
154                // the column's output type is nullable, as this
155                // column will be null whenever it is uninhabited.
156                let ty = validate_schema_2(seen, node)?;
157                columns.push((name.into(), ty.nullable(vs.len() > 1)));
158                Ok(())
159            })?;
160        }
161    }
162    Ok(columns)
163}
164
165fn get_named_columns<'a>(
166    seen_avro_nodes: &mut BTreeSet<usize>,
167    schema: SchemaNode<'a>,
168    base_name: Option<&str>,
169) -> anyhow::Result<Vec<(ColumnName, SqlColumnType)>> {
170    if let SchemaPiece::Union(_) = schema.inner {
171        get_union_columns(seen_avro_nodes, schema, base_name)
172    } else {
173        let scalar_type = validate_schema_2(seen_avro_nodes, schema)?;
174        Ok(vec![(
175            // TODO(benesch): we should do better than this when there's no base
176            // name, e.g., invent a name based on the type.
177            base_name.unwrap_or(UNKNOWN_COLUMN_NAME).into(),
178            scalar_type.nullable(false),
179        )])
180    }
181}
182
183/// Get the single column corresponding to a schema node.
184/// It is an error if this node should correspond to more than one column
185/// (because it is an Essential Union in the sense described in the module docs).
186fn validate_schema_2(
187    seen_avro_nodes: &mut BTreeSet<usize>,
188    schema: SchemaNode,
189) -> anyhow::Result<SqlScalarType> {
190    Ok(match schema.inner {
191        SchemaPiece::Union(_) => {
192            let columns = get_union_columns(seen_avro_nodes, schema, None)?;
193            if columns.len() != 1 {
194                bail!("Union of more than one non-null type not valid here");
195            }
196            let (_column_name, column_type) = columns.into_element();
197            // It's okay to lose the nullability information here, as it's not relevant to
198            // any higher layer. This will either be included in an array or map type,
199            // where all values are nullable. It can't be included as a top-level column
200            // or as a record type, where nullability is actually tracked, because in
201            // those cases we will have already gone through the `Union` code path in
202            // `get_named_columns`.
203            column_type.scalar_type
204        }
205        SchemaPiece::Null => bail!("null outside of union types is not supported"),
206        SchemaPiece::Boolean => SqlScalarType::Bool,
207        SchemaPiece::Int => SqlScalarType::Int32,
208        SchemaPiece::Long => SqlScalarType::Int64,
209        SchemaPiece::Float => SqlScalarType::Float32,
210        SchemaPiece::Double => SqlScalarType::Float64,
211        SchemaPiece::Date => SqlScalarType::Date,
212        SchemaPiece::TimestampMilli => SqlScalarType::Timestamp {
213            precision: Some(TimestampPrecision::try_from(3).unwrap()),
214        },
215        SchemaPiece::TimestampMicro => SqlScalarType::Timestamp {
216            precision: Some(TimestampPrecision::try_from(6).unwrap()),
217        },
218        SchemaPiece::Decimal {
219            precision, scale, ..
220        } => {
221            if *precision > usize::cast_from(NUMERIC_DATUM_MAX_PRECISION) {
222                bail!(
223                    "decimals with precision greater than {} are not supported",
224                    NUMERIC_DATUM_MAX_PRECISION
225                )
226            }
227            SqlScalarType::Numeric {
228                max_scale: Some(NumericMaxScale::try_from(*scale)?),
229            }
230        }
231        SchemaPiece::Bytes | SchemaPiece::Fixed { .. } => SqlScalarType::Bytes,
232        SchemaPiece::String | SchemaPiece::Enum { .. } => SqlScalarType::String,
233
234        SchemaPiece::Json => SqlScalarType::Jsonb,
235        SchemaPiece::Uuid => SqlScalarType::Uuid,
236        SchemaPiece::Record { fields, .. } => {
237            let mut columns = vec![];
238            for f in fields {
239                with_recursion_guard(seen_avro_nodes, schema.root, &f.schema, |seen| {
240                    columns.extend(get_named_columns(
241                        seen,
242                        schema.step(&f.schema),
243                        Some(&f.name),
244                    )?);
245                    Ok(())
246                })?;
247            }
248            SqlScalarType::Record {
249                fields: columns.into(),
250                custom_id: None,
251            }
252        }
253        SchemaPiece::Array(inner) => {
254            with_recursion_guard(seen_avro_nodes, schema.root, inner.as_ref(), |seen| {
255                Ok(SqlScalarType::List {
256                    element_type: Box::new(validate_schema_2(seen, schema.step(inner))?),
257                    custom_id: None,
258                })
259            })?
260        }
261        SchemaPiece::Map(inner) => {
262            with_recursion_guard(seen_avro_nodes, schema.root, inner.as_ref(), |seen| {
263                Ok(SqlScalarType::Map {
264                    value_type: Box::new(validate_schema_2(seen, schema.step(inner))?),
265                    custom_id: None,
266                })
267            })?
268        }
269        _ => bail!("Unsupported type in schema: {:?}", schema.inner),
270    })
271}
272
273/// Runs `f` with `node` marked as on the current resolution path, bailing if it's
274/// already on the path (a cycle). The mark is cleared on exit so sibling reuse of a
275/// named type isn't flagged.
276fn with_recursion_guard<T>(
277    seen: &mut BTreeSet<usize>,
278    root: &Schema,
279    node: &SchemaPieceOrNamed,
280    f: impl FnOnce(&mut BTreeSet<usize>) -> anyhow::Result<T>,
281) -> anyhow::Result<T> {
282    let named_idx = match node {
283        SchemaPieceOrNamed::Named(idx) => Some(*idx),
284        SchemaPieceOrNamed::Piece(_) => None,
285    };
286    if let Some(named_idx) = named_idx {
287        if !seen.insert(named_idx) {
288            bail!(
289                "Recursive types are not supported: {}",
290                node.get_human_name(root)
291            );
292        }
293    }
294    let result = f(seen);
295    if let Some(named_idx) = named_idx {
296        seen.remove(&named_idx);
297    }
298    result
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that converting `schema` fails with the "recursive types" error.
    ///
    /// A named type that refers back to itself cannot be represented in the SQL
    /// type system. The tests using this helper cover each container through which
    /// such a reference can be introduced: record fields (directly or via a union),
    /// arrays, and maps.
    fn assert_recursive(schema: &str) {
        let parsed = parse_schema(schema, &[]).expect("schema should parse");
        let err = schema_to_relationdesc(parsed).expect_err("recursive schema should be rejected");
        let message = err.to_string();
        assert!(
            message.contains("Recursive types are not supported"),
            "unexpected error: {err}"
        );
    }

    /// A record whose field is directly of the record's own type.
    #[mz_ore::test]
    fn recursive_record_field() {
        let schema = r#"{"type":"record","name":"a","fields":[{"name":"f","type":"a"}]}"#;
        assert_recursive(schema);
    }

    /// A record whose field is a union containing the record's own type.
    #[mz_ore::test]
    fn recursive_union() {
        let schema =
            r#"{"type":"record","name":"a","fields":[{"name":"f","type":["a","null"]}]}"#;
        assert_recursive(schema);
    }

    /// A record whose field is an array of the record's own type.
    #[mz_ore::test]
    fn recursive_array() {
        let schema = r#"{"type":"record","name":"a","fields":[{"name":"f","type":{"type":"array","items":"a"}}]}"#;
        assert_recursive(schema);
    }

    /// A record whose field is a map whose values are of the record's own type.
    #[mz_ore::test]
    fn recursive_map() {
        let schema = r#"{"type":"record","name":"a","fields":[{"name":"f","type":{"type":"map","values":"a"}}]}"#;
        assert_recursive(schema);
    }

    /// Reusing a named type in sibling positions is a diamond, not a cycle, and
    /// must not be flagged as recursive. Guards against the path-tracking set
    /// failing to release a node after it leaves the current path.
    #[mz_ore::test]
    fn repeated_named_type_is_not_recursive() {
        let schema = r#"{
            "type": "record",
            "name": "outer",
            "fields": [
                {"name": "a", "type": {"type": "record", "name": "inner", "fields": [{"name": "x", "type": "int"}]}},
                {"name": "b", "type": "inner"}
            ]
        }"#;
        let parsed = parse_schema(schema, &[]).expect("schema should parse");
        let desc = schema_to_relationdesc(parsed)
            .expect("diamond reuse of a named type should be allowed");
        assert_eq!(desc.arity(), 2);
    }

    /// The standard `aws` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_commercial() {
        let arn = "arn:aws:glue:us-east-1:123456789012:schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(arn), Some("myreg"));
    }

    /// The `aws-cn` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_china_partition() {
        let arn = "arn:aws-cn:glue:cn-north-1:123456789012:schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(arn), Some("myreg"));
    }

    /// The `aws-us-gov` partition is accepted.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_govcloud_partition() {
        let arn = "arn:aws-us-gov:glue:us-gov-west-1:123456789012:schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(arn), Some("myreg"));
    }

    /// A partition that does not start with `aws` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_non_aws_partition() {
        let arn = "arn:gcp:glue:us-east-1:123456789012:schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(arn), None);
    }

    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_missing_arn_prefix() {
        // Bare `schema/...` fragments must not parse — see the anchor on
        // `arn:` in [`registry_name_from_schema_arn`].
        let fragment = "schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(fragment), None);
    }

    /// An ARN for a service other than `glue` is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_non_glue_service() {
        let arn = "arn:aws:s3:us-east-1:123456789012:schema/myreg/myschema";
        assert_eq!(registry_name_from_schema_arn(arn), None);
    }

    /// A Glue ARN whose resource is not a `schema/` resource is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_missing_schema_segment() {
        let arn = "arn:aws:glue:us-east-1:123456789012:table/myreg/foo";
        assert_eq!(registry_name_from_schema_arn(arn), None);
    }

    /// An empty registry segment is rejected.
    #[mz_ore::test]
    fn registry_name_from_schema_arn_rejects_empty_registry() {
        let arn = "arn:aws:glue:us-east-1:123456789012:schema//myschema";
        assert_eq!(registry_name_from_schema_arn(arn), None);
    }

    #[mz_ore::test]
    fn registry_name_from_schema_arn_allows_schema_name_with_slashes() {
        // Glue schema *names* aren't restricted the way registry names are.
        // Only the segment before the first `/` matters for the registry.
        let arn = "arn:aws:glue:us-east-1:123456789012:schema/myreg/path/like/name";
        assert_eq!(registry_name_from_schema_arn(arn), Some("myreg"));
    }
}
448
/// Identifier carried in a wire-format header that points at the writer's
/// schema. Different schema registries key their writer schemas differently:
/// Confluent uses a sequential `i32`, AWS Glue uses a UUID. Callers do not
/// have to care which kind of key they're holding — the resolver routes it
/// back to the matching cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriterSchemaKey {
    /// Schema id extracted from a Confluent wire-format header.
    Confluent(i32),
    /// Schema-version UUID extracted from an AWS Glue wire-format header.
    Glue(Uuid),
}
459
460impl fmt::Display for WriterSchemaKey {
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        match self {
463            WriterSchemaKey::Confluent(id) => write!(f, "Confluent schema id {}", id),
464            WriterSchemaKey::Glue(uuid) => write!(f, "Glue schema-version {}", uuid),
465        }
466    }
467}
468
/// Provides writer schemas to an [`AvroSchemaResolver`].
///
/// Mirrors the `WireFormat<C>` enum on the catalog side: a decoder can run
/// without any wire-format framing, with Confluent framing, or with AWS
/// Glue framing (each of the framed variants optionally without a
/// registry to fetch from). Each variant owns its cache type by
/// construction, so the resolver cannot mis-route a key to the wrong
/// cache.
///
/// `Debug` is implemented by hand and prints only a short tag naming the
/// variant and whether a cache is attached, not the cache contents.
pub enum WriterSchemaProvider {
    /// No wire-format framing. The resolver always returns the reader
    /// schema and never consumes header bytes.
    None,
    /// Confluent framing. `cache: None` means strip-and-discard the
    /// schema id (no registry attached); `cache: Some` means fetch from
    /// the cache.
    Confluent { cache: Option<SchemaCache> },
    /// AWS Glue framing. `cache: None` means strip-and-discard the UUID
    /// (no registry attached); `cache: Some` means fetch from the cache.
    Glue { cache: Option<GlueSchemaCache> },
}
489
490impl fmt::Debug for WriterSchemaProvider {
491    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492        let tag = match self {
493            WriterSchemaProvider::None => "none",
494            WriterSchemaProvider::Confluent { cache: None } => "confluent (no cache)",
495            WriterSchemaProvider::Confluent { cache: Some(_) } => "confluent",
496            WriterSchemaProvider::Glue { cache: None } => "glue (no cache)",
497            WriterSchemaProvider::Glue { cache: Some(_) } => "glue",
498        };
499        f.debug_tuple("WriterSchemaProvider").field(&tag).finish()
500    }
501}
502
503impl WriterSchemaProvider {
504    /// Build the Confluent variant from an optional CCSR client. `None`
505    /// means "Confluent framing but no registry to fetch from" — the
506    /// resolver will strip the header and fall back to the reader schema.
507    pub fn confluent(ccsr_client: Option<mz_ccsr::Client>) -> Self {
508        let cache = ccsr_client.map(SchemaCache::new);
509        WriterSchemaProvider::Confluent { cache }
510    }
511
512    /// Build the Glue variant from an optional (client, registry-name)
513    /// pair. `None` means "Glue framing but no registry to fetch from" —
514    /// the resolver strips the UUID header and falls back to the reader
515    /// schema. When the pair is supplied, the registry name is the one
516    /// the catalog connection points at; the cache rejects any UUID
517    /// whose `SchemaArn` doesn't sit under that registry. (Schema
518    /// *names* aren't unique across Glue registries, so the registry
519    /// name is the only meaningful scope here.) Pairing the two in a
520    /// tuple makes the "no client → no registry name" case unrepresentable.
521    pub fn glue(
522        client_with_registry: Option<(mz_aws_glue_schema_registry::Client, String)>,
523    ) -> Self {
524        let cache = client_with_registry.map(|(c, registry)| GlueSchemaCache::new(c, registry));
525        WriterSchemaProvider::Glue { cache }
526    }
527}
528
/// Chooses the Avro schema to decode a message with, based on the message's
/// wire-format header (if any).
///
/// Holds the parsed reader schema together with a [`WriterSchemaProvider`] that
/// describes the framing in use and, optionally, a cache to fetch writer schemas
/// from.
pub struct AvroSchemaResolver {
    /// The reader schema, parsed once in [`AvroSchemaResolver::new`]. It is passed
    /// to the writer-schema caches on lookup and returned as-is when no writer
    /// schema is looked up.
    reader_schema: Schema,
    /// Framing mode and optional writer-schema cache consulted by
    /// [`AvroSchemaResolver::resolve`].
    writer_schemas: WriterSchemaProvider,
}
533
534impl AvroSchemaResolver {
535    pub fn new(
536        reader_schema: &str,
537        reader_reference_schemas: &[String],
538        writer_schemas: WriterSchemaProvider,
539    ) -> anyhow::Result<Self> {
540        // parse_schema handles incremental parsing of references (dependencies first)
541        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
542        Ok(Self {
543            reader_schema,
544            writer_schemas,
545        })
546    }
547
548    pub async fn resolve<'a, 'b>(
549        &'a mut self,
550        mut bytes: &'b [u8],
551    ) -> anyhow::Result<anyhow::Result<(&'b [u8], &'a Schema, Option<WriterSchemaKey>)>> {
552        let (resolved_schema, key) = match &mut self.writer_schemas {
553            WriterSchemaProvider::None => (&self.reader_schema, None),
554
555            WriterSchemaProvider::Confluent { cache: None } => {
556                // Validate the header (so we surface producer/consumer
557                // framing mismatches early) and discard the schema id —
558                // there is no registry to look it up in.
559                match crate::confluent::extract_avro_header(bytes) {
560                    Ok((_id, adjusted_bytes)) => {
561                        bytes = adjusted_bytes;
562                        (&self.reader_schema, None)
563                    }
564                    Err(err) => return Ok(Err(err)),
565                }
566            }
567
568            WriterSchemaProvider::Confluent { cache: Some(cache) } => {
569                let (id, adjusted_bytes) = match crate::confluent::extract_avro_header(bytes) {
570                    Ok(ok) => ok,
571                    Err(err) => return Ok(Err(err)),
572                };
573                bytes = adjusted_bytes;
574                let result = cache
575                    .get(id, &self.reader_schema)
576                    // The outer Result describes transient errors so use ?
577                    // here to propagate; the inner Result is the cached
578                    // permanent outcome (parsed schema or parse error) and
579                    // is handled below.
580                    .await?
581                    .with_context(|| format!("failed to resolve Avro schema (id = {id})"));
582                let schema = match result {
583                    Ok(schema) => schema,
584                    Err(err) => return Ok(Err(err)),
585                };
586                (schema, Some(WriterSchemaKey::Confluent(id)))
587            }
588
589            WriterSchemaProvider::Glue { cache: None } => {
590                // Strip + discard the header; no registry to look up.
591                match crate::glue::extract_avro_header(bytes) {
592                    Ok((_uuid, adjusted_bytes)) => {
593                        bytes = adjusted_bytes;
594                        (&self.reader_schema, None)
595                    }
596                    Err(err) => return Ok(Err(err)),
597                }
598            }
599
600            WriterSchemaProvider::Glue { cache: Some(cache) } => {
601                let (uuid, adjusted_bytes) = match crate::glue::extract_avro_header(bytes) {
602                    Ok(ok) => ok,
603                    Err(err) => return Ok(Err(err)),
604                };
605                bytes = adjusted_bytes;
606                let result = cache
607                    .get(uuid, &self.reader_schema)
608                    .await?
609                    .with_context(|| format!("failed to resolve Avro schema (uuid = {uuid})"));
610                let schema = match result {
611                    Ok(schema) => schema,
612                    Err(err) => return Ok(Err(err)),
613                };
614                (schema, Some(WriterSchemaKey::Glue(uuid)))
615            }
616        };
617        Ok(Ok((bytes, resolved_schema, key)))
618    }
619}
620
621impl fmt::Debug for AvroSchemaResolver {
622    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623        f.debug_struct("AvroSchemaResolver")
624            .field("reader_schema", &self.reader_schema)
625            .field("writer_schemas", &self.writer_schemas)
626            .finish()
627    }
628}
629
/// Glue-side analogue of [`SchemaCache`].
///
/// Differences from the CSR cache:
/// * Keys are UUIDs (Glue schema-version IDs), not `i32`s.
/// * Glue schemas are single definitions — no `references` field on
///   `GetSchemaVersion`, so the cache does not chase a dependency graph.
/// * No outer retry layer. `aws-sdk-glue` ships a "standard" retry policy
///   by default that handles transient errors; layering our own
///   `Retry::default()` on top would only amplify backoff.
#[derive(Debug)]
pub struct GlueSchemaCache {
    /// Per-UUID outcome of fetching and parsing a schema version. Both parsed
    /// schemas and permanent failures are stored, so a bad UUID is not
    /// re-fetched for every record that carries it.
    cache: BTreeMap<Uuid, Result<Schema, AvroError>>,
    /// Client used to fetch schema versions by UUID on a cache miss.
    glue_client: mz_aws_glue_schema_registry::Client,
    /// The registry name the catalog connection points at. Per-UUID
    /// fetches are cross-checked against this; mismatches become decode
    /// errors. See type-level docs.
    expected_registry: String,
}
648
649impl GlueSchemaCache {
650    fn new(glue_client: mz_aws_glue_schema_registry::Client, expected_registry: String) -> Self {
651        GlueSchemaCache {
652            cache: BTreeMap::new(),
653            glue_client,
654            expected_registry,
655        }
656    }
657
658    /// Look up the writer schema for `uuid`, fetching from Glue on a
659    /// cache miss. Mirrors [`SchemaCache::get`]: the outer `Result`
660    /// surfaces transient errors (network, auth, throttling — already
661    /// retried by the SDK) that the caller may retry on; the inner
662    /// `Result` carries permanent failures (schema not found,
663    /// non-`Available` lifecycle, registry mismatch, parse failure) that
664    /// get cached so the same bad UUID does not re-hit Glue on every
665    /// record.
666    async fn get(
667        &mut self,
668        uuid: Uuid,
669        reader_schema: &Schema,
670    ) -> anyhow::Result<anyhow::Result<&Schema>> {
671        let entry = match self.cache.entry(uuid) {
672            Entry::Occupied(o) => o.into_mut(),
673            Entry::Vacant(v) => {
674                let parsed: Result<Schema, AvroError> = match self
675                    .glue_client
676                    .get_schema_version_by_id(uuid)
677                    .await
678                {
679                    Ok(version) => {
680                        Self::parse_version(version, uuid, &self.expected_registry, reader_schema)
681                    }
682                    // Permanent: UUID does not exist in any visible
683                    // registry. Cache so we don't re-fetch on every
684                    // record carrying this UUID.
685                    Err(mz_aws_glue_schema_registry::GetSchemaVersionError::NotFound) => Err(
686                        ParseSchemaError::new(format!("Glue schema version {uuid} not found"))
687                            .into(),
688                    ),
689                    // Transient: SDK has already retried; surface to the
690                    // outer Result so the source can decide. The explicit
691                    // `Other` arm (rather than a wildcard) makes any
692                    // future variant a compile error rather than a
693                    // silent reclassification.
694                    Err(e @ mz_aws_glue_schema_registry::GetSchemaVersionError::Other(_)) => {
695                        return Err(e.into());
696                    }
697                };
698                v.insert(parsed)
699            }
700        };
701        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
702    }
703
704    /// Validate and parse a fetched [`mz_aws_glue_schema_registry::SchemaVersion`] into an Avro schema.
705    ///
706    /// All failures here are permanent (a retry would return the same
707    /// `SchemaVersion`) and get cached by the caller.
708    fn parse_version(
709        version: mz_aws_glue_schema_registry::SchemaVersion,
710        uuid: Uuid,
711        expected_registry: &str,
712        reader_schema: &Schema,
713    ) -> Result<Schema, AvroError> {
714        use mz_aws_glue_schema_registry::SchemaVersionLifecycleStatus;
715
716        // Reject anything not Available. Pending/Failure versions may
717        // still carry a `definition` from the SDK, but decoding records
718        // against them would yield garbage.
719        if !matches!(
720            version.lifecycle_status,
721            Some(SchemaVersionLifecycleStatus::Available)
722        ) {
723            return Err(ParseSchemaError::new(format!(
724                "Glue schema version {uuid} is not Available (status: {:?}); \
725                 refusing to decode",
726                version.lifecycle_status
727            ))
728            .into());
729        }
730        let definition = version.definition.ok_or_else(|| {
731            ParseSchemaError::new(format!(
732                "Glue schema version {uuid} returned without a definition"
733            ))
734        })?;
735        // Enforce that the fetched version actually lives in the
736        // registry our catalog connection points at. Without this, any
737        // UUID the credentials can resolve would decode silently,
738        // defeating the per-connection scope.
739        let arn = version.schema_arn.as_deref().ok_or_else(|| {
740            ParseSchemaError::new(format!(
741                "Glue schema version {uuid} returned without a SchemaArn; \
742                 cannot verify registry membership"
743            ))
744        })?;
745        let actual_registry = registry_name_from_schema_arn(arn).ok_or_else(|| {
746            ParseSchemaError::new(format!(
747                "Glue SchemaArn {arn:?} did not match the expected \
748                 arn:aws[-...]:glue:<region>:<account>:schema/<registry>/<schema> form"
749            ))
750        })?;
751        if actual_registry != expected_registry {
752            return Err(ParseSchemaError::new(format!(
753                "Glue schema version {uuid} lives in registry {actual_registry:?} \
754                 but this source is configured for registry {expected:?}; \
755                 refusing to decode",
756                expected = expected_registry,
757            ))
758            .into());
759        }
760        let value: serde_json::Value = serde_json::from_str(&definition)
761            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {e}")))?;
762        let schema = Schema::parse_with_references(&value, &[])?;
763        resolve_schemas(&schema, reader_schema)
764    }
765}
766
/// Parse the registry name out of a Glue `SchemaArn`.
///
/// Glue schema ARNs have the shape
/// `arn:<partition>:glue:<region>:<account>:schema/<registry>/<schema>`,
/// where `<partition>` is `aws`, `aws-cn`, `aws-us-gov`, etc.
///
/// The ARN is parsed positionally: it is split into its six colon-separated
/// fields, and the literal `arn`, the `aws…` partition, the `glue` service,
/// and the `schema/` resource type must each appear in their own field.
/// Matching `:glue:` or `:schema/` anywhere in the string is not enough, so
/// neither a bare fragment like `:schema/foo/bar` nor an ARN for a different
/// service or resource type that merely embeds those substrings will parse.
///
/// Returns `None` if the input doesn't match or the registry segment is
/// empty — callers surface that as a decode error rather than panicking, so
/// schemas in unexpected partitions or future ARN shapes don't crash the
/// source.
fn registry_name_from_schema_arn(arn: &str) -> Option<&str> {
    // `splitn(6, ..)` keeps everything after the fifth colon together, so the
    // resource field is taken verbatim even if it contains further colons.
    let mut fields = arn.splitn(6, ':');
    if fields.next()? != "arn" {
        return None;
    }
    let partition = fields.next()?;
    if !partition.starts_with("aws") {
        return None;
    }
    if fields.next()? != "glue" {
        return None;
    }
    // Region and account must be present as fields, but their contents are
    // not inspected.
    let _region = fields.next()?;
    let _account = fields.next()?;
    let resource = fields.next()?;

    let (registry, _schema) = resource.strip_prefix("schema/")?.split_once('/')?;
    if registry.is_empty() {
        None
    } else {
        Some(registry)
    }
}
790
/// Cache of writer schemas fetched from a Confluent Schema Registry. Held
/// inside [`WriterSchemaProvider::Confluent`]; the type is named pub because that
/// variant's field is reachable through the pub enum, but it has no pub
/// constructor or methods — only [`WriterSchemaProvider::confluent`] can build one.
#[derive(Debug)]
pub struct SchemaCache {
    /// Outcome of parsing and resolving each writer schema, keyed by schema
    /// ID. Errors are stored as well as successes, so a schema that failed
    /// to parse or resolve is not re-processed on later lookups.
    cache: BTreeMap<i32, Result<Schema, AvroError>>,
    /// Registry client used to fetch schemas on a cache miss. Kept behind an
    /// `Arc` so a handle can be moved into the retrying fetch closure.
    ccsr_client: Arc<mz_ccsr::Client>,
}
800
801impl SchemaCache {
802    fn new(ccsr_client: mz_ccsr::Client) -> SchemaCache {
803        SchemaCache {
804            cache: BTreeMap::new(),
805            ccsr_client: Arc::new(ccsr_client),
806        }
807    }
808
809    /// Looks up the writer schema for ID. If the schema is literally identical
810    /// to the reader schema, as determined by the reader schema fingerprint
811    /// that this schema cache was initialized with, returns the schema directly.
812    /// If not, performs schema resolution on the reader and writer and
813    /// returns the result.
814    ///
815    /// This method also handles schema references: if the schema references types
816    /// defined in other schemas, those schemas are fetched and their types are made
817    /// available during parsing.
818    async fn get(
819        &mut self,
820        id: i32,
821        reader_schema: &Schema,
822    ) -> anyhow::Result<anyhow::Result<&Schema>> {
823        let entry = match self.cache.entry(id) {
824            Entry::Occupied(o) => o.into_mut(),
825            Entry::Vacant(v) => {
826                // An issue with _fetching_ the schema should be returned
827                // immediately, and not cached, since it might get better on the
828                // next retry.
829                let ccsr_client = Arc::clone(&self.ccsr_client);
830
831                // Fetch schema with its references (if any)
832                let (primary_subject, reference_subjects) = Retry::default()
833                    // Twice the timeout of the ccsr client so we can attempt 2 requests.
834                    .max_duration(ccsr_client.timeout() * 2)
835                    // Canceling because ultimately it's just non-mutating HTTP requests.
836                    .retry_async_canceling(move |state| {
837                        let ccsr_client = Arc::clone(&ccsr_client);
838                        async move {
839                            let res = ccsr_client.get_subject_and_references_by_id(id).await;
840                            match res {
841                                Err(e) => {
842                                    if let Some(timeout) = state.next_backoff {
843                                        warn!(
844                                            "transient failure fetching \
845                                                schema id {}: {:?}, retrying in {:?}",
846                                            id, e, timeout
847                                        );
848                                    }
849                                    Err(anyhow::Error::from(e))
850                                }
851                                _ => Ok(res?),
852                            }
853                        }
854                    })
855                    .run_in_task(|| format!("fetch_avro_schema:{}", id))
856                    .await?;
857
858                // Now, we've gotten some json back, so we want to cache it (regardless of whether it's a valid
859                // avro schema, it won't change).
860                //
861                // However, we can't just cache it directly, since resolving schemas takes significant CPU work,
862                // which we don't want to repeat for every record. So, parse and resolve it, and cache the
863                // result (whether schema or error).
864                let result = Self::parse_with_references(
865                    &primary_subject,
866                    &reference_subjects,
867                    reader_schema,
868                );
869                v.insert(result)
870            }
871        };
872        Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
873    }
874
875    /// Parse a schema along with its references and resolve against the reader schema.
876    fn parse_with_references(
877        primary_subject: &mz_ccsr::Subject,
878        reference_subjects: &[mz_ccsr::Subject],
879        reader_schema: &Schema,
880    ) -> Result<Schema, AvroError> {
881        // Parse referenced schemas incrementally: each reference may depend on previous ones.
882        let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
883        for subject in reference_subjects {
884            let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
885                .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
886            let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
887            reference_schemas.push(parsed);
888        }
889
890        // Parse primary schema, using references, if present.
891        let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
892            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
893        let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;
894
895        // Schema fingerprints don't actually capture whether two schemas are meaningfully
896        // different, because they strip out logical types. Thus, resolve in all cases.
897        let resolved = resolve_schemas(&schema, reader_schema)?;
898        Ok(resolved)
899    }
900}