Skip to main content

mz_avro/
schema.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for parsing and interacting with schemas in Avro format.
25
26use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// Error raised when an Avro schema cannot be parsed.
///
/// It wraps a human-readable message, which is also what `Display` prints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error that carries `msg` as its message.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for ParseSchemaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Delegate explicitly to the message's `Display` impl.
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro specification](https://avro.apache.org/docs/++version++/specification/#schema-fingerprints).
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The raw fingerprint bytes.
    pub bytes: Vec<u8>,
}

/// Formats the fingerprint as lowercase hexadecimal, two digits per byte
/// (for example `[0x0a, 0xff]` is rendered as `0aff`).
impl fmt::Display for SchemaFingerprint {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte directly into the formatter instead of allocating a
        // `String` per byte, collecting them, and joining the result.
        for byte in &self.bytes {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
171#[derive(Clone, Debug, PartialEq)]
172pub enum SchemaPiece {
173    /// A `null` Avro schema.
174    Null,
175    /// A `boolean` Avro schema.
176    Boolean,
177    /// An `int` Avro schema.
178    Int,
179    /// A `long` Avro schema.
180    Long,
181    /// A `float` Avro schema.
182    Float,
183    /// A `double` Avro schema.
184    Double,
185    /// An `Int` Avro schema with a semantic type being days since the unix epoch.
186    Date,
187    /// An `Int64` Avro schema with a semantic type being milliseconds since the unix epoch.
188    ///
189    /// <https://avro.apache.org/docs/++version++/specification/#time_ms>
190    TimestampMilli,
191    /// An `Int64` Avro schema with a semantic type being microseconds since the unix epoch.
192    ///
193    /// <https://avro.apache.org/docs/++version++/specification/#time-microsecond-precision>
194    TimestampMicro,
195    /// A `bytes` or `fixed` Avro schema with a logical type of `decimal` and
196    /// the specified precision and scale.
197    ///
198    /// If the underlying type is `fixed`,
199    /// the `fixed_size` field specifies the size.
200    Decimal {
201        precision: usize,
202        scale: usize,
203        fixed_size: Option<usize>,
204    },
205    /// A `bytes` Avro schema.
206    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
207    Bytes,
208    /// A `string` Avro schema.
209    /// `String` represents a unicode character sequence.
210    String,
211    /// A `string` Avro schema that is tagged as representing JSON data
212    Json,
213    /// A `string` Avro schema with a logical type of `uuid`.
214    Uuid,
215    /// A `array` Avro schema. Avro arrays are required to have the same type for each element.
216    /// This variant holds the `Schema` for the array element type.
217    Array(Box<SchemaPieceOrNamed>),
218    /// A `map` Avro schema.
219    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
220    /// `Map` keys are assumed to be `string`.
221    Map(Box<SchemaPieceOrNamed>),
222    /// A `union` Avro schema.
223    Union(UnionSchema),
224    /// A value written as `int` and read as `long`,
225    /// for the timestamp-millis logicalType.
226    ResolveIntTsMilli,
227    /// A value written as `int` and read as `long`,
228    /// for the timestamp-micros logicalType.
229    ResolveIntTsMicro,
230    /// A value written as an `int` with `date` logical type,
231    /// and read as any timestamp type
232    ResolveDateTimestamp,
233    /// A value written as `int` and read as `long`
234    ResolveIntLong,
235    /// A value written as `int` and read as `float`
236    ResolveIntFloat,
237    /// A value written as `int` and read as `double`
238    ResolveIntDouble,
239    /// A value written as `long` and read as `float`
240    ResolveLongFloat,
241    /// A value written as `long` and read as `double`
242    ResolveLongDouble,
243    /// A value written as `float` and read as `double`
244    ResolveFloatDouble,
245    /// A concrete (i.e., non-`union`) type in the writer,
246    /// resolved against one specific variant of a `union` in the reader.
247    ResolveConcreteUnion {
248        /// The index of the variant in the reader
249        index: usize,
250        /// The concrete type
251        inner: Box<SchemaPieceOrNamed>,
252        n_reader_variants: usize,
253        reader_null_variant: Option<usize>,
254    },
255    /// A union in the writer, resolved against a union in the reader.
256    /// The two schemas may have different variants and the variants may be in a different order.
257    ResolveUnionUnion {
258        /// A mapping of the fields in the writer to those in the reader.
259        /// If the `i`th element is `Err(e)`, the `i`th field in the writer
260        /// did not match any field in the reader (or even if it matched by name, resolution failed).
261        /// If the `i`th element is `Ok((j, piece))`, then the `i`th field of the writer
262        /// matched the `j`th field of the reader, and `piece` is their resolved node.
263        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
264        n_reader_variants: usize,
265        reader_null_variant: Option<usize>,
266    },
267    /// The inverse of `ResolveConcreteUnion`
268    ResolveUnionConcrete {
269        index: usize,
270        inner: Box<SchemaPieceOrNamed>,
271    },
272    /// A `record` Avro schema.
273    ///
274    /// The `lookup` table maps field names to their position in the `Vec`
275    /// of `fields`.
276    Record {
277        doc: Documentation,
278        fields: Vec<RecordField>,
279        lookup: BTreeMap<String, usize>,
280    },
281    /// An `enum` Avro schema.
282    Enum {
283        doc: Documentation,
284        symbols: Vec<String>,
285        /// The index of the default value.
286        ///
287        /// This is only used in schema resolution: it is the value that
288        /// will be read by a reader when a writer writes a value that the reader
289        /// does not expect.
290        default_idx: Option<usize>,
291    },
292    /// A `fixed` Avro schema.
293    Fixed { size: usize },
294    /// A record in the writer, resolved against a record in the reader.
295    /// The two schemas may have different fields and the fields may be in a different order.
296    ResolveRecord {
297        /// Fields that do not exist in the writer schema, but had a default
298        /// value specified in the reader schema, which we use.
299        defaults: Vec<ResolvedDefaultValueField>,
300        /// Fields in the order of their appearance in the writer schema.
301        /// `Present` if they could be resolved against a field in the reader schema;
302        /// `Absent` otherwise.
303        fields: Vec<ResolvedRecordField>,
304        /// The size of `defaults`, plus the number of `Present` values in `fields`.
305        n_reader_fields: usize,
306    },
307    /// An enum in the writer, resolved against an enum in the reader.
308    /// The two schemas may have different values and the values may be in a different order.
309    ResolveEnum {
310        doc: Documentation,
311        /// Symbols in order of the writer schema along with their index in the reader schema,
312        /// or `Err(symbol_name)` if they don't exist in the reader schema.
313        symbols: Vec<Result<(usize, String), String>>,
314        /// The value to decode if the writer writes some value not expected by the reader.
315        default: Option<(usize, String)>,
316    },
317}
318
319impl SchemaPiece {
320    /// Returns whether the schema node is "underlyingly" an Int (but possibly a logicalType typedef)
321    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    /// Returns whether the schema node is "underlyingly" an Int64 (but possibly a logicalType typedef)
325    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    /// Constructs an `avro::Value` if this is of underlying int type.
329    /// Guaranteed to be `Some` when `is_underlying_int` is `true`.
330    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            // TODO[btv] - should we bounds-check the date here? We
334            // don't elsewhere... maybe we should everywhere.
335            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    /// Constructs an `avro::Value` if this is of underlying long type.
340    /// Guaranteed to be `Some` when `is_underlying_long` is `true`.
341    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
351/// Represents any valid Avro schema
352/// More information about Avro schemas can be found in the
353/// [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-declaration)
354#[derive(Clone, PartialEq)]
355pub struct Schema {
356    pub(crate) named: Vec<NamedSchemaPiece>,
357    pub(crate) indices: BTreeMap<FullName, usize>,
358    pub top: SchemaPieceOrNamed,
359}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode<'_> {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// This type is used to simplify enum variant comparison between `Schema` and `types::Value`.
///
/// **NOTE** This type was introduced due to a limitation of `mem::discriminant` requiring a _value_
/// be constructed in order to get the discriminant, which makes it difficult to implement a
/// function that maps from `Discriminant<Schema> -> Discriminant<Value>`. Conversion into this
/// intermediate type should be especially fast, as the number of enum variants is small, which
/// _should_ compile into a jump-table for the conversion.
///
/// The declaration order of the variants defines the derived `Ord`, which in turn
/// orders the keys of any `BTreeMap<SchemaKind, _>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    // Primitive types.
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    // Complex types.
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    // This can arise in resolved schemas, particularly when a union resolves to a non-union.
    // We would need to do a lookup to find the actual type.
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase Avro type name for this kind (`"unknown"` for
    /// `SchemaKind::Unknown`).
    pub fn name(self) -> &'static str {
        match self {
            SchemaKind::Null => "null",
            SchemaKind::Boolean => "boolean",
            SchemaKind::Int => "int",
            SchemaKind::Long => "long",
            SchemaKind::Float => "float",
            SchemaKind::Double => "double",
            SchemaKind::Bytes => "bytes",
            SchemaKind::String => "string",
            SchemaKind::Array => "array",
            SchemaKind::Map => "map",
            SchemaKind::Union => "union",
            SchemaKind::Record => "record",
            SchemaKind::Enum => "enum",
            SchemaKind::Fixed => "fixed",
            SchemaKind::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    pub name: String,
    pub namespace: Option<String>,
    pub aliases: Option<Vec<String>>,
}

/// The fully-qualified name of a named Avro type.
///
/// An empty `namespace` denotes the null namespace.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a `FullName` from a possibly dotted `name` and an optional `namespace`.
    ///
    /// This follows the Avro specification. If `name` contains a dot, it is a
    /// fullname: everything before the last dot is the namespace, and `namespace`
    /// is ignored. This matches how `Name::parse` treats dotted names. Otherwise
    /// the namespace is `namespace` if given, or else `default_namespace` (the
    /// namespace of the enclosing definition). A leading dot (`".Foo"`) denotes
    /// the null namespace.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        match name.rsplit_once('.') {
            // "a.b.C" -> namespace "a.b", name "C".
            Some((ns, base)) => FullName {
                name: base.to_owned(),
                namespace: ns.to_owned(),
            },
            None => FullName {
                name: name.to_owned(),
                namespace: namespace.unwrap_or(default_namespace).to_owned(),
            },
        }
    }
    /// Returns the unqualified name.
    pub fn base_name(&self) -> &str {
        &self.name
    }
    /// Returns `namespace.name`, or just `name` in the null namespace.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            return self.name.clone();
        }
        format!("{}.{}", self.namespace, self.name)
    }
    /// Get the shortest unambiguous synonym of this name
    /// at a given point in the schema graph. If this name
    /// is in the same namespace as the enclosing node, this
    /// returns the short name; otherwise, it returns the fully qualified name.
    ///
    /// A name in the null namespace referenced from a non-null namespace is
    /// rendered with a leading dot (`".Foo"`), which `from_parts` parses back
    /// into the null namespace.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace == enclosing_ns {
            Cow::Borrowed(&self.name)
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }
    /// Returns the namespace of the name (empty for the null namespace).
    pub fn namespace(&self) -> &str {
        &self.namespace
    }
}

impl fmt::Debug for FullName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.namespace, self.name)
    }
}

/// Optional documentation (the `doc` attribute) attached to complex Avro schemas.
pub type Documentation = Option<String>;
597
598impl Name {
599    /// Reports whether the given string is a valid Avro name.
600    ///
601    /// See: <https://avro.apache.org/docs/++version++/specification/#names>
602    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    /// Returns an error if the given name is invalid.
609    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    /// Rewrites `name` to be valid.
622    ///
623    /// Any non alphanumeric characters are replaced with underscores. If the
624    /// name begins with a number, it is prefixed with `_`.
625    ///
626    /// `make_valid` is not injective. Multiple invalid names are mapped to the
627    /// the same valid name.
628    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    /// Parse a `serde_json::map::Map` into a `Name`.
653    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    /// Parses a name from a simple string.
702    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    /// Return the `fullname` of this `Name`
709    ///
710    /// More information about fullnames can be found in the
711    /// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
712    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
717#[derive(Clone, Debug, PartialEq)]
718pub struct ResolvedDefaultValueField {
719    pub name: String,
720    pub doc: Documentation,
721    pub default: types::Value,
722    pub order: RecordFieldOrder,
723    pub position: usize,
724}
725
726#[derive(Clone, Debug, PartialEq)]
727pub enum ResolvedRecordField {
728    Absent(Schema),
729    Present(RecordField),
730}
731
732/// Represents a `field` in a `record` Avro schema.
733#[derive(Clone, Debug, PartialEq)]
734pub struct RecordField {
735    /// Name of the field.
736    pub name: String,
737    /// Documentation of the field.
738    pub doc: Documentation,
739    /// Default value of the field.
740    /// This value will be used when reading Avro datum if schema resolution
741    /// is enabled.
742    pub default: Option<Value>,
743    /// Schema of the field.
744    pub schema: SchemaPieceOrNamed,
745    /// Order of the field.
746    ///
747    /// **NOTE** This currently has no effect.
748    pub order: RecordFieldOrder,
749    /// Position of the field in the list of `field` of its parent `Schema`
750    pub position: usize,
751}
752
753/// Represents any valid order for a `field` in a `record` Avro schema.
754#[derive(Copy, Clone, Debug, PartialEq)]
755pub enum RecordFieldOrder {
756    Ascending,
757    Descending,
758    Ignore,
759}
760
761impl RecordField {}
762
763#[derive(Debug, Clone)]
764pub struct UnionSchema {
765    schemas: Vec<SchemaPieceOrNamed>,
766
767    // Used to ensure uniqueness of anonymous schema inputs, and provide constant time finding of the
768    // schema index given a value.
769    anon_variant_index: BTreeMap<SchemaKind, usize>,
770
771    // Same as above, for named input references
772    named_variant_index: BTreeMap<usize, usize>,
773}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    /// Returns a slice to all variants of this schema.
812    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    /// Returns a mutable slice to all variants of this schema.
817    pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818        &mut self.schemas
819    }
820
821    /// Returns true if the first variant of this `UnionSchema` is `Null`.
822    pub fn is_nullable(&self) -> bool {
823        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824    }
825
826    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827        self.anon_variant_index
828            .get(&SchemaKind::from(sp))
829            .map(|idx| (*idx, &self.schemas[*idx]))
830    }
831
832    pub fn match_ref(
833        &self,
834        other: SchemaPieceRefOrNamed,
835        names_map: &BTreeMap<usize, usize>,
836    ) -> Option<(usize, &SchemaPieceOrNamed)> {
837        match other {
838            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839            SchemaPieceRefOrNamed::Named(idx) => names_map
840                .get(&idx)
841                .and_then(|idx| self.named_variant_index.get(idx))
842                .map(|idx| (*idx, &self.schemas[*idx])),
843        }
844    }
845
846    #[inline(always)]
847    pub fn match_(
848        &self,
849        other: &SchemaPieceOrNamed,
850        names_map: &BTreeMap<usize, usize>,
851    ) -> Option<(usize, &SchemaPieceOrNamed)> {
852        self.match_ref(other.as_ref(), names_map)
853    }
854}
855
856// No need to compare variant_index, it is derivative of schemas.
857impl PartialEq for UnionSchema {
858    fn eq(&self, other: &UnionSchema) -> bool {
859        self.schemas.eq(&other.schemas)
860    }
861}
862
/// Mutable state used while turning a JSON Avro schema into a [`Schema`].
#[derive(Default)]
struct SchemaParser {
    // Named types, addressed by the index handed out by `alloc_name`. A slot is
    // `None` from allocation until `insert` stores the parsed definition, which
    // lets references inside a type's own body resolve to its index early.
    named: Vec<Option<NamedSchemaPiece>>,
    // Fully-qualified name -> index into `named`.
    indices: BTreeMap<FullName, usize>,
}
868
869impl SchemaParser {
870    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
871        let top = self.parse_inner("", value)?;
872        let SchemaParser { named, indices } = self;
873        Ok(Schema {
874            named: named.into_iter().map(|o| o.unwrap()).collect(),
875            indices,
876            top,
877        })
878    }
879
880    fn parse_inner(
881        &mut self,
882        default_namespace: &str,
883        value: &Value,
884    ) -> Result<SchemaPieceOrNamed, AvroError> {
885        match *value {
886            Value::String(ref t) => {
887                let name = FullName::from_parts(t.as_str(), None, default_namespace);
888                if let Some(idx) = self.indices.get(&name) {
889                    Ok(SchemaPieceOrNamed::Named(*idx))
890                } else {
891                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
892                        t.as_str(),
893                    )?))
894                }
895            }
896            Value::Object(ref data) => self.parse_complex(default_namespace, data),
897            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
898                self.parse_union(default_namespace, data)?,
899            )),
900            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
901        }
902    }
903
904    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
905        let idx = match self.indices.entry(fullname) {
906            Entry::Vacant(ve) => *ve.insert(self.named.len()),
907            Entry::Occupied(oe) => {
908                return Err(ParseSchemaError::new(format!(
909                    "Sub-schema with name {:?} encountered multiple times",
910                    oe.key()
911                ))
912                .into());
913            }
914        };
915        self.named.push(None);
916        Ok(idx)
917    }
918
    /// Stores the parsed definition `schema` in the slot previously reserved
    /// by `alloc_name` at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds or if the slot is already filled.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        // Each reserved slot must be written exactly once.
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
923
924    fn parse_named_type(
925        &mut self,
926        type_name: &str,
927        default_namespace: &str,
928        complex: &Map<String, Value>,
929    ) -> Result<usize, AvroError> {
930        let name = Name::parse(complex)?;
931        match name.name.as_str() {
932            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
933                return Err(ParseSchemaError::new(format!(
934                    "{} may not be used as a custom type name",
935                    name.name
936                ))
937                .into());
938            }
939            _ => {}
940        };
941        let fullname = name.fullname(default_namespace);
942        let default_namespace = fullname.namespace.clone();
943        let idx = self.alloc_name(fullname.clone())?;
944        let piece = match type_name {
945            "record" => self.parse_record(&default_namespace, complex),
946            "enum" => self.parse_enum(complex),
947            "fixed" => self.parse_fixed(&default_namespace, complex),
948            _ => unreachable!("Unknown named type kind: {}", type_name),
949        }?;
950
951        self.insert(
952            idx,
953            NamedSchemaPiece {
954                name: fullname,
955                piece,
956            },
957        );
958
959        Ok(idx)
960    }
961
962    /// Parse a `serde_json::Value` representing a complex Avro type into a
963    /// `Schema`.
964    ///
965    /// Avro supports "recursive" definition of types.
966    /// e.g: {"type": {"type": "string"}}
967    fn parse_complex(
968        &mut self,
969        default_namespace: &str,
970        complex: &Map<String, Value>,
971    ) -> Result<SchemaPieceOrNamed, AvroError> {
972        match complex.get("type") {
973            Some(&Value::String(ref t)) => Ok(match t.as_str() {
974                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
975                    t,
976                    default_namespace,
977                    complex,
978                )?),
979                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
980                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
981                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
982                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
983                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
984                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
985                other => {
986                    let name = FullName {
987                        name: other.into(),
988                        namespace: default_namespace.into(),
989                    };
990                    if let Some(idx) = self.indices.get(&name) {
991                        SchemaPieceOrNamed::Named(*idx)
992                    } else {
993                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
994                    }
995                }
996            }),
997            Some(&Value::Object(ref data)) => match data.get("type") {
998                Some(value) => self.parse_inner(default_namespace, value),
999                None => Err(
1000                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1001                ),
1002            },
1003            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1004        }
1005    }
1006
1007    /// Parse a `serde_json::Value` representing a Avro record type into a
1008    /// `Schema`.
1009    fn parse_record(
1010        &mut self,
1011        default_namespace: &str,
1012        complex: &Map<String, Value>,
1013    ) -> Result<SchemaPiece, AvroError> {
1014        let mut lookup = BTreeMap::new();
1015
1016        let fields: Vec<RecordField> = complex
1017            .get("fields")
1018            .and_then(|fields| fields.as_array())
1019            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1020            .and_then(|fields| {
1021                fields
1022                    .iter()
1023                    .filter_map(|field| field.as_object())
1024                    .enumerate()
1025                    .map(|(position, field)| {
1026                        self.parse_record_field(default_namespace, field, position)
1027                    })
1028                    .collect::<Result<_, _>>()
1029            })?;
1030
1031        for field in &fields {
1032            lookup.insert(field.name.clone(), field.position);
1033        }
1034
1035        Ok(SchemaPiece::Record {
1036            doc: complex.doc(),
1037            fields,
1038            lookup,
1039        })
1040    }
1041
1042    /// Parse a `serde_json::Value` into a `RecordField`.
1043    fn parse_record_field(
1044        &mut self,
1045        default_namespace: &str,
1046        field: &Map<String, Value>,
1047        position: usize,
1048    ) -> Result<RecordField, AvroError> {
1049        let name = field
1050            .name()
1051            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1052
1053        Name::validate(&name)?;
1054
1055        let schema = field
1056            .get("type")
1057            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1058            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1059
1060        let default = field.get("default").cloned();
1061
1062        let order = field
1063            .get("order")
1064            .and_then(|order| order.as_str())
1065            .and_then(|order| match order {
1066                "ascending" => Some(RecordFieldOrder::Ascending),
1067                "descending" => Some(RecordFieldOrder::Descending),
1068                "ignore" => Some(RecordFieldOrder::Ignore),
1069                _ => None,
1070            })
1071            .unwrap_or(RecordFieldOrder::Ascending);
1072
1073        Ok(RecordField {
1074            name,
1075            doc: field.doc(),
1076            default,
1077            schema,
1078            order,
1079            position,
1080        })
1081    }
1082
1083    /// Parse a `serde_json::Value` representing a Avro enum type into a
1084    /// `Schema`.
1085    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1086        let symbols: Vec<String> = complex
1087            .get("symbols")
1088            .and_then(|v| v.as_array())
1089            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1090            .and_then(|symbols| {
1091                symbols
1092                    .iter()
1093                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1094                    .collect::<Option<_>>()
1095                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1096            })?;
1097
1098        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1099        for symbol in symbols.iter() {
1100            if unique_symbols.contains(symbol) {
1101                return Err(ParseSchemaError::new(format!(
1102                    "Enum symbols must be unique, found multiple: {}",
1103                    symbol
1104                ))
1105                .into());
1106            } else {
1107                unique_symbols.insert(symbol);
1108            }
1109        }
1110
1111        let default_idx = if let Some(default) = complex.get("default") {
1112            let default_str = default.as_str().ok_or_else(|| {
1113                ParseSchemaError::new(format!(
1114                    "Enum default should be a string, got: {:?}",
1115                    default
1116                ))
1117            })?;
1118            let default_idx = symbols
1119                .iter()
1120                .position(|x| x == default_str)
1121                .ok_or_else(|| {
1122                    ParseSchemaError::new(format!(
1123                        "Enum default not found in list of symbols: {}",
1124                        default_str
1125                    ))
1126                })?;
1127            Some(default_idx)
1128        } else {
1129            None
1130        };
1131
1132        Ok(SchemaPiece::Enum {
1133            doc: complex.doc(),
1134            symbols,
1135            default_idx,
1136        })
1137    }
1138
1139    /// Parse a `serde_json::Value` representing a Avro array type into a
1140    /// `Schema`.
1141    fn parse_array(
1142        &mut self,
1143        default_namespace: &str,
1144        complex: &Map<String, Value>,
1145    ) -> Result<SchemaPiece, AvroError> {
1146        complex
1147            .get("items")
1148            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1149            .and_then(|items| self.parse_inner(default_namespace, items))
1150            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1151    }
1152
1153    /// Parse a `serde_json::Value` representing a Avro map type into a
1154    /// `Schema`.
1155    fn parse_map(
1156        &mut self,
1157        default_namespace: &str,
1158        complex: &Map<String, Value>,
1159    ) -> Result<SchemaPiece, AvroError> {
1160        complex
1161            .get("values")
1162            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1163            .and_then(|items| self.parse_inner(default_namespace, items))
1164            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1165    }
1166
1167    /// Parse a `serde_json::Value` representing a Avro union type into a
1168    /// `Schema`.
1169    fn parse_union(
1170        &mut self,
1171        default_namespace: &str,
1172        items: &[Value],
1173    ) -> Result<SchemaPiece, AvroError> {
1174        items
1175            .iter()
1176            .map(|value| self.parse_inner(default_namespace, value))
1177            .collect::<Result<Vec<_>, _>>()
1178            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1179    }
1180
1181    /// Parse a `serde_json::Value` representing a logical decimal type into a
1182    /// `Schema`.
1183    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1184        let precision = complex
1185            .get("precision")
1186            .and_then(|v| v.as_i64())
1187            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1188
1189        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1190
1191        if scale < 0 {
1192            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1193        }
1194
1195        if precision < 0 {
1196            return Err(
1197                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1198            );
1199        }
1200
1201        if scale > precision {
1202            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1203        }
1204
1205        Ok((precision as usize, scale as usize))
1206    }
1207
1208    /// Parse a `serde_json::Value` representing an Avro bytes type into a
1209    /// `Schema`.
1210    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1211        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1212
1213        if let Some("decimal") = logical_type {
1214            match Self::parse_decimal(complex) {
1215                Ok((precision, scale)) => {
1216                    return Ok(SchemaPiece::Decimal {
1217                        precision,
1218                        scale,
1219                        fixed_size: None,
1220                    });
1221                }
1222                Err(e) => warn!(
1223                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1224                    complex, e
1225                ),
1226            }
1227        }
1228
1229        Ok(SchemaPiece::Bytes)
1230    }
1231
1232    /// Parse a [`serde_json::Value`] representing an Avro Int type
1233    ///
1234    /// If the complex type has a `connect.name` tag (as [emitted by
1235    /// Debezium][1]) that matches a `Date` tag, we specify that the correct
1236    /// schema to use is `Date`.
1237    ///
1238    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1239    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1240        const AVRO_DATE: &str = "date";
1241        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1242        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1243        if let Some(name) = complex.get("connect.name") {
1244            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1245                if name == KAFKA_DATE {
1246                    warn!("using deprecated debezium date format");
1247                }
1248                return Ok(SchemaPiece::Date);
1249            }
1250        }
1251        // Put this after the custom semantic types so that the debezium
1252        // warning is emitted, since the logicalType tag shows up in the
1253        // deprecated debezium format :-/
1254        if let Some(name) = complex.get("logicalType") {
1255            if name == AVRO_DATE {
1256                return Ok(SchemaPiece::Date);
1257            }
1258        }
1259        if !complex.is_empty() {
1260            debug!("parsing complex type as regular int: {:?}", complex);
1261        }
1262        Ok(SchemaPiece::Int)
1263    }
1264
1265    /// Parse a [`serde_json::Value`] representing an Avro Int64/Long type
1266    ///
1267    /// The debezium/kafka types are document at [the debezium site][1], and the
1268    /// avro ones are documented at [Avro][2].
1269    ///
1270    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1271    /// [2]: https://avro.apache.org/docs/++version++/specification/
1272    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1273        const AVRO_MILLI_TS: &str = "timestamp-millis";
1274        const AVRO_MICRO_TS: &str = "timestamp-micros";
1275
1276        const CONNECT_MILLI_TS: &[&str] = &[
1277            "io.debezium.time.Timestamp",
1278            "org.apache.kafka.connect.data.Timestamp",
1279        ];
1280        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1281
1282        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1283            if CONNECT_MILLI_TS.contains(&&**name) {
1284                return Ok(SchemaPiece::TimestampMilli);
1285            }
1286            if name == CONNECT_MICRO_TS {
1287                return Ok(SchemaPiece::TimestampMicro);
1288            }
1289        }
1290        if let Some(name) = complex.get("logicalType") {
1291            if name == AVRO_MILLI_TS {
1292                return Ok(SchemaPiece::TimestampMilli);
1293            }
1294            if name == AVRO_MICRO_TS {
1295                return Ok(SchemaPiece::TimestampMicro);
1296            }
1297        }
1298        if !complex.is_empty() {
1299            debug!("parsing complex type as regular long: {:?}", complex);
1300        }
1301        Ok(SchemaPiece::Long)
1302    }
1303
1304    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1305        const CONNECT_JSON: &str = "io.debezium.data.Json";
1306
1307        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1308            if CONNECT_JSON == name.as_str() {
1309                return SchemaPiece::Json;
1310            }
1311        }
1312        if let Some(name) = complex.get("logicalType") {
1313            if name == "uuid" {
1314                return SchemaPiece::Uuid;
1315            }
1316        }
1317        debug!("parsing complex type as regular string: {:?}", complex);
1318        SchemaPiece::String
1319    }
1320
1321    /// Parse a `serde_json::Value` representing a Avro fixed type into a
1322    /// `Schema`.
1323    fn parse_fixed(
1324        &self,
1325        _default_namespace: &str,
1326        complex: &Map<String, Value>,
1327    ) -> Result<SchemaPiece, AvroError> {
1328        let _name = Name::parse(complex)?;
1329
1330        let size = complex
1331            .get("size")
1332            .and_then(|v| v.as_i64())
1333            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1334        if size <= 0 {
1335            return Err(ParseSchemaError::new(format!(
1336                "Fixed values require a positive size attribute, found: {}",
1337                size
1338            ))
1339            .into());
1340        }
1341
1342        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1343
1344        if let Some("decimal") = logical_type {
1345            match Self::parse_decimal(complex) {
1346                Ok((precision, scale)) => {
1347                    let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1348                    if precision > max {
1349                        warn!(
1350                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1351                            precision, size
1352                        );
1353                    } else {
1354                        return Ok(SchemaPiece::Decimal {
1355                            precision,
1356                            scale,
1357                            fixed_size: Some(size as usize),
1358                        });
1359                    }
1360                }
1361                Err(e) => warn!(
1362                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1363                    complex, e
1364                ),
1365            }
1366        }
1367
1368        Ok(SchemaPiece::Fixed {
1369            size: size as usize,
1370        })
1371    }
1372}
1373
1374impl Schema {
1375    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
1376    /// schema.
1377    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1378        Self::parse_with_references(value, &[])
1379    }
1380
1381    /// Parse an JSON Avro schema with referenced named types.
1382    ///
1383    /// This is used when parsing a schema that references types defined in other
1384    /// schemas (Confluent Schema Registry schema references). Referenced schemas
1385    /// should be provided in dependency order (dependencies first).
1386    ///
1387    /// # Arguments
1388    ///
1389    /// * `primary` - The primary schema JSON value to parse
1390    /// * `reference_schemas` - Schemas whose named types should be available during parsing
1391    pub fn parse_with_references(
1392        primary: &Value,
1393        reference_schemas: &[Schema],
1394    ) -> Result<Self, AvroError> {
1395        // Collect and remap named types from all referenced schemas
1396        let (named, indices) = Self::collect_named_types(reference_schemas);
1397
1398        // Create parser with pre-populated named types
1399        let p = SchemaParser {
1400            named: named.into_iter().map(Some).collect(),
1401            indices,
1402        };
1403        p.parse(primary)
1404    }
1405
1406    /// Collect all named types from a list of schemas, remapping indices as needed.
1407    ///
1408    /// When combining named types from multiple schemas, each schema's internal
1409    /// indices need to be remapped to account for duplicates and new positions.
1410    fn collect_named_types(
1411        schemas: &[Schema],
1412    ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1413        let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1414        let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1415
1416        for schema in schemas {
1417            // First pass: Build the index_map from this schema's indices to combined indices.
1418            // For types that already exist: map to existing combined index
1419            // For new types: map to their future position in combined_named
1420            let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1421            let mut new_type_offset = combined_named.len();
1422
1423            for named_piece in &schema.named {
1424                if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1425                    // Type already exists, use existing index
1426                    index_map.push(existing_idx);
1427                } else {
1428                    // New type, assign next available index
1429                    index_map.push(new_type_offset);
1430                    new_type_offset += 1;
1431                }
1432            }
1433
1434            // Second pass: Add new types with proper remapping
1435            for named_piece in &schema.named {
1436                if combined_indices.contains_key(&named_piece.name) {
1437                    continue;
1438                }
1439
1440                let mut remapped = named_piece.clone();
1441                Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1442
1443                let new_idx = combined_named.len();
1444                combined_indices.insert(remapped.name.clone(), new_idx);
1445                combined_named.push(remapped);
1446            }
1447        }
1448
1449        (combined_named, combined_indices)
1450    }
1451
1452    /// Recursively remap Named indices in a SchemaPiece using an index map.
1453    fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1454        match piece {
1455            SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1456            SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1457            SchemaPiece::Union(union) => {
1458                for variant in union.variants_mut() {
1459                    Self::remap_indices_with_map(variant, index_map);
1460                }
1461            }
1462            SchemaPiece::Record { fields, .. } => {
1463                for field in fields {
1464                    Self::remap_indices_with_map(&mut field.schema, index_map);
1465                }
1466            }
1467            _ => {}
1468        }
1469    }
1470
1471    /// Remap a single SchemaPieceOrNamed using an index map.
1472    fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1473        match item {
1474            SchemaPieceOrNamed::Named(idx) => {
1475                if let Some(&new_idx) = index_map.get(*idx) {
1476                    *idx = new_idx;
1477                }
1478            }
1479            SchemaPieceOrNamed::Piece(piece) => {
1480                Self::remap_indices_in_piece_with_map(piece, index_map)
1481            }
1482        }
1483    }
1484
1485    /// Converts `self` into its [Parsing Canonical Form].
1486    ///
1487    /// [Parsing Canonical Form]:
1488    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1489    pub fn canonical_form(&self) -> String {
1490        let json = serde_json::to_value(self).unwrap();
1491        parsing_canonical_form(&json)
1492    }
1493
1494    /// Generate fingerprint of Schema's [Parsing Canonical Form].
1495    ///
1496    /// [Parsing Canonical Form]:
1497    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1498    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1499        let mut d = D::new();
1500        d.update(self.canonical_form());
1501        SchemaFingerprint {
1502            bytes: d.finalize().to_vec(),
1503        }
1504    }
1505
1506    /// Parse a `serde_json::Value` representing a primitive Avro type into a
1507    /// `Schema`.
1508    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1509        match primitive {
1510            "null" => Ok(SchemaPiece::Null),
1511            "boolean" => Ok(SchemaPiece::Boolean),
1512            "int" => Ok(SchemaPiece::Int),
1513            "long" => Ok(SchemaPiece::Long),
1514            "double" => Ok(SchemaPiece::Double),
1515            "float" => Ok(SchemaPiece::Float),
1516            "bytes" => Ok(SchemaPiece::Bytes),
1517            "string" => Ok(SchemaPiece::String),
1518            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1519        }
1520    }
1521}
1522
1523impl FromStr for Schema {
1524    type Err = AvroError;
1525
1526    /// Create a `Schema` from a string representing a JSON Avro schema.
1527    fn from_str(input: &str) -> Result<Self, AvroError> {
1528        let value = serde_json::from_str(input)
1529            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1530        Self::parse(&value)
1531    }
1532}
1533
/// A named schema type paired with its fully-qualified name. The parser
/// produces these for `record`, `enum` and `fixed` definitions.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// Fully-qualified (namespace + name) identity of the type.
    pub name: FullName,
    /// The type's definition.
    pub piece: SchemaPiece,
}
1539
/// A borrowed view of one schema piece inside a root [`Schema`]. Any
/// named-type reference has already been resolved (see
/// `SchemaNodeOrNamed::lookup`).
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The root schema against which named indices inside `inner` resolve.
    pub root: &'a Schema,
    /// The resolved piece this node refers to.
    pub inner: &'a SchemaPiece,
    /// The piece's fully-qualified name when it came from a named type;
    /// `None` for anonymous pieces.
    pub name: Option<&'a FullName>,
}
1546
/// Borrowed, `Copy` counterpart of `SchemaPieceOrNamed`, as produced by its
/// `as_ref`.
#[derive(Copy, Clone, Debug)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// An anonymous piece, borrowed in place.
    Piece(&'a SchemaPiece),
    /// The index of a named type in the root schema's named-type list.
    Named(usize),
}
1552
1553impl<'a> SchemaPieceRefOrNamed<'a> {
1554    pub fn get_human_name(&self, root: &Schema) -> String {
1555        match self {
1556            Self::Piece(piece) => format!("{:?}", piece),
1557            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1558        }
1559    }
1560
1561    #[inline(always)]
1562    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1563        match self {
1564            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1565            SchemaPieceRefOrNamed::Named(index) => {
1566                let named_piece = root.lookup(index);
1567                (&named_piece.piece, Some(&named_piece.name))
1568            }
1569        }
1570    }
1571}
1572
/// A position inside a root [`Schema`] that may still be an unresolved
/// named-type reference. Call `lookup` to resolve it into a [`SchemaNode`].
#[derive(Copy, Clone, Debug)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema whose named-type list `inner` indexes into.
    pub root: &'a Schema,
    /// The (possibly named) piece at this position.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
1578
1579impl<'a> SchemaNodeOrNamed<'a> {
1580    #[inline(always)]
1581    pub fn lookup(self) -> SchemaNode<'a> {
1582        let (inner, name) = self.inner.get_piece_and_name(self.root);
1583        SchemaNode {
1584            root: self.root,
1585            inner,
1586            name,
1587        }
1588    }
1589    #[inline(always)]
1590    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1591        self.step_ref(next.as_ref())
1592    }
1593    #[inline(always)]
1594    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1595        Self {
1596            root: self.root,
1597            inner: match next {
1598                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1599                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1600            },
1601        }
1602    }
1603
1604    pub fn to_schema(self) -> Schema {
1605        let mut cloner = SchemaSubtreeDeepCloner {
1606            old_root: self.root,
1607            old_to_new_names: Default::default(),
1608            named: Default::default(),
1609        };
1610        let piece = cloner.clone_piece_or_named(self.inner);
1611        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1612        let indices: BTreeMap<FullName, usize> = named
1613            .iter()
1614            .enumerate()
1615            .map(|(i, nsp)| (nsp.name.clone(), i))
1616            .collect();
1617        Schema {
1618            named,
1619            indices,
1620            top: piece,
1621        }
1622    }
1623
1624    pub fn namespace(self) -> Option<&'a str> {
1625        let SchemaNode { name, .. } = self.lookup();
1626        name.map(|FullName { namespace, .. }| namespace.as_str())
1627    }
1628}
1629
/// Working state for deep-copying part of one [`Schema`] into a new,
/// standalone schema (driven by `SchemaNodeOrNamed::to_schema`).
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema being copied from.
    old_root: &'a Schema,
    /// Presumably maps named-type indices in `old_root` to their new indices
    /// in `named` — TODO confirm against `clone_piece_or_named`.
    old_to_new_names: BTreeMap<usize, usize>,
    /// Named types of the schema being built. `to_schema` unwraps every slot,
    /// so all of them must be filled once cloning finishes.
    named: Vec<Option<NamedSchemaPiece>>,
}
1635
1636impl<'a> SchemaSubtreeDeepCloner<'a> {
1637    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1638        match piece {
1639            SchemaPiece::Null => SchemaPiece::Null,
1640            SchemaPiece::Boolean => SchemaPiece::Boolean,
1641            SchemaPiece::Int => SchemaPiece::Int,
1642            SchemaPiece::Long => SchemaPiece::Long,
1643            SchemaPiece::Float => SchemaPiece::Float,
1644            SchemaPiece::Double => SchemaPiece::Double,
1645            SchemaPiece::Date => SchemaPiece::Date,
1646            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1647            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1648            SchemaPiece::Json => SchemaPiece::Json,
1649            SchemaPiece::Decimal {
1650                scale,
1651                precision,
1652                fixed_size,
1653            } => SchemaPiece::Decimal {
1654                scale: *scale,
1655                precision: *precision,
1656                fixed_size: *fixed_size,
1657            },
1658            SchemaPiece::Bytes => SchemaPiece::Bytes,
1659            SchemaPiece::String => SchemaPiece::String,
1660            SchemaPiece::Uuid => SchemaPiece::Uuid,
1661            SchemaPiece::Array(inner) => {
1662                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1663            }
1664            SchemaPiece::Map(inner) => {
1665                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1666            }
1667            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1668                schemas: us
1669                    .schemas
1670                    .iter()
1671                    .map(|s| self.clone_piece_or_named(s.as_ref()))
1672                    .collect(),
1673                anon_variant_index: us.anon_variant_index.clone(),
1674                named_variant_index: us.named_variant_index.clone(),
1675            }),
1676            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1677            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1678            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1679            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1680            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1681            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1682            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1683            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1684            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1685            SchemaPiece::ResolveConcreteUnion {
1686                index,
1687                inner,
1688                n_reader_variants,
1689                reader_null_variant,
1690            } => SchemaPiece::ResolveConcreteUnion {
1691                index: *index,
1692                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1693                n_reader_variants: *n_reader_variants,
1694                reader_null_variant: *reader_null_variant,
1695            },
1696            SchemaPiece::ResolveUnionUnion {
1697                permutation,
1698                n_reader_variants,
1699                reader_null_variant,
1700            } => SchemaPiece::ResolveUnionUnion {
1701                permutation: permutation
1702                    .clone()
1703                    .into_iter()
1704                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1705                    .collect(),
1706                n_reader_variants: *n_reader_variants,
1707                reader_null_variant: *reader_null_variant,
1708            },
1709            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1710                SchemaPiece::ResolveUnionConcrete {
1711                    index: *index,
1712                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1713                }
1714            }
1715            SchemaPiece::Record {
1716                doc,
1717                fields,
1718                lookup,
1719            } => SchemaPiece::Record {
1720                doc: doc.clone(),
1721                fields: fields
1722                    .iter()
1723                    .map(|rf| RecordField {
1724                        name: rf.name.clone(),
1725                        doc: rf.doc.clone(),
1726                        default: rf.default.clone(),
1727                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
1728                        order: rf.order,
1729                        position: rf.position,
1730                    })
1731                    .collect(),
1732                lookup: lookup.clone(),
1733            },
1734            SchemaPiece::Enum {
1735                doc,
1736                symbols,
1737                default_idx,
1738            } => SchemaPiece::Enum {
1739                doc: doc.clone(),
1740                symbols: symbols.clone(),
1741                default_idx: *default_idx,
1742            },
1743            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1744            SchemaPiece::ResolveRecord {
1745                defaults,
1746                fields,
1747                n_reader_fields,
1748            } => SchemaPiece::ResolveRecord {
1749                defaults: defaults.clone(),
1750                fields: fields
1751                    .iter()
1752                    .map(|rf| match rf {
1753                        ResolvedRecordField::Present(rf) => {
1754                            ResolvedRecordField::Present(RecordField {
1755                                name: rf.name.clone(),
1756                                doc: rf.doc.clone(),
1757                                default: rf.default.clone(),
1758                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
1759                                order: rf.order,
1760                                position: rf.position,
1761                            })
1762                        }
1763                        ResolvedRecordField::Absent(writer_schema) => {
1764                            ResolvedRecordField::Absent(writer_schema.clone())
1765                        }
1766                    })
1767                    .collect(),
1768                n_reader_fields: *n_reader_fields,
1769            },
1770            SchemaPiece::ResolveEnum {
1771                doc,
1772                symbols,
1773                default,
1774            } => SchemaPiece::ResolveEnum {
1775                doc: doc.clone(),
1776                symbols: symbols.clone(),
1777                default: default.clone(),
1778            },
1779        }
1780    }
1781    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1782        match piece {
1783            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1784            SchemaPieceRefOrNamed::Named(index) => {
1785                let new_index = match self.old_to_new_names.entry(index) {
1786                    Entry::Vacant(ve) => {
1787                        let new_index = self.named.len();
1788                        self.named.push(None);
1789                        ve.insert(new_index);
1790                        let old_named_piece = self.old_root.lookup(index);
1791                        let new_named_piece = NamedSchemaPiece {
1792                            name: old_named_piece.name.clone(),
1793                            piece: self.clone_piece(&old_named_piece.piece),
1794                        };
1795                        self.named[new_index] = Some(new_named_piece);
1796                        new_index
1797                    }
1798                    Entry::Occupied(oe) => *oe.get(),
1799                };
1800                SchemaPieceOrNamed::Named(new_index)
1801            }
1802        }
1803    }
1804}
1805
1806impl<'a> SchemaNode<'a> {
1807    #[inline(always)]
1808    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1809        let (inner, name) = next.get_piece_and_name(self.root);
1810        Self {
1811            root: self.root,
1812            inner,
1813            name,
1814        }
1815    }
1816
1817    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1818        use serde_json::Value::*;
1819        let val = match (json, self.inner) {
1820            // A default value always matches the first variant of a union
1821            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1822                Some(variant) => AvroValue::Union {
1823                    index: 0,
1824                    inner: Box::new(self.step(variant).json_to_value(json)?),
1825                    n_variants: us.schemas.len(),
1826                    null_variant: us
1827                        .schemas
1828                        .iter()
1829                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1830                },
1831                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1832            },
1833            (Null, SchemaPiece::Null) => AvroValue::Null,
1834            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1835            (Number(n), piece) => {
1836                match piece {
1837                    piece if piece.is_underlying_int() => {
1838                        let i =
1839                            n.as_i64()
1840                                .and_then(|i| i32::try_from(i).ok())
1841                                .ok_or_else(|| {
1842                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1843                                })?;
1844                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1845                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1846                        })?
1847                    }
1848                    piece if piece.is_underlying_long() => {
1849                        let i = n.as_i64().ok_or_else(|| {
1850                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1851                        })?;
1852                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1853                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1854                        })?
1855                    }
1856                    SchemaPiece::Float => {
1857                        let f = n.as_f64().ok_or_else(|| {
1858                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1859                        })?;
1860                        AvroValue::Float(f as f32)
1861                    }
1862                    SchemaPiece::Double => {
1863                        let f = n.as_f64().ok_or_else(|| {
1864                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1865                        })?;
1866                        AvroValue::Double(f)
1867                    }
1868                    _ => {
1869                        return Err(ParseSchemaError(format!(
1870                            "Unexpected number in default: {}",
1871                            n
1872                        )));
1873                    }
1874                }
1875            }
1876            (String(s), piece)
1877                if s.eq_ignore_ascii_case("nan")
1878                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1879            {
1880                match piece {
1881                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1882                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1883                    _ => unreachable!(),
1884                }
1885            }
1886            (String(s), piece)
1887                if s.eq_ignore_ascii_case("infinity")
1888                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1889            {
1890                match piece {
1891                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1892                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1893                    _ => unreachable!(),
1894                }
1895            }
1896            (String(s), piece)
1897                if s.eq_ignore_ascii_case("-infinity")
1898                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1899            {
1900                match piece {
1901                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1902                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1903                    _ => unreachable!(),
1904                }
1905            }
1906            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1907            (
1908                String(s),
1909                SchemaPiece::Decimal {
1910                    precision, scale, ..
1911                },
1912            ) => AvroValue::Decimal(DecimalValue {
1913                precision: *precision,
1914                scale: *scale,
1915                unscaled: s.clone().into_bytes(),
1916            }),
1917            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1918            (Object(map), SchemaPiece::Record { fields, .. }) => {
1919                let field_values = fields
1920                    .iter()
1921                    .map(|rf| {
1922                        let jval = map.get(&rf.name).ok_or_else(|| {
1923                            ParseSchemaError(format!(
1924                                "Field not found in default value: {}",
1925                                rf.name
1926                            ))
1927                        })?;
1928                        let value = self.step(&rf.schema).json_to_value(jval)?;
1929                        Ok((rf.name.clone(), value))
1930                    })
1931                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1932                AvroValue::Record(field_values)
1933            }
1934            (String(s), SchemaPiece::Enum { symbols, .. }) => {
1935                match symbols.iter().find_position(|sym| s == *sym) {
1936                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1937                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1938                }
1939            }
1940            (Array(vals), SchemaPiece::Array(inner)) => {
1941                let node = self.step(&**inner);
1942                let vals = vals
1943                    .iter()
1944                    .map(|val| node.json_to_value(val))
1945                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1946                AvroValue::Array(vals)
1947            }
1948            (Object(map), SchemaPiece::Map(inner)) => {
1949                let node = self.step(&**inner);
1950                let map = map
1951                    .iter()
1952                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1953                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1954                AvroValue::Map(map)
1955            }
1956            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1957                AvroValue::Fixed(*size, s.clone().into_bytes())
1958            }
1959            _ => {
1960                return Err(ParseSchemaError(format!(
1961                    "Json default value {} does not match schema",
1962                    json
1963                )));
1964            }
1965        };
1966        Ok(val)
1967    }
1968}
1969
/// Serialization state for one node of a [`Schema`] being written as Avro JSON.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The schema node to serialize at this position.
    node: SchemaNodeOrNamed<'a>,
    // Named types (by index) that have already been written in full; any later
    // occurrence is serialized as a name reference instead.
    // This does not logically need Rc<RefCell<_>> semantics --
    // it is only ever mutated in one stack frame at a time.
    // But AFAICT serde doesn't expose a way to
    // provide some mutable context to every node in the tree...
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// The namespace of this node's parent, or "" by default
    enclosing_ns: &'a str,
}
1976    // provide some mutable context to every node in the tree...
1977    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
1978    /// The namespace of this node's parent, or "" by default
1979    enclosing_ns: &'a str,
1980}
1981
1982#[derive(Clone)]
1983struct RecordFieldSerContext<'a> {
1984    outer: &'a SchemaSerContext<'a>,
1985    inner: &'a RecordField,
1986}
1987
1988impl<'a> SchemaSerContext<'a> {
1989    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1990        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1991        Self {
1992            node: self.node.step_ref(next),
1993            seen_named: Rc::clone(&self.seen_named),
1994            enclosing_ns: ns,
1995        }
1996    }
1997}
1998
1999impl<'a> Serialize for SchemaSerContext<'a> {
2000    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2001    where
2002        S: Serializer,
2003    {
2004        match self.node.inner {
2005            SchemaPieceRefOrNamed::Piece(piece) => match piece {
2006                SchemaPiece::Null => serializer.serialize_str("null"),
2007                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2008                SchemaPiece::Int => serializer.serialize_str("int"),
2009                SchemaPiece::Long => serializer.serialize_str("long"),
2010                SchemaPiece::Float => serializer.serialize_str("float"),
2011                SchemaPiece::Double => serializer.serialize_str("double"),
2012                SchemaPiece::Date => {
2013                    let mut map = serializer.serialize_map(Some(2))?;
2014                    map.serialize_entry("type", "int")?;
2015                    map.serialize_entry("logicalType", "date")?;
2016                    map.end()
2017                }
2018                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2019                    let mut map = serializer.serialize_map(Some(2))?;
2020                    map.serialize_entry("type", "long")?;
2021                    if piece == &SchemaPiece::TimestampMilli {
2022                        map.serialize_entry("logicalType", "timestamp-millis")?;
2023                    } else {
2024                        map.serialize_entry("logicalType", "timestamp-micros")?;
2025                    }
2026                    map.end()
2027                }
2028                SchemaPiece::Decimal {
2029                    precision,
2030                    scale,
2031                    fixed_size: None,
2032                } => {
2033                    let mut map = serializer.serialize_map(Some(4))?;
2034                    map.serialize_entry("type", "bytes")?;
2035                    map.serialize_entry("precision", precision)?;
2036                    map.serialize_entry("scale", scale)?;
2037                    map.serialize_entry("logicalType", "decimal")?;
2038                    map.end()
2039                }
2040                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2041                SchemaPiece::String => serializer.serialize_str("string"),
2042                SchemaPiece::Array(inner) => {
2043                    let mut map = serializer.serialize_map(Some(2))?;
2044                    map.serialize_entry("type", "array")?;
2045                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2046                    map.end()
2047                }
2048                SchemaPiece::Map(inner) => {
2049                    let mut map = serializer.serialize_map(Some(2))?;
2050                    map.serialize_entry("type", "map")?;
2051                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2052                    map.end()
2053                }
2054                SchemaPiece::Union(inner) => {
2055                    let variants = inner.variants();
2056                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2057                    for v in variants {
2058                        seq.serialize_element(&self.step(v.as_ref()))?;
2059                    }
2060                    seq.end()
2061                }
2062                SchemaPiece::Json => {
2063                    let mut map = serializer.serialize_map(Some(2))?;
2064                    map.serialize_entry("type", "string")?;
2065                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2066                    map.end()
2067                }
2068                SchemaPiece::Uuid => {
2069                    let mut map = serializer.serialize_map(Some(4))?;
2070                    map.serialize_entry("type", "string")?;
2071                    map.serialize_entry("logicalType", "uuid")?;
2072                    map.end()
2073                }
2074                SchemaPiece::Record { .. }
2075                | SchemaPiece::Decimal {
2076                    fixed_size: Some(_),
2077                    ..
2078                }
2079                | SchemaPiece::Enum { .. }
2080                | SchemaPiece::Fixed { .. } => {
2081                    unreachable!("Unexpected named schema piece in anonymous schema position")
2082                }
2083                SchemaPiece::ResolveIntLong
2084                | SchemaPiece::ResolveDateTimestamp
2085                | SchemaPiece::ResolveIntFloat
2086                | SchemaPiece::ResolveIntDouble
2087                | SchemaPiece::ResolveLongFloat
2088                | SchemaPiece::ResolveLongDouble
2089                | SchemaPiece::ResolveFloatDouble
2090                | SchemaPiece::ResolveConcreteUnion { .. }
2091                | SchemaPiece::ResolveUnionUnion { .. }
2092                | SchemaPiece::ResolveUnionConcrete { .. }
2093                | SchemaPiece::ResolveRecord { .. }
2094                | SchemaPiece::ResolveIntTsMicro
2095                | SchemaPiece::ResolveIntTsMilli
2096                | SchemaPiece::ResolveEnum { .. } => {
2097                    panic!("Attempted to serialize resolved schema")
2098                }
2099            },
2100            SchemaPieceRefOrNamed::Named(index) => {
2101                let mut map = self.seen_named.borrow_mut();
2102                let named_piece = match map.get(&index) {
2103                    Some(name) => {
2104                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2105                    }
2106                    None => self.node.root.lookup(index),
2107                };
2108                let name = &named_piece.name;
2109                map.insert(index, name.clone());
2110                std::mem::drop(map);
2111                match &named_piece.piece {
2112                    SchemaPiece::Record { doc, fields, .. } => {
2113                        let mut map = serializer.serialize_map(None)?;
2114                        map.serialize_entry("type", "record")?;
2115                        map.serialize_entry("name", &name.name)?;
2116                        if self.enclosing_ns != &name.namespace {
2117                            map.serialize_entry("namespace", &name.namespace)?;
2118                        }
2119                        if let Some(docstr) = doc {
2120                            map.serialize_entry("doc", docstr)?;
2121                        }
2122                        // TODO (brennan) - serialize aliases
2123                        map.serialize_entry(
2124                            "fields",
2125                            &fields
2126                                .iter()
2127                                .map(|f| RecordFieldSerContext {
2128                                    outer: self,
2129                                    inner: f,
2130                                })
2131                                .collect::<Vec<_>>(),
2132                        )?;
2133                        map.end()
2134                    }
2135                    SchemaPiece::Enum {
2136                        symbols,
2137                        default_idx,
2138                        ..
2139                    } => {
2140                        let mut map = serializer.serialize_map(None)?;
2141                        map.serialize_entry("type", "enum")?;
2142                        map.serialize_entry("name", &name.name)?;
2143                        if self.enclosing_ns != &name.namespace {
2144                            map.serialize_entry("namespace", &name.namespace)?;
2145                        }
2146                        map.serialize_entry("symbols", symbols)?;
2147                        if let Some(default_idx) = *default_idx {
2148                            assert!(default_idx < symbols.len());
2149                            map.serialize_entry("default", &symbols[default_idx])?;
2150                        }
2151                        map.end()
2152                    }
2153                    SchemaPiece::Fixed { size } => {
2154                        let mut map = serializer.serialize_map(None)?;
2155                        map.serialize_entry("type", "fixed")?;
2156                        map.serialize_entry("name", &name.name)?;
2157                        if self.enclosing_ns != &name.namespace {
2158                            map.serialize_entry("namespace", &name.namespace)?;
2159                        }
2160                        map.serialize_entry("size", size)?;
2161                        map.end()
2162                    }
2163                    SchemaPiece::Decimal {
2164                        scale,
2165                        precision,
2166                        fixed_size: Some(size),
2167                    } => {
2168                        let mut map = serializer.serialize_map(Some(6))?;
2169                        map.serialize_entry("type", "fixed")?;
2170                        map.serialize_entry("logicalType", "decimal")?;
2171                        map.serialize_entry("name", &name.name)?;
2172                        if self.enclosing_ns != &name.namespace {
2173                            map.serialize_entry("namespace", &name.namespace)?;
2174                        }
2175                        map.serialize_entry("size", size)?;
2176                        map.serialize_entry("precision", precision)?;
2177                        map.serialize_entry("scale", scale)?;
2178                        map.end()
2179                    }
2180                    SchemaPiece::Null
2181                    | SchemaPiece::Boolean
2182                    | SchemaPiece::Int
2183                    | SchemaPiece::Long
2184                    | SchemaPiece::Float
2185                    | SchemaPiece::Double
2186                    | SchemaPiece::Date
2187                    | SchemaPiece::TimestampMilli
2188                    | SchemaPiece::TimestampMicro
2189                    | SchemaPiece::Decimal {
2190                        fixed_size: None, ..
2191                    }
2192                    | SchemaPiece::Bytes
2193                    | SchemaPiece::String
2194                    | SchemaPiece::Array(_)
2195                    | SchemaPiece::Map(_)
2196                    | SchemaPiece::Union(_)
2197                    | SchemaPiece::Uuid
2198                    | SchemaPiece::Json => {
2199                        unreachable!("Unexpected anonymous schema piece in named schema position")
2200                    }
2201                    SchemaPiece::ResolveIntLong
2202                    | SchemaPiece::ResolveDateTimestamp
2203                    | SchemaPiece::ResolveIntFloat
2204                    | SchemaPiece::ResolveIntDouble
2205                    | SchemaPiece::ResolveLongFloat
2206                    | SchemaPiece::ResolveLongDouble
2207                    | SchemaPiece::ResolveFloatDouble
2208                    | SchemaPiece::ResolveConcreteUnion { .. }
2209                    | SchemaPiece::ResolveUnionUnion { .. }
2210                    | SchemaPiece::ResolveUnionConcrete { .. }
2211                    | SchemaPiece::ResolveRecord { .. }
2212                    | SchemaPiece::ResolveIntTsMilli
2213                    | SchemaPiece::ResolveIntTsMicro
2214                    | SchemaPiece::ResolveEnum { .. } => {
2215                        panic!("Attempted to serialize resolved schema")
2216                    }
2217                }
2218            }
2219        }
2220    }
2221}
2222
2223impl Serialize for Schema {
2224    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2225    where
2226        S: Serializer,
2227    {
2228        let ctx = SchemaSerContext {
2229            node: SchemaNodeOrNamed {
2230                root: self,
2231                inner: self.top.as_ref(),
2232            },
2233            seen_named: Rc::new(RefCell::new(Default::default())),
2234            enclosing_ns: "",
2235        };
2236        ctx.serialize(serializer)
2237    }
2238}
2239
2240impl<'a> Serialize for RecordFieldSerContext<'a> {
2241    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2242    where
2243        S: Serializer,
2244    {
2245        let mut map = serializer.serialize_map(None)?;
2246        map.serialize_entry("name", &self.inner.name)?;
2247        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2248        if let Some(default) = &self.inner.default {
2249            map.serialize_entry("default", default)?;
2250        }
2251        if let Some(doc) = &self.inner.doc {
2252            map.serialize_entry("doc", doc)?;
2253        }
2254        map.end()
2255    }
2256}
2257
2258/// Parses a **valid** avro schema into the Parsing Canonical Form.
2259/// <https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas>
2260fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2261    pcf(schema, "", false)
2262}
2263
2264fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2265    match schema {
2266        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2267        serde_json::Value::String(s) => pcf_string(s),
2268        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2269        serde_json::Value::Number(n) => n.to_string(),
2270        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2271    }
2272}
2273
2274fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2275    // Look for the namespace variant up front.
2276    let default_ns = schema
2277        .get("namespace")
2278        .and_then(|v| v.as_str())
2279        .unwrap_or(enclosing_ns);
2280    let mut fields = Vec::new();
2281    let mut found_next_ns = None;
2282    let mut deferred_values = vec![];
2283    for (k, v) in schema {
2284        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
2285        if schema.len() == 1 && k == "type" {
2286            // Invariant: function is only callable from a valid schema, so this is acceptable.
2287            if let serde_json::Value::String(s) = v {
2288                return pcf_string(s);
2289            }
2290        }
2291
2292        // Strip out unused fields ([STRIP] rule)
2293        if field_ordering_position(k).is_none() {
2294            continue;
2295        }
2296
2297        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
2298        if k == "name" {
2299            // The `fields` stanza needs special handling, as it has "name"
2300            // fields that don't get canonicalized (since they are not type names).
2301            if in_fields {
2302                fields.push((
2303                    k,
2304                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2305                ));
2306                continue;
2307            }
2308            // Invariant: Only valid schemas. Must be a string.
2309            let name = v.as_str().unwrap();
2310            assert!(
2311                found_next_ns.is_none(),
2312                "`name` must not be specified multiple times"
2313            );
2314            let next_ns = match name.rsplit_once('.') {
2315                None => default_ns,
2316                Some((ns, _name)) => ns,
2317            };
2318            found_next_ns = Some(next_ns);
2319            let n = if next_ns.is_empty() {
2320                Cow::Borrowed(name)
2321            } else {
2322                Cow::Owned(format!("{next_ns}.{name}"))
2323            };
2324            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2325            continue;
2326        }
2327
2328        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
2329        if k == "size" {
2330            let i = match v.as_str() {
2331                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2332                None => v.as_i64().unwrap(),
2333            };
2334            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2335            continue;
2336        }
2337
2338        // For anything else, recursively process the result
2339        // (deferred until we know the namespace)
2340        deferred_values.push((k, v));
2341    }
2342
2343    let next_ns = found_next_ns.unwrap_or(default_ns);
2344    for (k, v) in deferred_values {
2345        fields.push((
2346            k,
2347            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2348        ));
2349    }
2350
2351    // Sort the fields by their canonical ordering ([ORDER] rule).
2352    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2353    let inter = fields
2354        .into_iter()
2355        .map(|(_, v)| v)
2356        .collect::<Vec<_>>()
2357        .join(",");
2358    format!("{{{}}}", inter)
2359}
2360
2361fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2362    let inter = arr
2363        .iter()
2364        .map(|s| pcf(s, enclosing_ns, in_fields))
2365        .collect::<Vec<String>>()
2366        .join(",");
2367    format!("[{}]", inter)
2368}
2369
/// Wraps `s` in double quotes so it can be emitted as a JSON string literal.
///
/// No escaping is performed; `s` is copied verbatim between the quotes.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2373
/// Returns the 1-based canonical position of the schema attribute `field`.
///
/// The position is used as the sort key for the [ORDER] rule. Attributes that
/// are not part of Parsing Canonical Form return `None` and are stripped
/// ([STRIP] rule).
fn field_ordering_position(field: &str) -> Option<usize> {
    // Canonical attribute order; index 0 corresponds to position 1.
    const ORDER: [&str; 7] = ["name", "type", "fields", "symbols", "items", "values", "size"];

    ORDER
        .iter()
        .position(|&known| known == field)
        .map(|idx| idx + 1)
}
2389
2390#[cfg(test)]
2391mod tests {
2392    use mz_ore::{assert_err, assert_ok};
2393
2394    use crate::types::{Record, ToAvro};
2395
2396    use super::*;
2397
2398    fn check_schema(schema: &str, expected: SchemaPiece) {
2399        let schema = Schema::from_str(schema).unwrap();
2400        assert_eq!(&expected, schema.top_node().inner);
2401
2402        // Test serialization round trip
2403        let schema = serde_json::to_string(&schema).unwrap();
2404        let schema = Schema::from_str(&schema).unwrap();
2405        assert_eq!(&expected, schema.top_node().inner);
2406    }
2407
2408    #[mz_ore::test]
2409    fn test_primitive_schema() {
2410        check_schema("\"null\"", SchemaPiece::Null);
2411        check_schema("\"int\"", SchemaPiece::Int);
2412        check_schema("\"double\"", SchemaPiece::Double);
2413    }
2414
2415    #[mz_ore::test]
2416    fn test_array_schema() {
2417        check_schema(
2418            r#"{"type": "array", "items": "string"}"#,
2419            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2420        );
2421    }
2422
2423    #[mz_ore::test]
2424    fn test_map_schema() {
2425        check_schema(
2426            r#"{"type": "map", "values": "double"}"#,
2427            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2428        );
2429    }
2430
2431    #[mz_ore::test]
2432    fn test_union_schema() {
2433        check_schema(
2434            r#"["null", "int"]"#,
2435            SchemaPiece::Union(
2436                UnionSchema::new(vec![
2437                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2438                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2439                ])
2440                .unwrap(),
2441            ),
2442        );
2443    }
2444
2445    #[mz_ore::test]
2446    fn test_multi_union_schema() {
2447        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2448        assert_ok!(schema);
2449        let schema = schema.unwrap();
2450        let node = schema.top_node();
2451        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2452        let union_schema = match node.inner {
2453            SchemaPiece::Union(u) => u,
2454            _ => unreachable!(),
2455        };
2456        assert_eq!(union_schema.variants().len(), 5);
2457        let mut variants = union_schema.variants().iter();
2458        assert_eq!(
2459            SchemaKind::from(node.step(variants.next().unwrap())),
2460            SchemaKind::Null
2461        );
2462        assert_eq!(
2463            SchemaKind::from(node.step(variants.next().unwrap())),
2464            SchemaKind::Int
2465        );
2466        assert_eq!(
2467            SchemaKind::from(node.step(variants.next().unwrap())),
2468            SchemaKind::Float
2469        );
2470        assert_eq!(
2471            SchemaKind::from(node.step(variants.next().unwrap())),
2472            SchemaKind::String
2473        );
2474        assert_eq!(
2475            SchemaKind::from(node.step(variants.next().unwrap())),
2476            SchemaKind::Bytes
2477        );
2478        assert_eq!(variants.next(), None);
2479    }
2480
2481    #[mz_ore::test]
2482    fn test_record_schema() {
2483        let schema = r#"
2484                {
2485                    "type": "record",
2486                    "name": "test",
2487                    "doc": "record doc",
2488                    "fields": [
2489                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2490                        {"name": "b", "doc": "b doc", "type": "string"}
2491                    ]
2492                }
2493            "#;
2494
2495        let mut lookup = BTreeMap::new();
2496        lookup.insert("a".to_owned(), 0);
2497        lookup.insert("b".to_owned(), 1);
2498
2499        let expected = SchemaPiece::Record {
2500            doc: Some("record doc".to_string()),
2501            fields: vec![
2502                RecordField {
2503                    name: "a".to_string(),
2504                    doc: Some("a doc".to_string()),
2505                    default: Some(Value::Number(42i64.into())),
2506                    schema: SchemaPiece::Long.into(),
2507                    order: RecordFieldOrder::Ascending,
2508                    position: 0,
2509                },
2510                RecordField {
2511                    name: "b".to_string(),
2512                    doc: Some("b doc".to_string()),
2513                    default: None,
2514                    schema: SchemaPiece::String.into(),
2515                    order: RecordFieldOrder::Ascending,
2516                    position: 1,
2517                },
2518            ],
2519            lookup,
2520        };
2521
2522        check_schema(schema, expected);
2523    }
2524
2525    #[mz_ore::test]
2526    fn test_enum_schema() {
2527        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2528
2529        let expected = SchemaPiece::Enum {
2530            doc: None,
2531            symbols: vec![
2532                "diamonds".to_owned(),
2533                "spades".to_owned(),
2534                "jokers".to_owned(),
2535                "clubs".to_owned(),
2536                "hearts".to_owned(),
2537            ],
2538            default_idx: Some(2),
2539        };
2540
2541        check_schema(schema, expected);
2542
2543        let bad_schema = Schema::from_str(
2544            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2545        );
2546
2547        assert_err!(bad_schema);
2548    }
2549
2550    #[mz_ore::test]
2551    fn test_fixed_schema() {
2552        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2553
2554        let expected = SchemaPiece::Fixed { size: 16usize };
2555
2556        check_schema(schema, expected);
2557    }
2558
2559    #[mz_ore::test]
2560    fn test_date_schema() {
2561        let kinds = &[
2562            r#"{
2563                    "type": "int",
2564                    "name": "datish",
2565                    "logicalType": "date"
2566                }"#,
2567            r#"{
2568                    "type": "int",
2569                    "name": "datish",
2570                    "connect.name": "io.debezium.time.Date"
2571                }"#,
2572            r#"{
2573                    "type": "int",
2574                    "name": "datish",
2575                    "connect.name": "org.apache.kafka.connect.data.Date"
2576                }"#,
2577        ];
2578        for kind in kinds {
2579            check_schema(*kind, SchemaPiece::Date);
2580
2581            let schema = Schema::from_str(*kind).unwrap();
2582            assert_eq!(
2583                serde_json::to_string(&schema).unwrap(),
2584                r#"{"type":"int","logicalType":"date"}"#
2585            );
2586        }
2587    }
2588
2589    #[mz_ore::test]
2590    fn new_field_in_middle() {
2591        let reader = r#"{
2592            "type": "record",
2593            "name": "MyRecord",
2594            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2595        }"#;
2596        let writer = r#"{
2597            "type": "record",
2598            "name": "MyRecord",
2599            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2600        }"#;
2601        let reader = Schema::from_str(reader).unwrap();
2602        let writer = Schema::from_str(writer).unwrap();
2603
2604        let mut record = Record::new(writer.top_node()).unwrap();
2605        record.put("f1", 1);
2606        record.put("f2", 2);
2607        record.put("f_interposed", 42);
2608
2609        let value = record.avro();
2610
2611        let mut buf = vec![];
2612        crate::encode::encode(&value, &writer, &mut buf);
2613
2614        let resolved = resolve_schemas(&writer, &reader).unwrap();
2615
2616        let reader = &mut &buf[..];
2617        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2618        let expected = crate::types::Value::Record(vec![
2619            ("f1".to_string(), crate::types::Value::Int(1)),
2620            ("f2".to_string(), crate::types::Value::Int(2)),
2621        ]);
2622        assert_eq!(reader_value, expected);
2623        assert!(reader.is_empty()); // all bytes should have been consumed
2624    }
2625
2626    #[mz_ore::test]
2627    fn new_field_at_end() {
2628        let reader = r#"{
2629            "type": "record",
2630            "name": "MyRecord",
2631            "fields": [{"name": "f1", "type": "int"}]
2632        }"#;
2633        let writer = r#"{
2634            "type": "record",
2635            "name": "MyRecord",
2636            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2637        }"#;
2638        let reader = Schema::from_str(reader).unwrap();
2639        let writer = Schema::from_str(writer).unwrap();
2640
2641        let mut record = Record::new(writer.top_node()).unwrap();
2642        record.put("f1", 1);
2643        record.put("f2", 2);
2644
2645        let value = record.avro();
2646
2647        let mut buf = vec![];
2648        crate::encode::encode(&value, &writer, &mut buf);
2649
2650        let resolved = resolve_schemas(&writer, &reader).unwrap();
2651
2652        let reader = &mut &buf[..];
2653        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2654        let expected =
2655            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2656        assert_eq!(reader_value, expected);
2657        assert!(reader.is_empty()); // all bytes should have been consumed
2658    }
2659
2660    #[mz_ore::test]
2661    fn default_non_nums() {
2662        let reader = r#"{
2663            "type": "record",
2664            "name": "MyRecord",
2665            "fields": [
2666                {"name": "f1", "type": "double", "default": "NaN"},
2667                {"name": "f2", "type": "double", "default": "Infinity"},
2668                {"name": "f3", "type": "double", "default": "-Infinity"}
2669            ]
2670        }
2671        "#;
2672        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2673
2674        let writer_schema = Schema::from_str(writer).unwrap();
2675        let reader_schema = Schema::from_str(reader).unwrap();
2676        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2677
2678        let record = Record::new(writer_schema.top_node()).unwrap();
2679
2680        let value = record.avro();
2681        let mut buf = vec![];
2682        crate::encode::encode(&value, &writer_schema, &mut buf);
2683
2684        let reader = &mut &buf[..];
2685        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2686        let expected = crate::types::Value::Record(vec![
2687            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2688            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2689            (
2690                "f3".to_string(),
2691                crate::types::Value::Double(f64::NEG_INFINITY),
2692            ),
2693        ]);
2694
2695        #[derive(Debug)]
2696        struct NanEq(crate::types::Value);
2697        impl std::cmp::PartialEq for NanEq {
2698            fn eq(&self, other: &Self) -> bool {
2699                match (self, other) {
2700                    (
2701                        NanEq(crate::types::Value::Double(x)),
2702                        NanEq(crate::types::Value::Double(y)),
2703                    ) if x.is_nan() && y.is_nan() => true,
2704                    (
2705                        NanEq(crate::types::Value::Float(x)),
2706                        NanEq(crate::types::Value::Float(y)),
2707                    ) if x.is_nan() && y.is_nan() => true,
2708                    (
2709                        NanEq(crate::types::Value::Record(xs)),
2710                        NanEq(crate::types::Value::Record(ys)),
2711                    ) => {
2712                        let xs = xs
2713                            .iter()
2714                            .cloned()
2715                            .map(|(k, v)| (k, NanEq(v)))
2716                            .collect::<Vec<_>>();
2717                        let ys = ys
2718                            .iter()
2719                            .cloned()
2720                            .map(|(k, v)| (k, NanEq(v)))
2721                            .collect::<Vec<_>>();
2722
2723                        xs == ys
2724                    }
2725                    (NanEq(x), NanEq(y)) => x == y,
2726                }
2727            }
2728        }
2729
2730        assert_eq!(NanEq(reader_value), NanEq(expected));
2731        assert!(reader.is_empty());
2732    }
2733
2734    #[mz_ore::test]
2735    fn test_decimal_schemas() {
2736        let schema = r#"{
2737                "type": "fixed",
2738                "name": "dec",
2739                "size": 8,
2740                "logicalType": "decimal",
2741                "precision": 12,
2742                "scale": 5
2743            }"#;
2744        let expected = SchemaPiece::Decimal {
2745            precision: 12,
2746            scale: 5,
2747            fixed_size: Some(8),
2748        };
2749        check_schema(schema, expected);
2750
2751        let schema = r#"{
2752                "type": "bytes",
2753                "logicalType": "decimal",
2754                "precision": 12,
2755                "scale": 5
2756            }"#;
2757        let expected = SchemaPiece::Decimal {
2758            precision: 12,
2759            scale: 5,
2760            fixed_size: None,
2761        };
2762        check_schema(schema, expected);
2763
2764        let res = Schema::from_str(
2765            r#"["bytes", {
2766                "type": "bytes",
2767                "logicalType": "decimal",
2768                "precision": 12,
2769                "scale": 5
2770            }]"#,
2771        );
2772        assert_eq!(
2773            res.unwrap_err().to_string(),
2774            "Schema parse error: Unions cannot contain duplicate types"
2775        );
2776
2777        let writer_schema = Schema::from_str(
2778            r#"["null", {
2779                "type": "bytes"
2780            }]"#,
2781        )
2782        .unwrap();
2783        let reader_schema = Schema::from_str(
2784            r#"["null", {
2785                "type": "bytes",
2786                "logicalType": "decimal",
2787                "precision": 12,
2788                "scale": 5
2789            }]"#,
2790        )
2791        .unwrap();
2792        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2793
2794        let expected = SchemaPiece::ResolveUnionUnion {
2795            permutation: vec![
2796                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2797                Ok((
2798                    1,
2799                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2800                        precision: 12,
2801                        scale: 5,
2802                        fixed_size: None,
2803                    }),
2804                )),
2805            ],
2806            n_reader_variants: 2,
2807            reader_null_variant: Some(0),
2808        };
2809        assert_eq!(resolved.top_node().inner, &expected);
2810    }
2811
2812    #[mz_ore::test]
2813    fn test_no_documentation() {
2814        let schema =
2815            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2816                .unwrap();
2817
2818        let doc = match schema.top_node().inner {
2819            SchemaPiece::Enum { doc, .. } => doc.clone(),
2820            _ => panic!(),
2821        };
2822
2823        assert_none!(doc);
2824    }
2825
2826    #[mz_ore::test]
2827    fn test_documentation() {
2828        let schema = Schema::from_str(
2829                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2830            ).unwrap();
2831
2832        let doc = match schema.top_node().inner {
2833            SchemaPiece::Enum { doc, .. } => doc.clone(),
2834            _ => None,
2835        };
2836
2837        assert_eq!("Some documentation".to_owned(), doc.unwrap());
2838    }
2839
2840    #[mz_ore::test]
2841    fn test_namespaces_and_names() {
2842        // When name and namespace specified, full name should contain both.
2843        let schema = Schema::from_str(
2844            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2845        )
2846        .unwrap();
2847        assert_eq!(schema.named.len(), 1);
2848        assert_eq!(
2849            schema.named[0].name,
2850            FullName {
2851                name: "name".into(),
2852                namespace: "namespace".into()
2853            }
2854        );
2855
2856        // When name contains dots, parse the dot-separated name as the namespace.
2857        let schema =
2858            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2859                .unwrap();
2860        assert_eq!(schema.named.len(), 1);
2861        assert_eq!(
2862            schema.named[0].name,
2863            FullName {
2864                name: "dots".into(),
2865                namespace: "name.has".into()
2866            }
2867        );
2868
2869        // Same as above, ignore any provided namespace.
2870        let schema = Schema::from_str(
2871            r#"{"type": "enum", "namespace": "namespace",
2872            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2873        )
2874        .unwrap();
2875        assert_eq!(schema.named.len(), 1);
2876        assert_eq!(
2877            schema.named[0].name,
2878            FullName {
2879                name: "dots".into(),
2880                namespace: "name.has".into()
2881            }
2882        );
2883
2884        // Use default namespace when namespace is not provided.
2885        // Materialize uses "" as the default namespace.
2886        let schema = Schema::from_str(
2887            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2888            "fields": [{"name": "name", "type": "string"}]}"#,
2889        )
2890        .unwrap();
2891        assert_eq!(schema.named.len(), 1);
2892        assert_eq!(
2893            schema.named[0].name,
2894            FullName {
2895                name: "TestDoc".into(),
2896                namespace: "".into()
2897            }
2898        );
2899
2900        // Empty namespace strings should be allowed.
2901        let schema = Schema::from_str(
2902            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2903            "fields": [{"name": "name", "type": "string"}]}"#,
2904        )
2905        .unwrap();
2906        assert_eq!(schema.named.len(), 1);
2907        assert_eq!(
2908            schema.named[0].name,
2909            FullName {
2910                name: "TestDoc".into(),
2911                namespace: "".into()
2912            }
2913        );
2914
2915        // Equality of names is defined on the FullName and is case-sensitive.
2916        let first = Schema::from_str(
2917            r#"{"type": "fixed", "namespace": "namespace",
2918            "name": "name", "size": 1}"#,
2919        )
2920        .unwrap();
2921        let second = Schema::from_str(
2922            r#"{"type": "fixed", "name": "namespace.name",
2923            "size": 1}"#,
2924        )
2925        .unwrap();
2926        assert_eq!(first.named[0].name, second.named[0].name);
2927
2928        let first = Schema::from_str(
2929            r#"{"type": "fixed", "namespace": "namespace",
2930            "name": "name", "size": 1}"#,
2931        )
2932        .unwrap();
2933        let second = Schema::from_str(
2934            r#"{"type": "fixed", "name": "namespace.Name",
2935            "size": 1}"#,
2936        )
2937        .unwrap();
2938        assert_ne!(first.named[0].name, second.named[0].name);
2939
2940        let first = Schema::from_str(
2941            r#"{"type": "fixed", "namespace": "Namespace",
2942            "name": "name", "size": 1}"#,
2943        )
2944        .unwrap();
2945        let second = Schema::from_str(
2946            r#"{"type": "fixed", "namespace": "namespace",
2947            "name": "name", "size": 1}"#,
2948        )
2949        .unwrap();
2950        assert_ne!(first.named[0].name, second.named[0].name);
2951
2952        // The name portion of a fullname, record field names, and enum symbols must:
2953        // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
2954        assert!(
2955            Schema::from_str(
2956                r#"{"type": "record", "name": "99 problems but a name aint one",
2957            "fields": [{"name": "name", "type": "string"}]}"#
2958            )
2959            .is_err()
2960        );
2961
2962        assert!(
2963            Schema::from_str(
2964                r#"{"type": "record", "name": "!!!",
2965            "fields": [{"name": "name", "type": "string"}]}"#
2966            )
2967            .is_err()
2968        );
2969
2970        assert!(
2971            Schema::from_str(
2972                r#"{"type": "record", "name": "_valid_until_©",
2973            "fields": [{"name": "name", "type": "string"}]}"#
2974            )
2975            .is_err()
2976        );
2977
2978        // Use previously defined names and namespaces as type.
2979        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2980              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2981              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2982              {"name": "f3", "type": "MyEnum"},
2983              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2984              {"name": "f5", "type": "other.namespace.OtherEnum"},
2985              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2986              {"name": "f7", "type": "some.other.ThirdEnum"}
2987             ]}"#).unwrap();
2988        assert_eq!(schema.named.len(), 4);
2989
2990        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2991            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // f1
2992            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // f2
2993            assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); // f3
2994            assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); // f4
2995            assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); // f5
2996            assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); // f6
2997            assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); // f7
2998        } else {
2999            panic!("Expected SchemaPiece::Record, found something else");
3000        }
3001
3002        let schema = Schema::from_str(
3003            r#"{"type": "record", "name": "x.Y", "fields": [
3004              {"name": "e", "type":
3005                {"type": "record", "name": "Z", "fields": [
3006                  {"name": "f", "type": "x.Y"},
3007                  {"name": "g", "type": "x.Z"}
3008                ]}
3009              }
3010            ]}"#,
3011        )
3012        .unwrap();
3013        assert_eq!(schema.named.len(), 2);
3014
3015        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3016            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // e
3017        } else {
3018            panic!("Expected SchemaPiece::Record, found something else");
3019        }
3020
3021        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
3022            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); // f
3023            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // g
3024        } else {
3025            panic!("Expected SchemaPiece::Record, found something else");
3026        }
3027
3028        let schema = Schema::from_str(
3029            r#"{"type": "record", "name": "R", "fields": [
3030              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
3031                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
3032                 "symbols": ["Foo", "Bar"]}
3033                }
3034              ]}},
3035              {"name": "t", "type": "Z"}
3036            ]}"#,
3037        )
3038        .unwrap();
3039        assert_eq!(schema.named.len(), 3);
3040
3041        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3042            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // s
3043            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); // t - refers to "".Z
3044        } else {
3045            panic!("Expected SchemaPiece::Record, found something else");
3046        }
3047    }
3048
3049    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
3050    // compile, they pass.
3051    #[mz_ore::test]
3052    fn test_schema_is_send() {
3053        fn send<S: Send>(_s: S) {}
3054
3055        let schema = Schema {
3056            named: vec![],
3057            indices: Default::default(),
3058            top: SchemaPiece::Null.into(),
3059        };
3060        send(schema);
3061    }
3062
3063    #[mz_ore::test]
3064    fn test_schema_is_sync() {
3065        fn sync<S: Sync>(_s: S) {}
3066
3067        let schema = Schema {
3068            named: vec![],
3069            indices: Default::default(),
3070            top: SchemaPiece::Null.into(),
3071        };
3072        sync(&schema);
3073        sync(schema);
3074    }
3075
3076    #[mz_ore::test]
3077    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
3078    fn test_schema_fingerprint() {
3079        use sha2::Sha256;
3080
3081        let raw_schema = r#"
3082        {
3083            "type": "record",
3084            "name": "test",
3085            "fields": [
3086                {"name": "a", "type": "long", "default": 42},
3087                {"name": "b", "type": "string"}
3088            ]
3089        }
3090    "#;
3091        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3092        let schema = Schema::from_str(raw_schema).unwrap();
3093        assert_eq!(&schema.canonical_form(), expected_canonical);
3094        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3095        assert_eq!(
3096            format!("{}", schema.fingerprint::<Sha256>()),
3097            expected_fingerprint
3098        );
3099
3100        let raw_schema = r#"
3101{
3102  "type": "record",
3103  "name": "ns.r1",
3104  "namespace": "ignored",
3105  "fields": [
3106    {
3107      "name": "f1",
3108      "type": {
3109        "type": "fixed",
3110        "name": "r2",
3111        "size": 1
3112      }
3113    }
3114  ]
3115}
3116"#;
3117        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3118        let schema = Schema::from_str(raw_schema).unwrap();
3119        assert_eq!(&schema.canonical_form(), expected_canonical);
3120        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3121        assert_eq!(
3122            format!("{}", schema.fingerprint::<Sha256>()),
3123            expected_fingerprint
3124        );
3125    }
3126
3127    #[mz_ore::test]
3128    fn test_make_valid() {
3129        for (input, expected) in [
3130            ("foo", "foo"),
3131            ("az99", "az99"),
3132            ("99az", "_99az"),
3133            ("is,bad", "is_bad"),
3134            ("@#$%", "____"),
3135            ("i-amMisBehaved!", "i_amMisBehaved_"),
3136            ("", "_"),
3137        ] {
3138            let actual = Name::make_valid(input);
3139            assert_eq!(expected, actual, "Name::make_valid({input})")
3140        }
3141    }
3142
3143    #[mz_ore::test]
3144    fn test_parse_with_simple_reference() {
3145        // Schema A defines a User record
3146        let ref_schema_json = r#"{
3147            "type": "record",
3148            "name": "User",
3149            "namespace": "com.example",
3150            "fields": [{"name": "id", "type": "int"}]
3151        }"#;
3152
3153        // Schema B references User
3154        let primary_json = r#"{
3155            "type": "record",
3156            "name": "Event",
3157            "namespace": "com.example",
3158            "fields": [{"name": "user", "type": "com.example.User"}]
3159        }"#;
3160
3161        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3162        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3163
3164        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3165
3166        // Verify both Event and User types are in the schema
3167        let user_name = FullName {
3168            name: "User".to_string(),
3169            namespace: "com.example".to_string(),
3170        };
3171        let event_name = FullName {
3172            name: "Event".to_string(),
3173            namespace: "com.example".to_string(),
3174        };
3175
3176        assert!(
3177            schema.indices.contains_key(&user_name),
3178            "User type should be in schema indices"
3179        );
3180        assert!(
3181            schema.indices.contains_key(&event_name),
3182            "Event type should be in schema indices"
3183        );
3184
3185        // Verify Event's user field references User
3186        if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3187            let event_piece = &schema.named[*event_idx].piece;
3188            if let SchemaPiece::Record { fields, .. } = event_piece {
3189                assert_eq!(fields.len(), 1);
3190                assert_eq!(fields[0].name, "user");
3191                // The user field should reference the User type (Named)
3192                assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3193            } else {
3194                panic!("Expected Event to be a record");
3195            }
3196        } else {
3197            panic!("Expected top to be Named");
3198        }
3199    }
3200
3201    #[mz_ore::test]
3202    fn test_parse_with_nested_references() {
3203        // Schema A defines Address
3204        let schema_a = r#"{
3205            "type": "record",
3206            "name": "Address",
3207            "namespace": "com.example",
3208            "fields": [
3209                {"name": "street", "type": "string"},
3210                {"name": "city", "type": "string"}
3211            ]
3212        }"#;
3213
3214        // Schema B defines User with Address field (references A)
3215        let schema_b = r#"{
3216            "type": "record",
3217            "name": "User",
3218            "namespace": "com.example",
3219            "fields": [
3220                {"name": "id", "type": "int"},
3221                {"name": "address", "type": "com.example.Address"}
3222            ]
3223        }"#;
3224
3225        // Schema C defines Event with User field (references B, transitively A)
3226        let schema_c = r#"{
3227            "type": "record",
3228            "name": "Event",
3229            "namespace": "com.example",
3230            "fields": [
3231                {"name": "user", "type": "com.example.User"},
3232                {"name": "timestamp", "type": "long"}
3233            ]
3234        }"#;
3235
3236        // Parse A first
3237        let ref_schema_a = Schema::from_str(schema_a).unwrap();
3238
3239        // Parse B with reference to A
3240        let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3241        let ref_schema_b =
3242            Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3243                .unwrap();
3244
3245        // Parse C with references to A and B (in dependency order)
3246        let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3247        let final_schema =
3248            Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3249
3250        // Verify all three types are in the schema
3251        for name in ["Address", "User", "Event"] {
3252            let full_name = FullName {
3253                name: name.to_string(),
3254                namespace: "com.example".to_string(),
3255            };
3256            assert!(
3257                final_schema.indices.contains_key(&full_name),
3258                "{} type should be in schema indices",
3259                name
3260            );
3261        }
3262    }
3263
3264    #[mz_ore::test]
3265    fn test_parse_with_multiple_types_in_reference() {
3266        // Schema A defines both Address and PhoneNumber
3267        let ref_schema_json = r#"{
3268            "type": "record",
3269            "name": "ContactInfo",
3270            "namespace": "com.example",
3271            "fields": [
3272                {
3273                    "name": "address",
3274                    "type": {
3275                        "type": "record",
3276                        "name": "Address",
3277                        "fields": [{"name": "street", "type": "string"}]
3278                    }
3279                },
3280                {
3281                    "name": "phone",
3282                    "type": {
3283                        "type": "record",
3284                        "name": "PhoneNumber",
3285                        "fields": [{"name": "number", "type": "string"}]
3286                    }
3287                }
3288            ]
3289        }"#;
3290
3291        // Schema B references both Address and PhoneNumber
3292        let primary_json = r#"{
3293            "type": "record",
3294            "name": "User",
3295            "namespace": "com.example",
3296            "fields": [
3297                {"name": "id", "type": "int"},
3298                {"name": "home", "type": "com.example.Address"},
3299                {"name": "mobile", "type": "com.example.PhoneNumber"}
3300            ]
3301        }"#;
3302
3303        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3304        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3305
3306        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3307
3308        // Verify all types are in the schema
3309        for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3310            let full_name = FullName {
3311                name: name.to_string(),
3312                namespace: "com.example".to_string(),
3313            };
3314            assert!(
3315                schema.indices.contains_key(&full_name),
3316                "{} type should be in schema indices",
3317                name
3318            );
3319        }
3320    }
3321
3322    #[mz_ore::test]
3323    fn test_parse_with_no_references() {
3324        // When no references are provided, it should behave like regular parse
3325        let schema_json = r#"{
3326            "type": "record",
3327            "name": "Simple",
3328            "fields": [{"name": "id", "type": "int"}]
3329        }"#;
3330
3331        let value: Value = serde_json::from_str(schema_json).unwrap();
3332
3333        let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3334        let schema_normal = Schema::parse(&value).unwrap();
3335
3336        // Both should produce equivalent schemas
3337        assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3338        assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3339    }
3340
3341    #[mz_ore::test]
3342    fn test_parse_with_reference_in_array() {
3343        // Schema A defines an Item record
3344        let ref_schema_json = r#"{
3345            "type": "record",
3346            "name": "Item",
3347            "namespace": "com.example",
3348            "fields": [{"name": "name", "type": "string"}]
3349        }"#;
3350
3351        // Schema B has an array of Items
3352        let primary_json = r#"{
3353            "type": "record",
3354            "name": "Order",
3355            "namespace": "com.example",
3356            "fields": [
3357                {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3358            ]
3359        }"#;
3360
3361        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3362        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3363
3364        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3365
3366        // Verify both types exist
3367        let item_name = FullName {
3368            name: "Item".to_string(),
3369            namespace: "com.example".to_string(),
3370        };
3371        assert!(schema.indices.contains_key(&item_name));
3372
3373        // Verify the array items type is a Named reference
3374        if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3375            let order_piece = &schema.named[*order_idx].piece;
3376            if let SchemaPiece::Record { fields, .. } = order_piece {
3377                if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3378                    assert!(
3379                        matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3380                        "Array items should be a Named reference to Item"
3381                    );
3382                } else {
3383                    panic!("Expected items field to be an array");
3384                }
3385            } else {
3386                panic!("Expected Order to be a record");
3387            }
3388        } else {
3389            panic!("Expected top to be Named");
3390        }
3391    }
3392}