Skip to main content

mz_avro/
schema.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for parsing and interacting with schemas in Avro format.
25
26use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use aws_lc_rs::digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// Error raised when an Avro schema cannot be parsed.
///
/// It wraps a human-readable message, and its `Display` output is exactly
/// that message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error carrying `msg` as its message.
    pub fn new<S: Into<String>>(msg: S) -> Self {
        Self(msg.into())
    }
}

impl fmt::Display for ParseSchemaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Forward to the inner string's `Display` so width/fill/precision
        // flags from the caller's format spec are honored.
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprints documentation](https://avro.apache.org/docs/++version++/specification/#schema-fingerprints)
#[derive(Debug)]
pub struct SchemaFingerprint {
    pub bytes: Vec<u8>,
}

/// Renders the fingerprint as lowercase hexadecimal, two digits per byte
/// (for example `[0x00, 0xab]` renders as `00ab`; no bytes renders as `""`).
impl fmt::Display for SchemaFingerprint {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter rather than allocating
        // a `String` per byte and joining them afterwards.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
171#[derive(Clone, Debug, PartialEq)]
172pub enum SchemaPiece {
173    /// A `null` Avro schema.
174    Null,
175    /// A `boolean` Avro schema.
176    Boolean,
177    /// An `int` Avro schema.
178    Int,
179    /// A `long` Avro schema.
180    Long,
181    /// A `float` Avro schema.
182    Float,
183    /// A `double` Avro schema.
184    Double,
185    /// An `Int` Avro schema with a semantic type being days since the unix epoch.
186    Date,
187    /// An `Int64` Avro schema with a semantic type being milliseconds since the unix epoch.
188    ///
189    /// <https://avro.apache.org/docs/++version++/specification/#time_ms>
190    TimestampMilli,
191    /// An `Int64` Avro schema with a semantic type being microseconds since the unix epoch.
192    ///
193    /// <https://avro.apache.org/docs/++version++/specification/#time-microsecond-precision>
194    TimestampMicro,
195    /// A `bytes` or `fixed` Avro schema with a logical type of `decimal` and
196    /// the specified precision and scale.
197    ///
198    /// If the underlying type is `fixed`,
199    /// the `fixed_size` field specifies the size.
200    Decimal {
201        precision: usize,
202        scale: usize,
203        fixed_size: Option<usize>,
204    },
205    /// A `bytes` Avro schema.
206    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
207    Bytes,
208    /// A `string` Avro schema.
209    /// `String` represents a unicode character sequence.
210    String,
211    /// A `string` Avro schema that is tagged as representing JSON data
212    Json,
213    /// A `string` Avro schema with a logical type of `uuid`.
214    Uuid,
215    /// A `array` Avro schema. Avro arrays are required to have the same type for each element.
216    /// This variant holds the `Schema` for the array element type.
217    Array(Box<SchemaPieceOrNamed>),
218    /// A `map` Avro schema.
219    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
220    /// `Map` keys are assumed to be `string`.
221    Map(Box<SchemaPieceOrNamed>),
222    /// A `union` Avro schema.
223    Union(UnionSchema),
224    /// A value written as `int` and read as `long`,
225    /// for the timestamp-millis logicalType.
226    ResolveIntTsMilli,
227    /// A value written as `int` and read as `long`,
228    /// for the timestamp-micros logicalType.
229    ResolveIntTsMicro,
230    /// A value written as an `int` with `date` logical type,
231    /// and read as any timestamp type
232    ResolveDateTimestamp,
233    /// A value written as `int` and read as `long`
234    ResolveIntLong,
235    /// A value written as `int` and read as `float`
236    ResolveIntFloat,
237    /// A value written as `int` and read as `double`
238    ResolveIntDouble,
239    /// A value written as `long` and read as `float`
240    ResolveLongFloat,
241    /// A value written as `long` and read as `double`
242    ResolveLongDouble,
243    /// A value written as `float` and read as `double`
244    ResolveFloatDouble,
245    /// A concrete (i.e., non-`union`) type in the writer,
246    /// resolved against one specific variant of a `union` in the reader.
247    ResolveConcreteUnion {
248        /// The index of the variant in the reader
249        index: usize,
250        /// The concrete type
251        inner: Box<SchemaPieceOrNamed>,
252        n_reader_variants: usize,
253        reader_null_variant: Option<usize>,
254    },
255    /// A union in the writer, resolved against a union in the reader.
256    /// The two schemas may have different variants and the variants may be in a different order.
257    ResolveUnionUnion {
258        /// A mapping of the fields in the writer to those in the reader.
259        /// If the `i`th element is `Err(e)`, the `i`th field in the writer
260        /// did not match any field in the reader (or even if it matched by name, resolution failed).
261        /// If the `i`th element is `Ok((j, piece))`, then the `i`th field of the writer
262        /// matched the `j`th field of the reader, and `piece` is their resolved node.
263        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
264        n_reader_variants: usize,
265        reader_null_variant: Option<usize>,
266    },
267    /// The inverse of `ResolveConcreteUnion`
268    ResolveUnionConcrete {
269        index: usize,
270        inner: Box<SchemaPieceOrNamed>,
271    },
272    /// A `record` Avro schema.
273    ///
274    /// The `lookup` table maps field names to their position in the `Vec`
275    /// of `fields`.
276    Record {
277        doc: Documentation,
278        fields: Vec<RecordField>,
279        lookup: BTreeMap<String, usize>,
280    },
281    /// An `enum` Avro schema.
282    Enum {
283        doc: Documentation,
284        symbols: Vec<String>,
285        /// The index of the default value.
286        ///
287        /// This is only used in schema resolution: it is the value that
288        /// will be read by a reader when a writer writes a value that the reader
289        /// does not expect.
290        default_idx: Option<usize>,
291    },
292    /// A `fixed` Avro schema.
293    Fixed { size: usize },
294    /// A record in the writer, resolved against a record in the reader.
295    /// The two schemas may have different fields and the fields may be in a different order.
296    ResolveRecord {
297        /// Fields that do not exist in the writer schema, but had a default
298        /// value specified in the reader schema, which we use.
299        defaults: Vec<ResolvedDefaultValueField>,
300        /// Fields in the order of their appearance in the writer schema.
301        /// `Present` if they could be resolved against a field in the reader schema;
302        /// `Absent` otherwise.
303        fields: Vec<ResolvedRecordField>,
304        /// The size of `defaults`, plus the number of `Present` values in `fields`.
305        n_reader_fields: usize,
306    },
307    /// An enum in the writer, resolved against an enum in the reader.
308    /// The two schemas may have different values and the values may be in a different order.
309    ResolveEnum {
310        doc: Documentation,
311        /// Symbols in order of the writer schema along with their index in the reader schema,
312        /// or `Err(symbol_name)` if they don't exist in the reader schema.
313        symbols: Vec<Result<(usize, String), String>>,
314        /// The value to decode if the writer writes some value not expected by the reader.
315        default: Option<(usize, String)>,
316    },
317}
318
319impl SchemaPiece {
320    /// Returns whether the schema node is "underlyingly" an Int (but possibly a logicalType typedef)
321    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    /// Returns whether the schema node is "underlyingly" an Int64 (but possibly a logicalType typedef)
325    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    /// Constructs an `avro::Value` if this is of underlying int type.
329    /// Guaranteed to be `Some` when `is_underlying_int` is `true`.
330    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            // TODO[btv] - should we bounds-check the date here? We
334            // don't elsewhere... maybe we should everywhere.
335            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    /// Constructs an `avro::Value` if this is of underlying long type.
340    /// Guaranteed to be `Some` when `is_underlying_long` is `true`.
341    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
351/// Represents any valid Avro schema
352/// More information about Avro schemas can be found in the
353/// [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-declaration)
354#[derive(Clone, PartialEq)]
355pub struct Schema {
356    pub(crate) named: Vec<NamedSchemaPiece>,
357    pub(crate) indices: BTreeMap<FullName, usize>,
358    pub top: SchemaPieceOrNamed,
359}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode<'_> {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// A fieldless summary of the kind of an Avro schema node, used to compare a
/// `Schema` against a `types::Value` by variant.
///
/// **NOTE** This type exists because `mem::discriminant` needs a _value_ to
/// produce a discriminant. That makes a direct
/// `Discriminant<Schema> -> Discriminant<Value>` mapping awkward to write, so
/// both sides are converted into this small enum instead. With so few
/// variants, the conversion _should_ compile down to a jump table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    // Fixed-length types
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    // Variable-length types
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    // This can arise in resolved schemas, particularly when a union resolves to a non-union.
    // We would need to do a lookup to find the actual type.
    Unknown,
}

impl SchemaKind {
    /// Returns the Avro type name for this kind. [`SchemaKind::Unknown`] has no
    /// Avro counterpart and yields `"unknown"`.
    pub fn name(self) -> &'static str {
        match self {
            // Primitive types.
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            // Complex types.
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            // Placeholder used by resolved schemas.
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    pub name: String,
    pub namespace: Option<String>,
    pub aliases: Option<Vec<String>>,
}

/// A fully-qualified name: a base name plus its namespace.
///
/// An empty `namespace` is rendered without a prefix by [`FullName::human_name`].
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name from a possibly dotted `name` and an optional
    /// explicit `namespace`.
    ///
    /// As the Avro specification requires, a `name` that contains a dot is
    /// already a full name. Everything before its last dot becomes the
    /// namespace, and any `namespace` argument is ignored.
    ///
    /// An undotted `name` uses `namespace`, falling back to `default_namespace`
    /// when `namespace` is `None`.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        match name.rsplit_once('.') {
            // Dotted names are fully qualified; an explicit namespace must not
            // be prepended to them.
            Some((ns, base)) => FullName {
                name: base.to_owned(),
                namespace: ns.to_owned(),
            },
            None => FullName {
                name: name.to_owned(),
                namespace: namespace.unwrap_or(default_namespace).to_owned(),
            },
        }
    }

    /// Returns the name without its namespace.
    pub fn base_name(&self) -> &str {
        &self.name
    }

    /// Returns `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            self.name.clone()
        } else {
            format!("{}.{}", self.namespace, self.name)
        }
    }

    /// Get the shortest unambiguous synonym of this name
    /// at a given point in the schema graph. If this name
    /// is in the same namespace as the enclosing node, this
    /// returns the short name; otherwise, it returns the fully qualified name.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if enclosing_ns == self.namespace {
            Cow::Borrowed(&self.name)
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }

    /// Returns the namespace of the name.
    pub fn namespace(&self) -> &str {
        &self.namespace
    }
}

/// Always renders `namespace.name`, even when the namespace is empty.
impl fmt::Debug for FullName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.namespace, self.name)
    }
}
594
595/// Represents documentation for complex Avro schemas.
596pub type Documentation = Option<String>;
597
598impl Name {
599    /// Reports whether the given string is a valid Avro name.
600    ///
601    /// See: <https://avro.apache.org/docs/++version++/specification/#names>
602    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    /// Returns an error if the given name is invalid.
609    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    /// Rewrites `name` to be valid.
622    ///
623    /// Any non alphanumeric characters are replaced with underscores. If the
624    /// name begins with a number, it is prefixed with `_`.
625    ///
626    /// `make_valid` is not injective. Multiple invalid names are mapped to the
627    /// the same valid name.
628    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    /// Parse a `serde_json::map::Map` into a `Name`.
653    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    /// Parses a name from a simple string.
702    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    /// Return the `fullname` of this `Name`
709    ///
710    /// More information about fullnames can be found in the
711    /// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
712    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
717#[derive(Clone, Debug, PartialEq)]
718pub struct ResolvedDefaultValueField {
719    pub name: String,
720    pub doc: Documentation,
721    pub default: types::Value,
722    pub order: RecordFieldOrder,
723    pub position: usize,
724}
725
726#[derive(Clone, Debug, PartialEq)]
727pub enum ResolvedRecordField {
728    Absent(Schema),
729    Present(RecordField),
730}
731
732/// Represents a `field` in a `record` Avro schema.
733#[derive(Clone, Debug, PartialEq)]
734pub struct RecordField {
735    /// Name of the field.
736    pub name: String,
737    /// Documentation of the field.
738    pub doc: Documentation,
739    /// Default value of the field.
740    /// This value will be used when reading Avro datum if schema resolution
741    /// is enabled.
742    pub default: Option<Value>,
743    /// Schema of the field.
744    pub schema: SchemaPieceOrNamed,
745    /// Order of the field.
746    ///
747    /// **NOTE** This currently has no effect.
748    pub order: RecordFieldOrder,
749    /// Position of the field in the list of `field` of its parent `Schema`
750    pub position: usize,
751}
752
753/// Represents any valid order for a `field` in a `record` Avro schema.
754#[derive(Copy, Clone, Debug, PartialEq)]
755pub enum RecordFieldOrder {
756    Ascending,
757    Descending,
758    Ignore,
759}
760
761impl RecordField {}
762
763#[derive(Debug, Clone)]
764pub struct UnionSchema {
765    schemas: Vec<SchemaPieceOrNamed>,
766
767    // Used to ensure uniqueness of anonymous schema inputs, and provide constant time finding of the
768    // schema index given a value.
769    anon_variant_index: BTreeMap<SchemaKind, usize>,
770
771    // Same as above, for named input references
772    named_variant_index: BTreeMap<usize, usize>,
773}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    /// Returns a slice to all variants of this schema.
812    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    /// Returns a mutable slice to all variants of this schema.
817    pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818        &mut self.schemas
819    }
820
821    /// Returns true if the first variant of this `UnionSchema` is `Null`.
822    pub fn is_nullable(&self) -> bool {
823        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824    }
825
826    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827        self.anon_variant_index
828            .get(&SchemaKind::from(sp))
829            .map(|idx| (*idx, &self.schemas[*idx]))
830    }
831
832    pub fn match_ref(
833        &self,
834        other: SchemaPieceRefOrNamed,
835        names_map: &BTreeMap<usize, usize>,
836    ) -> Option<(usize, &SchemaPieceOrNamed)> {
837        match other {
838            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839            SchemaPieceRefOrNamed::Named(idx) => names_map
840                .get(&idx)
841                .and_then(|idx| self.named_variant_index.get(idx))
842                .map(|idx| (*idx, &self.schemas[*idx])),
843        }
844    }
845
846    #[inline(always)]
847    pub fn match_(
848        &self,
849        other: &SchemaPieceOrNamed,
850        names_map: &BTreeMap<usize, usize>,
851    ) -> Option<(usize, &SchemaPieceOrNamed)> {
852        self.match_ref(other.as_ref(), names_map)
853    }
854}
855
856// No need to compare variant_index, it is derivative of schemas.
857impl PartialEq for UnionSchema {
858    fn eq(&self, other: &UnionSchema) -> bool {
859        self.schemas.eq(&other.schemas)
860    }
861}
862
863#[derive(Default)]
864struct SchemaParser {
865    named: Vec<Option<NamedSchemaPiece>>,
866    indices: BTreeMap<FullName, usize>,
867}
868
869impl SchemaParser {
870    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
871        let top = self.parse_inner("", value)?;
872        let SchemaParser { named, indices } = self;
873        Ok(Schema {
874            named: named.into_iter().map(|o| o.unwrap()).collect(),
875            indices,
876            top,
877        })
878    }
879
880    fn parse_inner(
881        &mut self,
882        default_namespace: &str,
883        value: &Value,
884    ) -> Result<SchemaPieceOrNamed, AvroError> {
885        match *value {
886            Value::String(ref t) => {
887                let name = FullName::from_parts(t.as_str(), None, default_namespace);
888                if let Some(idx) = self.indices.get(&name) {
889                    Ok(SchemaPieceOrNamed::Named(*idx))
890                } else {
891                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
892                        t.as_str(),
893                    )?))
894                }
895            }
896            Value::Object(ref data) => self.parse_complex(default_namespace, data),
897            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
898                self.parse_union(default_namespace, data)?,
899            )),
900            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
901        }
902    }
903
904    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
905        let idx = match self.indices.entry(fullname) {
906            Entry::Vacant(ve) => *ve.insert(self.named.len()),
907            Entry::Occupied(oe) => {
908                return Err(ParseSchemaError::new(format!(
909                    "Sub-schema with name {:?} encountered multiple times",
910                    oe.key()
911                ))
912                .into());
913            }
914        };
915        self.named.push(None);
916        Ok(idx)
917    }
918
919    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
920        assert_none!(self.named[index]);
921        self.named[index] = Some(schema);
922    }
923
924    fn parse_named_type(
925        &mut self,
926        type_name: &str,
927        default_namespace: &str,
928        complex: &Map<String, Value>,
929    ) -> Result<usize, AvroError> {
930        let name = Name::parse(complex)?;
931        match name.name.as_str() {
932            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
933                return Err(ParseSchemaError::new(format!(
934                    "{} may not be used as a custom type name",
935                    name.name
936                ))
937                .into());
938            }
939            _ => {}
940        };
941        let fullname = name.fullname(default_namespace);
942        let default_namespace = fullname.namespace.clone();
943        let idx = self.alloc_name(fullname.clone())?;
944        let piece = match type_name {
945            "record" => self.parse_record(&default_namespace, complex),
946            "enum" => self.parse_enum(complex),
947            "fixed" => self.parse_fixed(&default_namespace, complex),
948            _ => unreachable!("Unknown named type kind: {}", type_name),
949        }?;
950
951        self.insert(
952            idx,
953            NamedSchemaPiece {
954                name: fullname,
955                piece,
956            },
957        );
958
959        Ok(idx)
960    }
961
962    /// Parse a `serde_json::Value` representing a complex Avro type into a
963    /// `Schema`.
964    ///
965    /// Avro supports "recursive" definition of types.
966    /// e.g: {"type": {"type": "string"}}
967    fn parse_complex(
968        &mut self,
969        default_namespace: &str,
970        complex: &Map<String, Value>,
971    ) -> Result<SchemaPieceOrNamed, AvroError> {
972        match complex.get("type") {
973            Some(&Value::String(ref t)) => Ok(match t.as_str() {
974                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
975                    t,
976                    default_namespace,
977                    complex,
978                )?),
979                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
980                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
981                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
982                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
983                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
984                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
985                other => {
986                    let name = FullName {
987                        name: other.into(),
988                        namespace: default_namespace.into(),
989                    };
990                    if let Some(idx) = self.indices.get(&name) {
991                        SchemaPieceOrNamed::Named(*idx)
992                    } else {
993                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
994                    }
995                }
996            }),
997            Some(&Value::Object(ref data)) => match data.get("type") {
998                Some(value) => self.parse_inner(default_namespace, value),
999                None => Err(
1000                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1001                ),
1002            },
1003            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1004        }
1005    }
1006
1007    /// Parse a `serde_json::Value` representing a Avro record type into a
1008    /// `Schema`.
1009    fn parse_record(
1010        &mut self,
1011        default_namespace: &str,
1012        complex: &Map<String, Value>,
1013    ) -> Result<SchemaPiece, AvroError> {
1014        let mut lookup = BTreeMap::new();
1015
1016        let fields: Vec<RecordField> = complex
1017            .get("fields")
1018            .and_then(|fields| fields.as_array())
1019            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1020            .and_then(|fields| {
1021                fields
1022                    .iter()
1023                    .filter_map(|field| field.as_object())
1024                    .enumerate()
1025                    .map(|(position, field)| {
1026                        self.parse_record_field(default_namespace, field, position)
1027                    })
1028                    .collect::<Result<_, _>>()
1029            })?;
1030
1031        for field in &fields {
1032            lookup.insert(field.name.clone(), field.position);
1033        }
1034
1035        Ok(SchemaPiece::Record {
1036            doc: complex.doc(),
1037            fields,
1038            lookup,
1039        })
1040    }
1041
1042    /// Parse a `serde_json::Value` into a `RecordField`.
1043    fn parse_record_field(
1044        &mut self,
1045        default_namespace: &str,
1046        field: &Map<String, Value>,
1047        position: usize,
1048    ) -> Result<RecordField, AvroError> {
1049        let name = field
1050            .name()
1051            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1052
1053        Name::validate(&name)?;
1054
1055        let schema = field
1056            .get("type")
1057            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1058            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1059
1060        let default = field.get("default").cloned();
1061
1062        let order = field
1063            .get("order")
1064            .and_then(|order| order.as_str())
1065            .and_then(|order| match order {
1066                "ascending" => Some(RecordFieldOrder::Ascending),
1067                "descending" => Some(RecordFieldOrder::Descending),
1068                "ignore" => Some(RecordFieldOrder::Ignore),
1069                _ => None,
1070            })
1071            .unwrap_or(RecordFieldOrder::Ascending);
1072
1073        Ok(RecordField {
1074            name,
1075            doc: field.doc(),
1076            default,
1077            schema,
1078            order,
1079            position,
1080        })
1081    }
1082
1083    /// Parse a `serde_json::Value` representing a Avro enum type into a
1084    /// `Schema`.
1085    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1086        let symbols: Vec<String> = complex
1087            .get("symbols")
1088            .and_then(|v| v.as_array())
1089            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1090            .and_then(|symbols| {
1091                symbols
1092                    .iter()
1093                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1094                    .collect::<Option<_>>()
1095                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1096            })?;
1097
1098        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1099        for symbol in symbols.iter() {
1100            if unique_symbols.contains(symbol) {
1101                return Err(ParseSchemaError::new(format!(
1102                    "Enum symbols must be unique, found multiple: {}",
1103                    symbol
1104                ))
1105                .into());
1106            } else {
1107                unique_symbols.insert(symbol);
1108            }
1109        }
1110
1111        let default_idx = if let Some(default) = complex.get("default") {
1112            let default_str = default.as_str().ok_or_else(|| {
1113                ParseSchemaError::new(format!(
1114                    "Enum default should be a string, got: {:?}",
1115                    default
1116                ))
1117            })?;
1118            let default_idx = symbols
1119                .iter()
1120                .position(|x| x == default_str)
1121                .ok_or_else(|| {
1122                    ParseSchemaError::new(format!(
1123                        "Enum default not found in list of symbols: {}",
1124                        default_str
1125                    ))
1126                })?;
1127            Some(default_idx)
1128        } else {
1129            None
1130        };
1131
1132        Ok(SchemaPiece::Enum {
1133            doc: complex.doc(),
1134            symbols,
1135            default_idx,
1136        })
1137    }
1138
1139    /// Parse a `serde_json::Value` representing a Avro array type into a
1140    /// `Schema`.
1141    fn parse_array(
1142        &mut self,
1143        default_namespace: &str,
1144        complex: &Map<String, Value>,
1145    ) -> Result<SchemaPiece, AvroError> {
1146        complex
1147            .get("items")
1148            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1149            .and_then(|items| self.parse_inner(default_namespace, items))
1150            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1151    }
1152
1153    /// Parse a `serde_json::Value` representing a Avro map type into a
1154    /// `Schema`.
1155    fn parse_map(
1156        &mut self,
1157        default_namespace: &str,
1158        complex: &Map<String, Value>,
1159    ) -> Result<SchemaPiece, AvroError> {
1160        complex
1161            .get("values")
1162            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1163            .and_then(|items| self.parse_inner(default_namespace, items))
1164            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1165    }
1166
1167    /// Parse a `serde_json::Value` representing a Avro union type into a
1168    /// `Schema`.
1169    fn parse_union(
1170        &mut self,
1171        default_namespace: &str,
1172        items: &[Value],
1173    ) -> Result<SchemaPiece, AvroError> {
1174        items
1175            .iter()
1176            .map(|value| self.parse_inner(default_namespace, value))
1177            .collect::<Result<Vec<_>, _>>()
1178            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1179    }
1180
1181    /// Parse a `serde_json::Value` representing a logical decimal type into a
1182    /// `Schema`.
1183    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1184        let precision = complex
1185            .get("precision")
1186            .and_then(|v| v.as_i64())
1187            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1188
1189        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1190
1191        if scale < 0 {
1192            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1193        }
1194
1195        if precision < 0 {
1196            return Err(
1197                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1198            );
1199        }
1200
1201        if scale > precision {
1202            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1203        }
1204
1205        Ok((precision as usize, scale as usize))
1206    }
1207
1208    /// Parse a `serde_json::Value` representing an Avro bytes type into a
1209    /// `Schema`.
1210    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1211        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1212
1213        if let Some("decimal") = logical_type {
1214            match Self::parse_decimal(complex) {
1215                Ok((precision, scale)) => {
1216                    return Ok(SchemaPiece::Decimal {
1217                        precision,
1218                        scale,
1219                        fixed_size: None,
1220                    });
1221                }
1222                Err(e) => warn!(
1223                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1224                    complex, e
1225                ),
1226            }
1227        }
1228
1229        Ok(SchemaPiece::Bytes)
1230    }
1231
1232    /// Parse a [`serde_json::Value`] representing an Avro Int type
1233    ///
1234    /// If the complex type has a `connect.name` tag (as [emitted by
1235    /// Debezium][1]) that matches a `Date` tag, we specify that the correct
1236    /// schema to use is `Date`.
1237    ///
1238    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1239    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1240        const AVRO_DATE: &str = "date";
1241        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1242        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1243        if let Some(name) = complex.get("connect.name") {
1244            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1245                if name == KAFKA_DATE {
1246                    warn!("using deprecated debezium date format");
1247                }
1248                return Ok(SchemaPiece::Date);
1249            }
1250        }
1251        // Put this after the custom semantic types so that the debezium
1252        // warning is emitted, since the logicalType tag shows up in the
1253        // deprecated debezium format :-/
1254        if let Some(name) = complex.get("logicalType") {
1255            if name == AVRO_DATE {
1256                return Ok(SchemaPiece::Date);
1257            }
1258        }
1259        if !complex.is_empty() {
1260            debug!("parsing complex type as regular int: {:?}", complex);
1261        }
1262        Ok(SchemaPiece::Int)
1263    }
1264
1265    /// Parse a [`serde_json::Value`] representing an Avro Int64/Long type
1266    ///
1267    /// The debezium/kafka types are document at [the debezium site][1], and the
1268    /// avro ones are documented at [Avro][2].
1269    ///
1270    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1271    /// [2]: https://avro.apache.org/docs/++version++/specification/
1272    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1273        const AVRO_MILLI_TS: &str = "timestamp-millis";
1274        const AVRO_MICRO_TS: &str = "timestamp-micros";
1275
1276        const CONNECT_MILLI_TS: &[&str] = &[
1277            "io.debezium.time.Timestamp",
1278            "org.apache.kafka.connect.data.Timestamp",
1279        ];
1280        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1281
1282        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1283            if CONNECT_MILLI_TS.contains(&&**name) {
1284                return Ok(SchemaPiece::TimestampMilli);
1285            }
1286            if name == CONNECT_MICRO_TS {
1287                return Ok(SchemaPiece::TimestampMicro);
1288            }
1289        }
1290        if let Some(name) = complex.get("logicalType") {
1291            if name == AVRO_MILLI_TS {
1292                return Ok(SchemaPiece::TimestampMilli);
1293            }
1294            if name == AVRO_MICRO_TS {
1295                return Ok(SchemaPiece::TimestampMicro);
1296            }
1297        }
1298        if !complex.is_empty() {
1299            debug!("parsing complex type as regular long: {:?}", complex);
1300        }
1301        Ok(SchemaPiece::Long)
1302    }
1303
1304    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1305        const CONNECT_JSON: &str = "io.debezium.data.Json";
1306
1307        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1308            if CONNECT_JSON == name.as_str() {
1309                return SchemaPiece::Json;
1310            }
1311        }
1312        if let Some(name) = complex.get("logicalType") {
1313            if name == "uuid" {
1314                return SchemaPiece::Uuid;
1315            }
1316        }
1317        debug!("parsing complex type as regular string: {:?}", complex);
1318        SchemaPiece::String
1319    }
1320
1321    /// Parse a `serde_json::Value` representing a Avro fixed type into a
1322    /// `Schema`.
1323    fn parse_fixed(
1324        &self,
1325        _default_namespace: &str,
1326        complex: &Map<String, Value>,
1327    ) -> Result<SchemaPiece, AvroError> {
1328        let _name = Name::parse(complex)?;
1329
1330        let size = complex
1331            .get("size")
1332            .and_then(|v| v.as_i64())
1333            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1334        if size <= 0 {
1335            return Err(ParseSchemaError::new(format!(
1336                "Fixed values require a positive size attribute, found: {}",
1337                size
1338            ))
1339            .into());
1340        }
1341
1342        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1343
1344        if let Some("decimal") = logical_type {
1345            match Self::parse_decimal(complex) {
1346                Ok((precision, scale)) => {
1347                    let max = ((8 * size - 1) as f64 * 2_f64.log10()).floor() as usize;
1348                    if precision > max {
1349                        warn!(
1350                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1351                            precision, size
1352                        );
1353                    } else {
1354                        return Ok(SchemaPiece::Decimal {
1355                            precision,
1356                            scale,
1357                            fixed_size: Some(size as usize),
1358                        });
1359                    }
1360                }
1361                Err(e) => warn!(
1362                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1363                    complex, e
1364                ),
1365            }
1366        }
1367
1368        Ok(SchemaPiece::Fixed {
1369            size: size as usize,
1370        })
1371    }
1372}
1373
1374impl Schema {
1375    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
1376    /// schema.
1377    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1378        Self::parse_with_references(value, &[])
1379    }
1380
1381    /// Parse an JSON Avro schema with referenced named types.
1382    ///
1383    /// This is used when parsing a schema that references types defined in other
1384    /// schemas (Confluent Schema Registry schema references). Referenced schemas
1385    /// should be provided in dependency order (dependencies first).
1386    ///
1387    /// # Arguments
1388    ///
1389    /// * `primary` - The primary schema JSON value to parse
1390    /// * `reference_schemas` - Schemas whose named types should be available during parsing
1391    pub fn parse_with_references(
1392        primary: &Value,
1393        reference_schemas: &[Schema],
1394    ) -> Result<Self, AvroError> {
1395        // Collect and remap named types from all referenced schemas
1396        let (named, indices) = Self::collect_named_types(reference_schemas);
1397
1398        // Create parser with pre-populated named types
1399        let p = SchemaParser {
1400            named: named.into_iter().map(Some).collect(),
1401            indices,
1402        };
1403        p.parse(primary)
1404    }
1405
1406    /// Collect all named types from a list of schemas, remapping indices as needed.
1407    ///
1408    /// When combining named types from multiple schemas, each schema's internal
1409    /// indices need to be remapped to account for duplicates and new positions.
1410    fn collect_named_types(
1411        schemas: &[Schema],
1412    ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1413        let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1414        let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1415
1416        for schema in schemas {
1417            // First pass: Build the index_map from this schema's indices to combined indices.
1418            // For types that already exist: map to existing combined index
1419            // For new types: map to their future position in combined_named
1420            let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1421            let mut new_type_offset = combined_named.len();
1422
1423            for named_piece in &schema.named {
1424                if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1425                    // Type already exists, use existing index
1426                    index_map.push(existing_idx);
1427                } else {
1428                    // New type, assign next available index
1429                    index_map.push(new_type_offset);
1430                    new_type_offset += 1;
1431                }
1432            }
1433
1434            // Second pass: Add new types with proper remapping
1435            for named_piece in &schema.named {
1436                if combined_indices.contains_key(&named_piece.name) {
1437                    continue;
1438                }
1439
1440                let mut remapped = named_piece.clone();
1441                Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1442
1443                let new_idx = combined_named.len();
1444                combined_indices.insert(remapped.name.clone(), new_idx);
1445                combined_named.push(remapped);
1446            }
1447        }
1448
1449        (combined_named, combined_indices)
1450    }
1451
1452    /// Recursively remap Named indices in a SchemaPiece using an index map.
1453    fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1454        match piece {
1455            SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1456            SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1457            SchemaPiece::Union(union) => {
1458                for variant in union.variants_mut() {
1459                    Self::remap_indices_with_map(variant, index_map);
1460                }
1461            }
1462            SchemaPiece::Record { fields, .. } => {
1463                for field in fields {
1464                    Self::remap_indices_with_map(&mut field.schema, index_map);
1465                }
1466            }
1467            _ => {}
1468        }
1469    }
1470
1471    /// Remap a single SchemaPieceOrNamed using an index map.
1472    fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1473        match item {
1474            SchemaPieceOrNamed::Named(idx) => {
1475                if let Some(&new_idx) = index_map.get(*idx) {
1476                    *idx = new_idx;
1477                }
1478            }
1479            SchemaPieceOrNamed::Piece(piece) => {
1480                Self::remap_indices_in_piece_with_map(piece, index_map)
1481            }
1482        }
1483    }
1484
1485    /// Converts `self` into its [Parsing Canonical Form].
1486    ///
1487    /// [Parsing Canonical Form]:
1488    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1489    pub fn canonical_form(&self) -> String {
1490        let json = serde_json::to_value(self).unwrap();
1491        parsing_canonical_form(&json)
1492    }
1493
1494    /// Generate fingerprint of Schema's [Parsing Canonical Form].
1495    ///
1496    /// [Parsing Canonical Form]:
1497    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1498    pub fn fingerprint(&self, algorithm: &'static digest::Algorithm) -> SchemaFingerprint {
1499        let hash = digest::digest(algorithm, self.canonical_form().as_bytes());
1500        SchemaFingerprint {
1501            bytes: hash.as_ref().to_vec(),
1502        }
1503    }
1504
1505    /// Parse a `serde_json::Value` representing a primitive Avro type into a
1506    /// `Schema`.
1507    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1508        match primitive {
1509            "null" => Ok(SchemaPiece::Null),
1510            "boolean" => Ok(SchemaPiece::Boolean),
1511            "int" => Ok(SchemaPiece::Int),
1512            "long" => Ok(SchemaPiece::Long),
1513            "double" => Ok(SchemaPiece::Double),
1514            "float" => Ok(SchemaPiece::Float),
1515            "bytes" => Ok(SchemaPiece::Bytes),
1516            "string" => Ok(SchemaPiece::String),
1517            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1518        }
1519    }
1520}
1521
1522impl FromStr for Schema {
1523    type Err = AvroError;
1524
1525    /// Create a `Schema` from a string representing a JSON Avro schema.
1526    fn from_str(input: &str) -> Result<Self, AvroError> {
1527        let value = serde_json::from_str(input)
1528            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1529        Self::parse(&value)
1530    }
1531}
1532
1533#[derive(Clone, Debug, PartialEq)]
1534pub struct NamedSchemaPiece {
1535    pub name: FullName,
1536    pub piece: SchemaPiece,
1537}
1538
1539#[derive(Copy, Clone, Debug)]
1540pub struct SchemaNode<'a> {
1541    pub root: &'a Schema,
1542    pub inner: &'a SchemaPiece,
1543    pub name: Option<&'a FullName>,
1544}
1545
1546#[derive(Copy, Clone, Debug)]
1547pub enum SchemaPieceRefOrNamed<'a> {
1548    Piece(&'a SchemaPiece),
1549    Named(usize),
1550}
1551
1552impl<'a> SchemaPieceRefOrNamed<'a> {
1553    pub fn get_human_name(&self, root: &Schema) -> String {
1554        match self {
1555            Self::Piece(piece) => format!("{:?}", piece),
1556            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1557        }
1558    }
1559
1560    #[inline(always)]
1561    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1562        match self {
1563            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1564            SchemaPieceRefOrNamed::Named(index) => {
1565                let named_piece = root.lookup(index);
1566                (&named_piece.piece, Some(&named_piece.name))
1567            }
1568        }
1569    }
1570}
1571
1572#[derive(Copy, Clone, Debug)]
1573pub struct SchemaNodeOrNamed<'a> {
1574    pub root: &'a Schema,
1575    pub inner: SchemaPieceRefOrNamed<'a>,
1576}
1577
1578impl<'a> SchemaNodeOrNamed<'a> {
1579    #[inline(always)]
1580    pub fn lookup(self) -> SchemaNode<'a> {
1581        let (inner, name) = self.inner.get_piece_and_name(self.root);
1582        SchemaNode {
1583            root: self.root,
1584            inner,
1585            name,
1586        }
1587    }
1588    #[inline(always)]
1589    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1590        self.step_ref(next.as_ref())
1591    }
1592    #[inline(always)]
1593    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1594        Self {
1595            root: self.root,
1596            inner: match next {
1597                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1598                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1599            },
1600        }
1601    }
1602
1603    pub fn to_schema(self) -> Schema {
1604        let mut cloner = SchemaSubtreeDeepCloner {
1605            old_root: self.root,
1606            old_to_new_names: Default::default(),
1607            named: Default::default(),
1608        };
1609        let piece = cloner.clone_piece_or_named(self.inner);
1610        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1611        let indices: BTreeMap<FullName, usize> = named
1612            .iter()
1613            .enumerate()
1614            .map(|(i, nsp)| (nsp.name.clone(), i))
1615            .collect();
1616        Schema {
1617            named,
1618            indices,
1619            top: piece,
1620        }
1621    }
1622
1623    pub fn namespace(self) -> Option<&'a str> {
1624        let SchemaNode { name, .. } = self.lookup();
1625        name.map(|FullName { namespace, .. }| namespace.as_str())
1626    }
1627}
1628
1629struct SchemaSubtreeDeepCloner<'a> {
1630    old_root: &'a Schema,
1631    old_to_new_names: BTreeMap<usize, usize>,
1632    named: Vec<Option<NamedSchemaPiece>>,
1633}
1634
impl<'a> SchemaSubtreeDeepCloner<'a> {
    /// Deep-clones a single schema piece.
    ///
    /// Leaf pieces are copied directly. Every child schema is cloned through
    /// [`Self::clone_piece_or_named`], so the named types it references are
    /// copied into, and re-indexed for, the new schema.
    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
        match piece {
            SchemaPiece::Null => SchemaPiece::Null,
            SchemaPiece::Boolean => SchemaPiece::Boolean,
            SchemaPiece::Int => SchemaPiece::Int,
            SchemaPiece::Long => SchemaPiece::Long,
            SchemaPiece::Float => SchemaPiece::Float,
            SchemaPiece::Double => SchemaPiece::Double,
            SchemaPiece::Date => SchemaPiece::Date,
            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
            SchemaPiece::Json => SchemaPiece::Json,
            SchemaPiece::Decimal {
                scale,
                precision,
                fixed_size,
            } => SchemaPiece::Decimal {
                scale: *scale,
                precision: *precision,
                fixed_size: *fixed_size,
            },
            SchemaPiece::Bytes => SchemaPiece::Bytes,
            SchemaPiece::String => SchemaPiece::String,
            SchemaPiece::Uuid => SchemaPiece::Uuid,
            SchemaPiece::Array(inner) => {
                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            SchemaPiece::Map(inner) => {
                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            // Variant schemas are re-cloned; the variant lookup tables are
            // copied unchanged.
            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
                schemas: us
                    .schemas
                    .iter()
                    .map(|s| self.clone_piece_or_named(s.as_ref()))
                    .collect(),
                anon_variant_index: us.anon_variant_index.clone(),
                named_variant_index: us.named_variant_index.clone(),
            }),
            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
            SchemaPiece::ResolveConcreteUnion {
                index,
                inner,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveConcreteUnion {
                index: *index,
                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionUnion {
                permutation,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveUnionUnion {
                // NOTE(review): `permutation` is cloned wholesale, and then each
                // contained schema is cloned again through the cloner. Mapping
                // over `permutation.iter()` would avoid the first copy.
                permutation: permutation
                    .clone()
                    .into_iter()
                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
                    .collect(),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionConcrete { index, inner } => {
                SchemaPiece::ResolveUnionConcrete {
                    index: *index,
                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                }
            }
            SchemaPiece::Record {
                doc,
                fields,
                lookup,
            } => SchemaPiece::Record {
                doc: doc.clone(),
                fields: fields
                    .iter()
                    .map(|rf| RecordField {
                        name: rf.name.clone(),
                        doc: rf.doc.clone(),
                        default: rf.default.clone(),
                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
                        order: rf.order,
                        position: rf.position,
                    })
                    .collect(),
                lookup: lookup.clone(),
            },
            SchemaPiece::Enum {
                doc,
                symbols,
                default_idx,
            } => SchemaPiece::Enum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default_idx: *default_idx,
            },
            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
            SchemaPiece::ResolveRecord {
                defaults,
                fields,
                n_reader_fields,
            } => SchemaPiece::ResolveRecord {
                defaults: defaults.clone(),
                fields: fields
                    .iter()
                    .map(|rf| match rf {
                        ResolvedRecordField::Present(rf) => {
                            ResolvedRecordField::Present(RecordField {
                                name: rf.name.clone(),
                                doc: rf.doc.clone(),
                                default: rf.default.clone(),
                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
                                order: rf.order,
                                position: rf.position,
                            })
                        }
                        // The writer-side schema of an absent field is cloned
                        // as-is rather than through this cloner.
                        ResolvedRecordField::Absent(writer_schema) => {
                            ResolvedRecordField::Absent(writer_schema.clone())
                        }
                    })
                    .collect(),
                n_reader_fields: *n_reader_fields,
            },
            SchemaPiece::ResolveEnum {
                doc,
                symbols,
                default,
            } => SchemaPiece::ResolveEnum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default: default.clone(),
            },
        }
    }
    /// Deep-clones a piece or a named reference taken from `old_root`.
    ///
    /// The first time a named type is seen, a placeholder slot is reserved in
    /// `named` and the old→new index mapping is recorded *before* the type's
    /// body is cloned. Recursive references to the same type therefore resolve
    /// to the new index instead of recursing forever. Later references reuse
    /// the recorded mapping.
    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
        match piece {
            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
            SchemaPieceRefOrNamed::Named(index) => {
                let new_index = match self.old_to_new_names.entry(index) {
                    Entry::Vacant(ve) => {
                        // Reserve the slot and publish the mapping first (see above).
                        let new_index = self.named.len();
                        self.named.push(None);
                        ve.insert(new_index);
                        let old_named_piece = self.old_root.lookup(index);
                        let new_named_piece = NamedSchemaPiece {
                            name: old_named_piece.name.clone(),
                            piece: self.clone_piece(&old_named_piece.piece),
                        };
                        self.named[new_index] = Some(new_named_piece);
                        new_index
                    }
                    Entry::Occupied(oe) => *oe.get(),
                };
                SchemaPieceOrNamed::Named(new_index)
            }
        }
    }
}
1804
1805impl<'a> SchemaNode<'a> {
1806    #[inline(always)]
1807    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1808        let (inner, name) = next.get_piece_and_name(self.root);
1809        Self {
1810            root: self.root,
1811            inner,
1812            name,
1813        }
1814    }
1815
1816    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1817        use serde_json::Value::*;
1818        let val = match (json, self.inner) {
1819            // A default value always matches the first variant of a union
1820            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1821                Some(variant) => AvroValue::Union {
1822                    index: 0,
1823                    inner: Box::new(self.step(variant).json_to_value(json)?),
1824                    n_variants: us.schemas.len(),
1825                    null_variant: us
1826                        .schemas
1827                        .iter()
1828                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1829                },
1830                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1831            },
1832            (Null, SchemaPiece::Null) => AvroValue::Null,
1833            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1834            (Number(n), piece) => {
1835                match piece {
1836                    piece if piece.is_underlying_int() => {
1837                        let i =
1838                            n.as_i64()
1839                                .and_then(|i| i32::try_from(i).ok())
1840                                .ok_or_else(|| {
1841                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1842                                })?;
1843                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1844                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1845                        })?
1846                    }
1847                    piece if piece.is_underlying_long() => {
1848                        let i = n.as_i64().ok_or_else(|| {
1849                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1850                        })?;
1851                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1852                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1853                        })?
1854                    }
1855                    SchemaPiece::Float => {
1856                        let f = n.as_f64().ok_or_else(|| {
1857                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1858                        })?;
1859                        AvroValue::Float(f as f32)
1860                    }
1861                    SchemaPiece::Double => {
1862                        let f = n.as_f64().ok_or_else(|| {
1863                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1864                        })?;
1865                        AvroValue::Double(f)
1866                    }
1867                    _ => {
1868                        return Err(ParseSchemaError(format!(
1869                            "Unexpected number in default: {}",
1870                            n
1871                        )));
1872                    }
1873                }
1874            }
1875            (String(s), piece)
1876                if s.eq_ignore_ascii_case("nan")
1877                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1878            {
1879                match piece {
1880                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1881                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1882                    _ => unreachable!(),
1883                }
1884            }
1885            (String(s), piece)
1886                if s.eq_ignore_ascii_case("infinity")
1887                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1888            {
1889                match piece {
1890                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1891                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1892                    _ => unreachable!(),
1893                }
1894            }
1895            (String(s), piece)
1896                if s.eq_ignore_ascii_case("-infinity")
1897                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1898            {
1899                match piece {
1900                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1901                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1902                    _ => unreachable!(),
1903                }
1904            }
1905            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1906            (
1907                String(s),
1908                SchemaPiece::Decimal {
1909                    precision, scale, ..
1910                },
1911            ) => AvroValue::Decimal(DecimalValue {
1912                precision: *precision,
1913                scale: *scale,
1914                unscaled: s.clone().into_bytes(),
1915            }),
1916            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1917            (Object(map), SchemaPiece::Record { fields, .. }) => {
1918                let field_values = fields
1919                    .iter()
1920                    .map(|rf| {
1921                        let jval = map.get(&rf.name).ok_or_else(|| {
1922                            ParseSchemaError(format!(
1923                                "Field not found in default value: {}",
1924                                rf.name
1925                            ))
1926                        })?;
1927                        let value = self.step(&rf.schema).json_to_value(jval)?;
1928                        Ok((rf.name.clone(), value))
1929                    })
1930                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1931                AvroValue::Record(field_values)
1932            }
1933            (String(s), SchemaPiece::Enum { symbols, .. }) => {
1934                match symbols.iter().find_position(|sym| s == *sym) {
1935                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1936                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1937                }
1938            }
1939            (Array(vals), SchemaPiece::Array(inner)) => {
1940                let node = self.step(&**inner);
1941                let vals = vals
1942                    .iter()
1943                    .map(|val| node.json_to_value(val))
1944                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1945                AvroValue::Array(vals)
1946            }
1947            (Object(map), SchemaPiece::Map(inner)) => {
1948                let node = self.step(&**inner);
1949                let map = map
1950                    .iter()
1951                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1952                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1953                AvroValue::Map(map)
1954            }
1955            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1956                AvroValue::Fixed(*size, s.clone().into_bytes())
1957            }
1958            _ => {
1959                return Err(ParseSchemaError(format!(
1960                    "Json default value {} does not match schema",
1961                    json
1962                )));
1963            }
1964        };
1965        Ok(val)
1966    }
1967}
1968
/// Serialization state for one node of a schema being written out as Avro JSON.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The node to serialize.
    node: SchemaNodeOrNamed<'a>,
    // This does not logically need Rc<RefCell<_>> semantics --
    // it is only ever mutated in one stack frame at a time.
    // But AFAICT serde doesn't expose a way to
    // provide some mutable context to every node in the tree...
    //
    // Maps indices of named types that have already been written in full to
    // their full names, so that later occurrences are written as name references.
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// The namespace of this node's parent, or "" by default
    enclosing_ns: &'a str,
}
1980
/// Serialization state for one field of a record being written out as Avro JSON.
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
    /// Context of the enclosing record; the field's type is serialized as a step from it.
    outer: &'a SchemaSerContext<'a>,
    /// The field to serialize.
    inner: &'a RecordField,
}
1986
1987impl<'a> SchemaSerContext<'a> {
1988    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1989        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1990        Self {
1991            node: self.node.step_ref(next),
1992            seen_named: Rc::clone(&self.seen_named),
1993            enclosing_ns: ns,
1994        }
1995    }
1996}
1997
1998impl<'a> Serialize for SchemaSerContext<'a> {
1999    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2000    where
2001        S: Serializer,
2002    {
2003        match self.node.inner {
2004            SchemaPieceRefOrNamed::Piece(piece) => match piece {
2005                SchemaPiece::Null => serializer.serialize_str("null"),
2006                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2007                SchemaPiece::Int => serializer.serialize_str("int"),
2008                SchemaPiece::Long => serializer.serialize_str("long"),
2009                SchemaPiece::Float => serializer.serialize_str("float"),
2010                SchemaPiece::Double => serializer.serialize_str("double"),
2011                SchemaPiece::Date => {
2012                    let mut map = serializer.serialize_map(Some(2))?;
2013                    map.serialize_entry("type", "int")?;
2014                    map.serialize_entry("logicalType", "date")?;
2015                    map.end()
2016                }
2017                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2018                    let mut map = serializer.serialize_map(Some(2))?;
2019                    map.serialize_entry("type", "long")?;
2020                    if piece == &SchemaPiece::TimestampMilli {
2021                        map.serialize_entry("logicalType", "timestamp-millis")?;
2022                    } else {
2023                        map.serialize_entry("logicalType", "timestamp-micros")?;
2024                    }
2025                    map.end()
2026                }
2027                SchemaPiece::Decimal {
2028                    precision,
2029                    scale,
2030                    fixed_size: None,
2031                } => {
2032                    let mut map = serializer.serialize_map(Some(4))?;
2033                    map.serialize_entry("type", "bytes")?;
2034                    map.serialize_entry("precision", precision)?;
2035                    map.serialize_entry("scale", scale)?;
2036                    map.serialize_entry("logicalType", "decimal")?;
2037                    map.end()
2038                }
2039                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2040                SchemaPiece::String => serializer.serialize_str("string"),
2041                SchemaPiece::Array(inner) => {
2042                    let mut map = serializer.serialize_map(Some(2))?;
2043                    map.serialize_entry("type", "array")?;
2044                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2045                    map.end()
2046                }
2047                SchemaPiece::Map(inner) => {
2048                    let mut map = serializer.serialize_map(Some(2))?;
2049                    map.serialize_entry("type", "map")?;
2050                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2051                    map.end()
2052                }
2053                SchemaPiece::Union(inner) => {
2054                    let variants = inner.variants();
2055                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2056                    for v in variants {
2057                        seq.serialize_element(&self.step(v.as_ref()))?;
2058                    }
2059                    seq.end()
2060                }
2061                SchemaPiece::Json => {
2062                    let mut map = serializer.serialize_map(Some(2))?;
2063                    map.serialize_entry("type", "string")?;
2064                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2065                    map.end()
2066                }
2067                SchemaPiece::Uuid => {
2068                    let mut map = serializer.serialize_map(Some(4))?;
2069                    map.serialize_entry("type", "string")?;
2070                    map.serialize_entry("logicalType", "uuid")?;
2071                    map.end()
2072                }
2073                SchemaPiece::Record { .. }
2074                | SchemaPiece::Decimal {
2075                    fixed_size: Some(_),
2076                    ..
2077                }
2078                | SchemaPiece::Enum { .. }
2079                | SchemaPiece::Fixed { .. } => {
2080                    unreachable!("Unexpected named schema piece in anonymous schema position")
2081                }
2082                SchemaPiece::ResolveIntLong
2083                | SchemaPiece::ResolveDateTimestamp
2084                | SchemaPiece::ResolveIntFloat
2085                | SchemaPiece::ResolveIntDouble
2086                | SchemaPiece::ResolveLongFloat
2087                | SchemaPiece::ResolveLongDouble
2088                | SchemaPiece::ResolveFloatDouble
2089                | SchemaPiece::ResolveConcreteUnion { .. }
2090                | SchemaPiece::ResolveUnionUnion { .. }
2091                | SchemaPiece::ResolveUnionConcrete { .. }
2092                | SchemaPiece::ResolveRecord { .. }
2093                | SchemaPiece::ResolveIntTsMicro
2094                | SchemaPiece::ResolveIntTsMilli
2095                | SchemaPiece::ResolveEnum { .. } => {
2096                    panic!("Attempted to serialize resolved schema")
2097                }
2098            },
2099            SchemaPieceRefOrNamed::Named(index) => {
2100                let mut map = self.seen_named.borrow_mut();
2101                let named_piece = match map.get(&index) {
2102                    Some(name) => {
2103                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2104                    }
2105                    None => self.node.root.lookup(index),
2106                };
2107                let name = &named_piece.name;
2108                map.insert(index, name.clone());
2109                std::mem::drop(map);
2110                match &named_piece.piece {
2111                    SchemaPiece::Record { doc, fields, .. } => {
2112                        let mut map = serializer.serialize_map(None)?;
2113                        map.serialize_entry("type", "record")?;
2114                        map.serialize_entry("name", &name.name)?;
2115                        if self.enclosing_ns != &name.namespace {
2116                            map.serialize_entry("namespace", &name.namespace)?;
2117                        }
2118                        if let Some(docstr) = doc {
2119                            map.serialize_entry("doc", docstr)?;
2120                        }
2121                        // TODO (brennan) - serialize aliases
2122                        map.serialize_entry(
2123                            "fields",
2124                            &fields
2125                                .iter()
2126                                .map(|f| RecordFieldSerContext {
2127                                    outer: self,
2128                                    inner: f,
2129                                })
2130                                .collect::<Vec<_>>(),
2131                        )?;
2132                        map.end()
2133                    }
2134                    SchemaPiece::Enum {
2135                        symbols,
2136                        default_idx,
2137                        ..
2138                    } => {
2139                        let mut map = serializer.serialize_map(None)?;
2140                        map.serialize_entry("type", "enum")?;
2141                        map.serialize_entry("name", &name.name)?;
2142                        if self.enclosing_ns != &name.namespace {
2143                            map.serialize_entry("namespace", &name.namespace)?;
2144                        }
2145                        map.serialize_entry("symbols", symbols)?;
2146                        if let Some(default_idx) = *default_idx {
2147                            assert!(default_idx < symbols.len());
2148                            map.serialize_entry("default", &symbols[default_idx])?;
2149                        }
2150                        map.end()
2151                    }
2152                    SchemaPiece::Fixed { size } => {
2153                        let mut map = serializer.serialize_map(None)?;
2154                        map.serialize_entry("type", "fixed")?;
2155                        map.serialize_entry("name", &name.name)?;
2156                        if self.enclosing_ns != &name.namespace {
2157                            map.serialize_entry("namespace", &name.namespace)?;
2158                        }
2159                        map.serialize_entry("size", size)?;
2160                        map.end()
2161                    }
2162                    SchemaPiece::Decimal {
2163                        scale,
2164                        precision,
2165                        fixed_size: Some(size),
2166                    } => {
2167                        let mut map = serializer.serialize_map(Some(6))?;
2168                        map.serialize_entry("type", "fixed")?;
2169                        map.serialize_entry("logicalType", "decimal")?;
2170                        map.serialize_entry("name", &name.name)?;
2171                        if self.enclosing_ns != &name.namespace {
2172                            map.serialize_entry("namespace", &name.namespace)?;
2173                        }
2174                        map.serialize_entry("size", size)?;
2175                        map.serialize_entry("precision", precision)?;
2176                        map.serialize_entry("scale", scale)?;
2177                        map.end()
2178                    }
2179                    SchemaPiece::Null
2180                    | SchemaPiece::Boolean
2181                    | SchemaPiece::Int
2182                    | SchemaPiece::Long
2183                    | SchemaPiece::Float
2184                    | SchemaPiece::Double
2185                    | SchemaPiece::Date
2186                    | SchemaPiece::TimestampMilli
2187                    | SchemaPiece::TimestampMicro
2188                    | SchemaPiece::Decimal {
2189                        fixed_size: None, ..
2190                    }
2191                    | SchemaPiece::Bytes
2192                    | SchemaPiece::String
2193                    | SchemaPiece::Array(_)
2194                    | SchemaPiece::Map(_)
2195                    | SchemaPiece::Union(_)
2196                    | SchemaPiece::Uuid
2197                    | SchemaPiece::Json => {
2198                        unreachable!("Unexpected anonymous schema piece in named schema position")
2199                    }
2200                    SchemaPiece::ResolveIntLong
2201                    | SchemaPiece::ResolveDateTimestamp
2202                    | SchemaPiece::ResolveIntFloat
2203                    | SchemaPiece::ResolveIntDouble
2204                    | SchemaPiece::ResolveLongFloat
2205                    | SchemaPiece::ResolveLongDouble
2206                    | SchemaPiece::ResolveFloatDouble
2207                    | SchemaPiece::ResolveConcreteUnion { .. }
2208                    | SchemaPiece::ResolveUnionUnion { .. }
2209                    | SchemaPiece::ResolveUnionConcrete { .. }
2210                    | SchemaPiece::ResolveRecord { .. }
2211                    | SchemaPiece::ResolveIntTsMilli
2212                    | SchemaPiece::ResolveIntTsMicro
2213                    | SchemaPiece::ResolveEnum { .. } => {
2214                        panic!("Attempted to serialize resolved schema")
2215                    }
2216                }
2217            }
2218        }
2219    }
2220}
2221
2222impl Serialize for Schema {
2223    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2224    where
2225        S: Serializer,
2226    {
2227        let ctx = SchemaSerContext {
2228            node: SchemaNodeOrNamed {
2229                root: self,
2230                inner: self.top.as_ref(),
2231            },
2232            seen_named: Rc::new(RefCell::new(Default::default())),
2233            enclosing_ns: "",
2234        };
2235        ctx.serialize(serializer)
2236    }
2237}
2238
2239impl<'a> Serialize for RecordFieldSerContext<'a> {
2240    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2241    where
2242        S: Serializer,
2243    {
2244        let mut map = serializer.serialize_map(None)?;
2245        map.serialize_entry("name", &self.inner.name)?;
2246        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2247        if let Some(default) = &self.inner.default {
2248            map.serialize_entry("default", default)?;
2249        }
2250        if let Some(doc) = &self.inner.doc {
2251            map.serialize_entry("doc", doc)?;
2252        }
2253        map.end()
2254    }
2255}
2256
2257/// Parses a **valid** avro schema into the Parsing Canonical Form.
2258/// <https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas>
2259fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2260    pcf(schema, "", false)
2261}
2262
2263fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2264    match schema {
2265        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2266        serde_json::Value::String(s) => pcf_string(s),
2267        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2268        serde_json::Value::Number(n) => n.to_string(),
2269        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2270    }
2271}
2272
2273fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2274    // Look for the namespace variant up front.
2275    let default_ns = schema
2276        .get("namespace")
2277        .and_then(|v| v.as_str())
2278        .unwrap_or(enclosing_ns);
2279    let mut fields = Vec::new();
2280    let mut found_next_ns = None;
2281    let mut deferred_values = vec![];
2282    for (k, v) in schema {
2283        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
2284        if schema.len() == 1 && k == "type" {
2285            // Invariant: function is only callable from a valid schema, so this is acceptable.
2286            if let serde_json::Value::String(s) = v {
2287                return pcf_string(s);
2288            }
2289        }
2290
2291        // Strip out unused fields ([STRIP] rule)
2292        if field_ordering_position(k).is_none() {
2293            continue;
2294        }
2295
2296        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
2297        if k == "name" {
2298            // The `fields` stanza needs special handling, as it has "name"
2299            // fields that don't get canonicalized (since they are not type names).
2300            if in_fields {
2301                fields.push((
2302                    k,
2303                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2304                ));
2305                continue;
2306            }
2307            // Invariant: Only valid schemas. Must be a string.
2308            let name = v.as_str().unwrap();
2309            assert!(
2310                found_next_ns.is_none(),
2311                "`name` must not be specified multiple times"
2312            );
2313            let next_ns = match name.rsplit_once('.') {
2314                None => default_ns,
2315                Some((ns, _name)) => ns,
2316            };
2317            found_next_ns = Some(next_ns);
2318            let n = if next_ns.is_empty() {
2319                Cow::Borrowed(name)
2320            } else {
2321                Cow::Owned(format!("{next_ns}.{name}"))
2322            };
2323            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2324            continue;
2325        }
2326
2327        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
2328        if k == "size" {
2329            let i = match v.as_str() {
2330                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2331                None => v.as_i64().unwrap(),
2332            };
2333            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2334            continue;
2335        }
2336
2337        // For anything else, recursively process the result
2338        // (deferred until we know the namespace)
2339        deferred_values.push((k, v));
2340    }
2341
2342    let next_ns = found_next_ns.unwrap_or(default_ns);
2343    for (k, v) in deferred_values {
2344        fields.push((
2345            k,
2346            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2347        ));
2348    }
2349
2350    // Sort the fields by their canonical ordering ([ORDER] rule).
2351    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2352    let inter = fields
2353        .into_iter()
2354        .map(|(_, v)| v)
2355        .collect::<Vec<_>>()
2356        .join(",");
2357    format!("{{{}}}", inter)
2358}
2359
2360fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2361    let inter = arr
2362        .iter()
2363        .map(|s| pcf(s, enclosing_ns, in_fields))
2364        .collect::<Vec<String>>()
2365        .join(",");
2366    format!("[{}]", inter)
2367}
2368
/// Wraps `s` in double quotes, as a JSON string literal in canonical form.
///
/// No escaping is performed; the contents of `s` are copied verbatim between
/// the quotes.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2372
/// Returns the 1-based position of the schema attribute `field` in the
/// canonical attribute order ([ORDER] rule), or `None` when the attribute is
/// not part of Parsing Canonical Form and must be dropped ([STRIP] rule).
fn field_ordering_position(field: &str) -> Option<usize> {
    // Canonical order, first to last.
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];
    CANONICAL_ORDER
        .iter()
        .position(|&attr| attr == field)
        .map(|idx| idx + 1)
}
2388
2389#[cfg(test)]
2390mod tests {
2391    use mz_ore::{assert_err, assert_ok};
2392
2393    use crate::types::{Record, ToAvro};
2394
2395    use super::*;
2396
2397    fn check_schema(schema: &str, expected: SchemaPiece) {
2398        let schema = Schema::from_str(schema).unwrap();
2399        assert_eq!(&expected, schema.top_node().inner);
2400
2401        // Test serialization round trip
2402        let schema = serde_json::to_string(&schema).unwrap();
2403        let schema = Schema::from_str(&schema).unwrap();
2404        assert_eq!(&expected, schema.top_node().inner);
2405    }
2406
2407    #[mz_ore::test]
2408    fn test_primitive_schema() {
2409        check_schema("\"null\"", SchemaPiece::Null);
2410        check_schema("\"int\"", SchemaPiece::Int);
2411        check_schema("\"double\"", SchemaPiece::Double);
2412    }
2413
2414    #[mz_ore::test]
2415    fn test_array_schema() {
2416        check_schema(
2417            r#"{"type": "array", "items": "string"}"#,
2418            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2419        );
2420    }
2421
2422    #[mz_ore::test]
2423    fn test_map_schema() {
2424        check_schema(
2425            r#"{"type": "map", "values": "double"}"#,
2426            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2427        );
2428    }
2429
2430    #[mz_ore::test]
2431    fn test_union_schema() {
2432        check_schema(
2433            r#"["null", "int"]"#,
2434            SchemaPiece::Union(
2435                UnionSchema::new(vec![
2436                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2437                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2438                ])
2439                .unwrap(),
2440            ),
2441        );
2442    }
2443
2444    #[mz_ore::test]
2445    fn test_multi_union_schema() {
2446        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2447        assert_ok!(schema);
2448        let schema = schema.unwrap();
2449        let node = schema.top_node();
2450        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2451        let union_schema = match node.inner {
2452            SchemaPiece::Union(u) => u,
2453            _ => unreachable!(),
2454        };
2455        assert_eq!(union_schema.variants().len(), 5);
2456        let mut variants = union_schema.variants().iter();
2457        assert_eq!(
2458            SchemaKind::from(node.step(variants.next().unwrap())),
2459            SchemaKind::Null
2460        );
2461        assert_eq!(
2462            SchemaKind::from(node.step(variants.next().unwrap())),
2463            SchemaKind::Int
2464        );
2465        assert_eq!(
2466            SchemaKind::from(node.step(variants.next().unwrap())),
2467            SchemaKind::Float
2468        );
2469        assert_eq!(
2470            SchemaKind::from(node.step(variants.next().unwrap())),
2471            SchemaKind::String
2472        );
2473        assert_eq!(
2474            SchemaKind::from(node.step(variants.next().unwrap())),
2475            SchemaKind::Bytes
2476        );
2477        assert_eq!(variants.next(), None);
2478    }
2479
2480    #[mz_ore::test]
2481    fn test_record_schema() {
2482        let schema = r#"
2483                {
2484                    "type": "record",
2485                    "name": "test",
2486                    "doc": "record doc",
2487                    "fields": [
2488                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2489                        {"name": "b", "doc": "b doc", "type": "string"}
2490                    ]
2491                }
2492            "#;
2493
2494        let mut lookup = BTreeMap::new();
2495        lookup.insert("a".to_owned(), 0);
2496        lookup.insert("b".to_owned(), 1);
2497
2498        let expected = SchemaPiece::Record {
2499            doc: Some("record doc".to_string()),
2500            fields: vec![
2501                RecordField {
2502                    name: "a".to_string(),
2503                    doc: Some("a doc".to_string()),
2504                    default: Some(Value::Number(42i64.into())),
2505                    schema: SchemaPiece::Long.into(),
2506                    order: RecordFieldOrder::Ascending,
2507                    position: 0,
2508                },
2509                RecordField {
2510                    name: "b".to_string(),
2511                    doc: Some("b doc".to_string()),
2512                    default: None,
2513                    schema: SchemaPiece::String.into(),
2514                    order: RecordFieldOrder::Ascending,
2515                    position: 1,
2516                },
2517            ],
2518            lookup,
2519        };
2520
2521        check_schema(schema, expected);
2522    }
2523
2524    #[mz_ore::test]
2525    fn test_enum_schema() {
2526        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2527
2528        let expected = SchemaPiece::Enum {
2529            doc: None,
2530            symbols: vec![
2531                "diamonds".to_owned(),
2532                "spades".to_owned(),
2533                "jokers".to_owned(),
2534                "clubs".to_owned(),
2535                "hearts".to_owned(),
2536            ],
2537            default_idx: Some(2),
2538        };
2539
2540        check_schema(schema, expected);
2541
2542        let bad_schema = Schema::from_str(
2543            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2544        );
2545
2546        assert_err!(bad_schema);
2547    }
2548
2549    #[mz_ore::test]
2550    fn test_fixed_schema() {
2551        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2552
2553        let expected = SchemaPiece::Fixed { size: 16usize };
2554
2555        check_schema(schema, expected);
2556    }
2557
2558    #[mz_ore::test]
2559    fn test_date_schema() {
2560        let kinds = &[
2561            r#"{
2562                    "type": "int",
2563                    "name": "datish",
2564                    "logicalType": "date"
2565                }"#,
2566            r#"{
2567                    "type": "int",
2568                    "name": "datish",
2569                    "connect.name": "io.debezium.time.Date"
2570                }"#,
2571            r#"{
2572                    "type": "int",
2573                    "name": "datish",
2574                    "connect.name": "org.apache.kafka.connect.data.Date"
2575                }"#,
2576        ];
2577        for kind in kinds {
2578            check_schema(*kind, SchemaPiece::Date);
2579
2580            let schema = Schema::from_str(*kind).unwrap();
2581            assert_eq!(
2582                serde_json::to_string(&schema).unwrap(),
2583                r#"{"type":"int","logicalType":"date"}"#
2584            );
2585        }
2586    }
2587
2588    #[mz_ore::test]
2589    fn new_field_in_middle() {
2590        let reader = r#"{
2591            "type": "record",
2592            "name": "MyRecord",
2593            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2594        }"#;
2595        let writer = r#"{
2596            "type": "record",
2597            "name": "MyRecord",
2598            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2599        }"#;
2600        let reader = Schema::from_str(reader).unwrap();
2601        let writer = Schema::from_str(writer).unwrap();
2602
2603        let mut record = Record::new(writer.top_node()).unwrap();
2604        record.put("f1", 1);
2605        record.put("f2", 2);
2606        record.put("f_interposed", 42);
2607
2608        let value = record.avro();
2609
2610        let mut buf = vec![];
2611        crate::encode::encode(&value, &writer, &mut buf);
2612
2613        let resolved = resolve_schemas(&writer, &reader).unwrap();
2614
2615        let reader = &mut &buf[..];
2616        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2617        let expected = crate::types::Value::Record(vec![
2618            ("f1".to_string(), crate::types::Value::Int(1)),
2619            ("f2".to_string(), crate::types::Value::Int(2)),
2620        ]);
2621        assert_eq!(reader_value, expected);
2622        assert!(reader.is_empty()); // all bytes should have been consumed
2623    }
2624
2625    #[mz_ore::test]
2626    fn new_field_at_end() {
2627        let reader = r#"{
2628            "type": "record",
2629            "name": "MyRecord",
2630            "fields": [{"name": "f1", "type": "int"}]
2631        }"#;
2632        let writer = r#"{
2633            "type": "record",
2634            "name": "MyRecord",
2635            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2636        }"#;
2637        let reader = Schema::from_str(reader).unwrap();
2638        let writer = Schema::from_str(writer).unwrap();
2639
2640        let mut record = Record::new(writer.top_node()).unwrap();
2641        record.put("f1", 1);
2642        record.put("f2", 2);
2643
2644        let value = record.avro();
2645
2646        let mut buf = vec![];
2647        crate::encode::encode(&value, &writer, &mut buf);
2648
2649        let resolved = resolve_schemas(&writer, &reader).unwrap();
2650
2651        let reader = &mut &buf[..];
2652        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2653        let expected =
2654            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2655        assert_eq!(reader_value, expected);
2656        assert!(reader.is_empty()); // all bytes should have been consumed
2657    }
2658
2659    #[mz_ore::test]
2660    fn default_non_nums() {
2661        let reader = r#"{
2662            "type": "record",
2663            "name": "MyRecord",
2664            "fields": [
2665                {"name": "f1", "type": "double", "default": "NaN"},
2666                {"name": "f2", "type": "double", "default": "Infinity"},
2667                {"name": "f3", "type": "double", "default": "-Infinity"}
2668            ]
2669        }
2670        "#;
2671        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2672
2673        let writer_schema = Schema::from_str(writer).unwrap();
2674        let reader_schema = Schema::from_str(reader).unwrap();
2675        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2676
2677        let record = Record::new(writer_schema.top_node()).unwrap();
2678
2679        let value = record.avro();
2680        let mut buf = vec![];
2681        crate::encode::encode(&value, &writer_schema, &mut buf);
2682
2683        let reader = &mut &buf[..];
2684        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2685        let expected = crate::types::Value::Record(vec![
2686            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2687            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2688            (
2689                "f3".to_string(),
2690                crate::types::Value::Double(f64::NEG_INFINITY),
2691            ),
2692        ]);
2693
2694        #[derive(Debug)]
2695        struct NanEq(crate::types::Value);
2696        impl std::cmp::PartialEq for NanEq {
2697            fn eq(&self, other: &Self) -> bool {
2698                match (self, other) {
2699                    (
2700                        NanEq(crate::types::Value::Double(x)),
2701                        NanEq(crate::types::Value::Double(y)),
2702                    ) if x.is_nan() && y.is_nan() => true,
2703                    (
2704                        NanEq(crate::types::Value::Float(x)),
2705                        NanEq(crate::types::Value::Float(y)),
2706                    ) if x.is_nan() && y.is_nan() => true,
2707                    (
2708                        NanEq(crate::types::Value::Record(xs)),
2709                        NanEq(crate::types::Value::Record(ys)),
2710                    ) => {
2711                        let xs = xs
2712                            .iter()
2713                            .cloned()
2714                            .map(|(k, v)| (k, NanEq(v)))
2715                            .collect::<Vec<_>>();
2716                        let ys = ys
2717                            .iter()
2718                            .cloned()
2719                            .map(|(k, v)| (k, NanEq(v)))
2720                            .collect::<Vec<_>>();
2721
2722                        xs == ys
2723                    }
2724                    (NanEq(x), NanEq(y)) => x == y,
2725                }
2726            }
2727        }
2728
2729        assert_eq!(NanEq(reader_value), NanEq(expected));
2730        assert!(reader.is_empty());
2731    }
2732
2733    #[mz_ore::test]
2734    fn test_decimal_schemas() {
2735        let schema = r#"{
2736                "type": "fixed",
2737                "name": "dec",
2738                "size": 8,
2739                "logicalType": "decimal",
2740                "precision": 12,
2741                "scale": 5
2742            }"#;
2743        let expected = SchemaPiece::Decimal {
2744            precision: 12,
2745            scale: 5,
2746            fixed_size: Some(8),
2747        };
2748        check_schema(schema, expected);
2749
2750        let schema = r#"{
2751                "type": "bytes",
2752                "logicalType": "decimal",
2753                "precision": 12,
2754                "scale": 5
2755            }"#;
2756        let expected = SchemaPiece::Decimal {
2757            precision: 12,
2758            scale: 5,
2759            fixed_size: None,
2760        };
2761        check_schema(schema, expected);
2762
2763        let res = Schema::from_str(
2764            r#"["bytes", {
2765                "type": "bytes",
2766                "logicalType": "decimal",
2767                "precision": 12,
2768                "scale": 5
2769            }]"#,
2770        );
2771        assert_eq!(
2772            res.unwrap_err().to_string(),
2773            "Schema parse error: Unions cannot contain duplicate types"
2774        );
2775
2776        let writer_schema = Schema::from_str(
2777            r#"["null", {
2778                "type": "bytes"
2779            }]"#,
2780        )
2781        .unwrap();
2782        let reader_schema = Schema::from_str(
2783            r#"["null", {
2784                "type": "bytes",
2785                "logicalType": "decimal",
2786                "precision": 12,
2787                "scale": 5
2788            }]"#,
2789        )
2790        .unwrap();
2791        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2792
2793        let expected = SchemaPiece::ResolveUnionUnion {
2794            permutation: vec![
2795                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2796                Ok((
2797                    1,
2798                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2799                        precision: 12,
2800                        scale: 5,
2801                        fixed_size: None,
2802                    }),
2803                )),
2804            ],
2805            n_reader_variants: 2,
2806            reader_null_variant: Some(0),
2807        };
2808        assert_eq!(resolved.top_node().inner, &expected);
2809    }
2810
2811    #[mz_ore::test]
2812    fn test_no_documentation() {
2813        let schema =
2814            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2815                .unwrap();
2816
2817        let doc = match schema.top_node().inner {
2818            SchemaPiece::Enum { doc, .. } => doc.clone(),
2819            _ => panic!(),
2820        };
2821
2822        assert_none!(doc);
2823    }
2824
2825    #[mz_ore::test]
2826    fn test_documentation() {
2827        let schema = Schema::from_str(
2828                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2829            ).unwrap();
2830
2831        let doc = match schema.top_node().inner {
2832            SchemaPiece::Enum { doc, .. } => doc.clone(),
2833            _ => None,
2834        };
2835
2836        assert_eq!("Some documentation".to_owned(), doc.unwrap());
2837    }
2838
2839    #[mz_ore::test]
2840    fn test_namespaces_and_names() {
2841        // When name and namespace specified, full name should contain both.
2842        let schema = Schema::from_str(
2843            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2844        )
2845        .unwrap();
2846        assert_eq!(schema.named.len(), 1);
2847        assert_eq!(
2848            schema.named[0].name,
2849            FullName {
2850                name: "name".into(),
2851                namespace: "namespace".into()
2852            }
2853        );
2854
2855        // When name contains dots, parse the dot-separated name as the namespace.
2856        let schema =
2857            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2858                .unwrap();
2859        assert_eq!(schema.named.len(), 1);
2860        assert_eq!(
2861            schema.named[0].name,
2862            FullName {
2863                name: "dots".into(),
2864                namespace: "name.has".into()
2865            }
2866        );
2867
2868        // Same as above, ignore any provided namespace.
2869        let schema = Schema::from_str(
2870            r#"{"type": "enum", "namespace": "namespace",
2871            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2872        )
2873        .unwrap();
2874        assert_eq!(schema.named.len(), 1);
2875        assert_eq!(
2876            schema.named[0].name,
2877            FullName {
2878                name: "dots".into(),
2879                namespace: "name.has".into()
2880            }
2881        );
2882
2883        // Use default namespace when namespace is not provided.
2884        // Materialize uses "" as the default namespace.
2885        let schema = Schema::from_str(
2886            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2887            "fields": [{"name": "name", "type": "string"}]}"#,
2888        )
2889        .unwrap();
2890        assert_eq!(schema.named.len(), 1);
2891        assert_eq!(
2892            schema.named[0].name,
2893            FullName {
2894                name: "TestDoc".into(),
2895                namespace: "".into()
2896            }
2897        );
2898
2899        // Empty namespace strings should be allowed.
2900        let schema = Schema::from_str(
2901            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2902            "fields": [{"name": "name", "type": "string"}]}"#,
2903        )
2904        .unwrap();
2905        assert_eq!(schema.named.len(), 1);
2906        assert_eq!(
2907            schema.named[0].name,
2908            FullName {
2909                name: "TestDoc".into(),
2910                namespace: "".into()
2911            }
2912        );
2913
2914        // Equality of names is defined on the FullName and is case-sensitive.
2915        let first = Schema::from_str(
2916            r#"{"type": "fixed", "namespace": "namespace",
2917            "name": "name", "size": 1}"#,
2918        )
2919        .unwrap();
2920        let second = Schema::from_str(
2921            r#"{"type": "fixed", "name": "namespace.name",
2922            "size": 1}"#,
2923        )
2924        .unwrap();
2925        assert_eq!(first.named[0].name, second.named[0].name);
2926
2927        let first = Schema::from_str(
2928            r#"{"type": "fixed", "namespace": "namespace",
2929            "name": "name", "size": 1}"#,
2930        )
2931        .unwrap();
2932        let second = Schema::from_str(
2933            r#"{"type": "fixed", "name": "namespace.Name",
2934            "size": 1}"#,
2935        )
2936        .unwrap();
2937        assert_ne!(first.named[0].name, second.named[0].name);
2938
2939        let first = Schema::from_str(
2940            r#"{"type": "fixed", "namespace": "Namespace",
2941            "name": "name", "size": 1}"#,
2942        )
2943        .unwrap();
2944        let second = Schema::from_str(
2945            r#"{"type": "fixed", "namespace": "namespace",
2946            "name": "name", "size": 1}"#,
2947        )
2948        .unwrap();
2949        assert_ne!(first.named[0].name, second.named[0].name);
2950
2951        // The name portion of a fullname, record field names, and enum symbols must:
2952        // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
2953        assert!(
2954            Schema::from_str(
2955                r#"{"type": "record", "name": "99 problems but a name aint one",
2956            "fields": [{"name": "name", "type": "string"}]}"#
2957            )
2958            .is_err()
2959        );
2960
2961        assert!(
2962            Schema::from_str(
2963                r#"{"type": "record", "name": "!!!",
2964            "fields": [{"name": "name", "type": "string"}]}"#
2965            )
2966            .is_err()
2967        );
2968
2969        assert!(
2970            Schema::from_str(
2971                r#"{"type": "record", "name": "_valid_until_©",
2972            "fields": [{"name": "name", "type": "string"}]}"#
2973            )
2974            .is_err()
2975        );
2976
2977        // Use previously defined names and namespaces as type.
2978        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2979              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2980              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2981              {"name": "f3", "type": "MyEnum"},
2982              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2983              {"name": "f5", "type": "other.namespace.OtherEnum"},
2984              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2985              {"name": "f7", "type": "some.other.ThirdEnum"}
2986             ]}"#).unwrap();
2987        assert_eq!(schema.named.len(), 4);
2988
2989        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2990            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // f1
2991            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // f2
2992            assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); // f3
2993            assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); // f4
2994            assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); // f5
2995            assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); // f6
2996            assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); // f7
2997        } else {
2998            panic!("Expected SchemaPiece::Record, found something else");
2999        }
3000
3001        let schema = Schema::from_str(
3002            r#"{"type": "record", "name": "x.Y", "fields": [
3003              {"name": "e", "type":
3004                {"type": "record", "name": "Z", "fields": [
3005                  {"name": "f", "type": "x.Y"},
3006                  {"name": "g", "type": "x.Z"}
3007                ]}
3008              }
3009            ]}"#,
3010        )
3011        .unwrap();
3012        assert_eq!(schema.named.len(), 2);
3013
3014        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3015            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // e
3016        } else {
3017            panic!("Expected SchemaPiece::Record, found something else");
3018        }
3019
3020        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
3021            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); // f
3022            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // g
3023        } else {
3024            panic!("Expected SchemaPiece::Record, found something else");
3025        }
3026
3027        let schema = Schema::from_str(
3028            r#"{"type": "record", "name": "R", "fields": [
3029              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
3030                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
3031                 "symbols": ["Foo", "Bar"]}
3032                }
3033              ]}},
3034              {"name": "t", "type": "Z"}
3035            ]}"#,
3036        )
3037        .unwrap();
3038        assert_eq!(schema.named.len(), 3);
3039
3040        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3041            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // s
3042            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); // t - refers to "".Z
3043        } else {
3044            panic!("Expected SchemaPiece::Record, found something else");
3045        }
3046    }
3047
3048    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
3049    // compile, they pass.
3050    #[mz_ore::test]
3051    fn test_schema_is_send() {
3052        fn send<S: Send>(_s: S) {}
3053
3054        let schema = Schema {
3055            named: vec![],
3056            indices: Default::default(),
3057            top: SchemaPiece::Null.into(),
3058        };
3059        send(schema);
3060    }
3061
3062    #[mz_ore::test]
3063    fn test_schema_is_sync() {
3064        fn sync<S: Sync>(_s: S) {}
3065
3066        let schema = Schema {
3067            named: vec![],
3068            indices: Default::default(),
3069            top: SchemaPiece::Null.into(),
3070        };
3071        sync(&schema);
3072        sync(schema);
3073    }
3074
3075    #[mz_ore::test]
3076    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
3077    fn test_schema_fingerprint() {
3078        let raw_schema = r#"
3079        {
3080            "type": "record",
3081            "name": "test",
3082            "fields": [
3083                {"name": "a", "type": "long", "default": 42},
3084                {"name": "b", "type": "string"}
3085            ]
3086        }
3087    "#;
3088        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3089        let schema = Schema::from_str(raw_schema).unwrap();
3090        assert_eq!(&schema.canonical_form(), expected_canonical);
3091        let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3092            .as_ref()
3093            .iter()
3094            .map(|b| format!("{b:02x}"))
3095            .collect::<String>();
3096        assert_eq!(
3097            format!("{}", schema.fingerprint(&digest::SHA256)),
3098            expected_fingerprint
3099        );
3100
3101        let raw_schema = r#"
3102{
3103  "type": "record",
3104  "name": "ns.r1",
3105  "namespace": "ignored",
3106  "fields": [
3107    {
3108      "name": "f1",
3109      "type": {
3110        "type": "fixed",
3111        "name": "r2",
3112        "size": 1
3113      }
3114    }
3115  ]
3116}
3117"#;
3118        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3119        let schema = Schema::from_str(raw_schema).unwrap();
3120        assert_eq!(&schema.canonical_form(), expected_canonical);
3121        let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3122            .as_ref()
3123            .iter()
3124            .map(|b| format!("{b:02x}"))
3125            .collect::<String>();
3126        assert_eq!(
3127            format!("{}", schema.fingerprint(&digest::SHA256)),
3128            expected_fingerprint
3129        );
3130    }
3131
3132    #[mz_ore::test]
3133    fn test_make_valid() {
3134        for (input, expected) in [
3135            ("foo", "foo"),
3136            ("az99", "az99"),
3137            ("99az", "_99az"),
3138            ("is,bad", "is_bad"),
3139            ("@#$%", "____"),
3140            ("i-amMisBehaved!", "i_amMisBehaved_"),
3141            ("", "_"),
3142        ] {
3143            let actual = Name::make_valid(input);
3144            assert_eq!(expected, actual, "Name::make_valid({input})")
3145        }
3146    }
3147
3148    #[mz_ore::test]
3149    fn test_parse_with_simple_reference() {
3150        // Schema A defines a User record
3151        let ref_schema_json = r#"{
3152            "type": "record",
3153            "name": "User",
3154            "namespace": "com.example",
3155            "fields": [{"name": "id", "type": "int"}]
3156        }"#;
3157
3158        // Schema B references User
3159        let primary_json = r#"{
3160            "type": "record",
3161            "name": "Event",
3162            "namespace": "com.example",
3163            "fields": [{"name": "user", "type": "com.example.User"}]
3164        }"#;
3165
3166        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3167        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3168
3169        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3170
3171        // Verify both Event and User types are in the schema
3172        let user_name = FullName {
3173            name: "User".to_string(),
3174            namespace: "com.example".to_string(),
3175        };
3176        let event_name = FullName {
3177            name: "Event".to_string(),
3178            namespace: "com.example".to_string(),
3179        };
3180
3181        assert!(
3182            schema.indices.contains_key(&user_name),
3183            "User type should be in schema indices"
3184        );
3185        assert!(
3186            schema.indices.contains_key(&event_name),
3187            "Event type should be in schema indices"
3188        );
3189
3190        // Verify Event's user field references User
3191        if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3192            let event_piece = &schema.named[*event_idx].piece;
3193            if let SchemaPiece::Record { fields, .. } = event_piece {
3194                assert_eq!(fields.len(), 1);
3195                assert_eq!(fields[0].name, "user");
3196                // The user field should reference the User type (Named)
3197                assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3198            } else {
3199                panic!("Expected Event to be a record");
3200            }
3201        } else {
3202            panic!("Expected top to be Named");
3203        }
3204    }
3205
3206    #[mz_ore::test]
3207    fn test_parse_with_nested_references() {
3208        // Schema A defines Address
3209        let schema_a = r#"{
3210            "type": "record",
3211            "name": "Address",
3212            "namespace": "com.example",
3213            "fields": [
3214                {"name": "street", "type": "string"},
3215                {"name": "city", "type": "string"}
3216            ]
3217        }"#;
3218
3219        // Schema B defines User with Address field (references A)
3220        let schema_b = r#"{
3221            "type": "record",
3222            "name": "User",
3223            "namespace": "com.example",
3224            "fields": [
3225                {"name": "id", "type": "int"},
3226                {"name": "address", "type": "com.example.Address"}
3227            ]
3228        }"#;
3229
3230        // Schema C defines Event with User field (references B, transitively A)
3231        let schema_c = r#"{
3232            "type": "record",
3233            "name": "Event",
3234            "namespace": "com.example",
3235            "fields": [
3236                {"name": "user", "type": "com.example.User"},
3237                {"name": "timestamp", "type": "long"}
3238            ]
3239        }"#;
3240
3241        // Parse A first
3242        let ref_schema_a = Schema::from_str(schema_a).unwrap();
3243
3244        // Parse B with reference to A
3245        let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3246        let ref_schema_b =
3247            Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3248                .unwrap();
3249
3250        // Parse C with references to A and B (in dependency order)
3251        let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3252        let final_schema =
3253            Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3254
3255        // Verify all three types are in the schema
3256        for name in ["Address", "User", "Event"] {
3257            let full_name = FullName {
3258                name: name.to_string(),
3259                namespace: "com.example".to_string(),
3260            };
3261            assert!(
3262                final_schema.indices.contains_key(&full_name),
3263                "{} type should be in schema indices",
3264                name
3265            );
3266        }
3267    }
3268
3269    #[mz_ore::test]
3270    fn test_parse_with_multiple_types_in_reference() {
3271        // Schema A defines both Address and PhoneNumber
3272        let ref_schema_json = r#"{
3273            "type": "record",
3274            "name": "ContactInfo",
3275            "namespace": "com.example",
3276            "fields": [
3277                {
3278                    "name": "address",
3279                    "type": {
3280                        "type": "record",
3281                        "name": "Address",
3282                        "fields": [{"name": "street", "type": "string"}]
3283                    }
3284                },
3285                {
3286                    "name": "phone",
3287                    "type": {
3288                        "type": "record",
3289                        "name": "PhoneNumber",
3290                        "fields": [{"name": "number", "type": "string"}]
3291                    }
3292                }
3293            ]
3294        }"#;
3295
3296        // Schema B references both Address and PhoneNumber
3297        let primary_json = r#"{
3298            "type": "record",
3299            "name": "User",
3300            "namespace": "com.example",
3301            "fields": [
3302                {"name": "id", "type": "int"},
3303                {"name": "home", "type": "com.example.Address"},
3304                {"name": "mobile", "type": "com.example.PhoneNumber"}
3305            ]
3306        }"#;
3307
3308        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3309        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3310
3311        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3312
3313        // Verify all types are in the schema
3314        for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3315            let full_name = FullName {
3316                name: name.to_string(),
3317                namespace: "com.example".to_string(),
3318            };
3319            assert!(
3320                schema.indices.contains_key(&full_name),
3321                "{} type should be in schema indices",
3322                name
3323            );
3324        }
3325    }
3326
3327    #[mz_ore::test]
3328    fn test_parse_with_no_references() {
3329        // When no references are provided, it should behave like regular parse
3330        let schema_json = r#"{
3331            "type": "record",
3332            "name": "Simple",
3333            "fields": [{"name": "id", "type": "int"}]
3334        }"#;
3335
3336        let value: Value = serde_json::from_str(schema_json).unwrap();
3337
3338        let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3339        let schema_normal = Schema::parse(&value).unwrap();
3340
3341        // Both should produce equivalent schemas
3342        assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3343        assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3344    }
3345
3346    #[mz_ore::test]
3347    fn test_parse_with_reference_in_array() {
3348        // Schema A defines an Item record
3349        let ref_schema_json = r#"{
3350            "type": "record",
3351            "name": "Item",
3352            "namespace": "com.example",
3353            "fields": [{"name": "name", "type": "string"}]
3354        }"#;
3355
3356        // Schema B has an array of Items
3357        let primary_json = r#"{
3358            "type": "record",
3359            "name": "Order",
3360            "namespace": "com.example",
3361            "fields": [
3362                {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3363            ]
3364        }"#;
3365
3366        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3367        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3368
3369        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3370
3371        // Verify both types exist
3372        let item_name = FullName {
3373            name: "Item".to_string(),
3374            namespace: "com.example".to_string(),
3375        };
3376        assert!(schema.indices.contains_key(&item_name));
3377
3378        // Verify the array items type is a Named reference
3379        if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3380            let order_piece = &schema.named[*order_idx].piece;
3381            if let SchemaPiece::Record { fields, .. } = order_piece {
3382                if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3383                    assert!(
3384                        matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3385                        "Array items should be a Named reference to Item"
3386                    );
3387                } else {
3388                    panic!("Expected items field to be an array");
3389                }
3390            } else {
3391                panic!("Expected Order to be a record");
3392            }
3393        } else {
3394            panic!("Expected top to be Named");
3395        }
3396    }
3397}