Skip to main content

mz_avro/
schema.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for parsing and interacting with schemas in Avro format.
25
26use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use aws_lc_rs::digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// An error describing why an Avro schema could not be parsed.
///
/// The wrapped message is exactly what the `Display` implementation prints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error carrying `msg` as its message.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for ParseSchemaError {
    /// Prints the wrapped message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprints documentation](https://avro.apache.org/docs/++version++/specification/#schema-fingerprints)
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The raw bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Formats the fingerprint as lowercase hexadecimal, two digits per byte.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write each byte straight into the formatter instead of allocating a
        // `String` per byte and joining them into yet another `String`.
        for byte in &self.bytes {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
171#[derive(Clone, Debug, PartialEq)]
172pub enum SchemaPiece {
173    /// A `null` Avro schema.
174    Null,
175    /// A `boolean` Avro schema.
176    Boolean,
177    /// An `int` Avro schema.
178    Int,
179    /// A `long` Avro schema.
180    Long,
181    /// A `float` Avro schema.
182    Float,
183    /// A `double` Avro schema.
184    Double,
185    /// An `Int` Avro schema with a semantic type being days since the unix epoch.
186    Date,
187    /// An `Int64` Avro schema with a semantic type being milliseconds since the unix epoch.
188    ///
189    /// <https://avro.apache.org/docs/++version++/specification/#time_ms>
190    TimestampMilli,
191    /// An `Int64` Avro schema with a semantic type being microseconds since the unix epoch.
192    ///
193    /// <https://avro.apache.org/docs/++version++/specification/#time-microsecond-precision>
194    TimestampMicro,
195    /// A `bytes` or `fixed` Avro schema with a logical type of `decimal` and
196    /// the specified precision and scale.
197    ///
198    /// If the underlying type is `fixed`,
199    /// the `fixed_size` field specifies the size.
200    Decimal {
201        precision: usize,
202        scale: usize,
203        fixed_size: Option<usize>,
204    },
205    /// A `bytes` Avro schema.
206    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
207    Bytes,
208    /// A `string` Avro schema.
209    /// `String` represents a unicode character sequence.
210    String,
211    /// A `string` Avro schema that is tagged as representing JSON data
212    Json,
213    /// A `string` Avro schema with a logical type of `uuid`.
214    Uuid,
215    /// A `array` Avro schema. Avro arrays are required to have the same type for each element.
216    /// This variant holds the `Schema` for the array element type.
217    Array(Box<SchemaPieceOrNamed>),
218    /// A `map` Avro schema.
219    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
220    /// `Map` keys are assumed to be `string`.
221    Map(Box<SchemaPieceOrNamed>),
222    /// A `union` Avro schema.
223    Union(UnionSchema),
224    /// A value written as `int` and read as `long`,
225    /// for the timestamp-millis logicalType.
226    ResolveIntTsMilli,
227    /// A value written as `int` and read as `long`,
228    /// for the timestamp-micros logicalType.
229    ResolveIntTsMicro,
230    /// A value written as an `int` with `date` logical type,
231    /// and read as any timestamp type
232    ResolveDateTimestamp,
233    /// A value written as `int` and read as `long`
234    ResolveIntLong,
235    /// A value written as `int` and read as `float`
236    ResolveIntFloat,
237    /// A value written as `int` and read as `double`
238    ResolveIntDouble,
239    /// A value written as `long` and read as `float`
240    ResolveLongFloat,
241    /// A value written as `long` and read as `double`
242    ResolveLongDouble,
243    /// A value written as `float` and read as `double`
244    ResolveFloatDouble,
245    /// A concrete (i.e., non-`union`) type in the writer,
246    /// resolved against one specific variant of a `union` in the reader.
247    ResolveConcreteUnion {
248        /// The index of the variant in the reader
249        index: usize,
250        /// The concrete type
251        inner: Box<SchemaPieceOrNamed>,
252        n_reader_variants: usize,
253        reader_null_variant: Option<usize>,
254    },
255    /// A union in the writer, resolved against a union in the reader.
256    /// The two schemas may have different variants and the variants may be in a different order.
257    ResolveUnionUnion {
258        /// A mapping of the fields in the writer to those in the reader.
259        /// If the `i`th element is `Err(e)`, the `i`th field in the writer
260        /// did not match any field in the reader (or even if it matched by name, resolution failed).
261        /// If the `i`th element is `Ok((j, piece))`, then the `i`th field of the writer
262        /// matched the `j`th field of the reader, and `piece` is their resolved node.
263        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
264        n_reader_variants: usize,
265        reader_null_variant: Option<usize>,
266    },
267    /// The inverse of `ResolveConcreteUnion`
268    ResolveUnionConcrete {
269        index: usize,
270        inner: Box<SchemaPieceOrNamed>,
271    },
272    /// A `record` Avro schema.
273    ///
274    /// The `lookup` table maps field names to their position in the `Vec`
275    /// of `fields`.
276    Record {
277        doc: Documentation,
278        fields: Vec<RecordField>,
279        lookup: BTreeMap<String, usize>,
280    },
281    /// An `enum` Avro schema.
282    Enum {
283        doc: Documentation,
284        symbols: Vec<String>,
285        /// The index of the default value.
286        ///
287        /// This is only used in schema resolution: it is the value that
288        /// will be read by a reader when a writer writes a value that the reader
289        /// does not expect.
290        default_idx: Option<usize>,
291    },
292    /// A `fixed` Avro schema.
293    Fixed { size: usize },
294    /// A record in the writer, resolved against a record in the reader.
295    /// The two schemas may have different fields and the fields may be in a different order.
296    ResolveRecord {
297        /// Fields that do not exist in the writer schema, but had a default
298        /// value specified in the reader schema, which we use.
299        defaults: Vec<ResolvedDefaultValueField>,
300        /// Fields in the order of their appearance in the writer schema.
301        /// `Present` if they could be resolved against a field in the reader schema;
302        /// `Absent` otherwise.
303        fields: Vec<ResolvedRecordField>,
304        /// The size of `defaults`, plus the number of `Present` values in `fields`.
305        n_reader_fields: usize,
306    },
307    /// An enum in the writer, resolved against an enum in the reader.
308    /// The two schemas may have different values and the values may be in a different order.
309    ResolveEnum {
310        doc: Documentation,
311        /// Symbols in order of the writer schema along with their index in the reader schema,
312        /// or `Err(symbol_name)` if they don't exist in the reader schema.
313        symbols: Vec<Result<(usize, String), String>>,
314        /// The value to decode if the writer writes some value not expected by the reader.
315        default: Option<(usize, String)>,
316    },
317}
318
319impl SchemaPiece {
320    /// Returns whether the schema node is "underlyingly" an Int (but possibly a logicalType typedef)
321    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    /// Returns whether the schema node is "underlyingly" an Int64 (but possibly a logicalType typedef)
325    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    /// Constructs an `avro::Value` if this is of underlying int type.
329    /// Guaranteed to be `Some` when `is_underlying_int` is `true`.
330    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            // TODO[btv] - should we bounds-check the date here? We
334            // don't elsewhere... maybe we should everywhere.
335            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    /// Constructs an `avro::Value` if this is of underlying long type.
340    /// Guaranteed to be `Some` when `is_underlying_long` is `true`.
341    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
/// Represents any valid Avro schema.
///
/// Named types (records, enums and fixeds) are stored once in a table and
/// referenced by index from the rest of the schema graph.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-declaration)
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Every named type in the schema; `SchemaPieceOrNamed::Named(i)` refers
    /// to `named[i]` (see `Schema::lookup`).
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps the full name of each named type to its position in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The root node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode<'_> {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// This type is used to simplify enum variant comparison between `Schema` and `types::Value`.
///
/// **NOTE** This type was introduced due to a limitation of `mem::discriminant` requiring a _value_
/// be constructed in order to get the discriminant, which makes it difficult to implement a
/// function that maps from `Discriminant<Schema> -> Discriminant<Value>`. Conversion into this
/// intermediate type should be especially fast, as the number of enum variants is small, which
/// _should_ compile into a jump-table for the conversion.
///
/// The variant order is significant: it defines the derived `Ord`, and therefore
/// the iteration order of any ordered map keyed by `SchemaKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    // Primitive scalar types
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    // Byte sequences, strings, and complex types
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    // This can arise in resolved schemas, particularly when a union resolves to a non-union.
    // We would need to do a lookup to find the actual type.
    Unknown,
}

impl SchemaKind {
    /// Returns the Avro type name for this kind (e.g. `"int"`, `"record"`);
    /// `SchemaKind::Unknown` yields `"unknown"`.
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these schemas has a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The unqualified name. `Name::parse` moves any dotted prefix of the
    /// `name` attribute into `namespace`.
    pub name: String,
    /// The namespace, if one was given or derived from a dotted name. When
    /// `None`, `Name::fullname` falls back to the caller's default namespace.
    pub namespace: Option<String>,
    /// Alternate names; `Name::parse` leaves this `None` unless `aliases` is an
    /// array made up entirely of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A fully qualified Avro name: an unqualified base name plus its namespace.
///
/// Unlike `Name`, the namespace is always resolved to a string; an empty
/// namespace is rendered without a prefix by `FullName::human_name`.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a `FullName` from a possibly dotted `name`.
    ///
    /// Following the Avro specification, a `name` containing a dot is already
    /// a fullname: everything before the last dot becomes the namespace and any
    /// explicitly provided `namespace` is ignored (matching `Name::parse`).
    /// Otherwise the namespace is `namespace` if given, else `default_namespace`.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (namespace, name) = match name.rsplit_once('.') {
            Some((dotted_namespace, base)) => (dotted_namespace, base),
            None => (namespace.unwrap_or(default_namespace), name),
        };
        FullName {
            name: name.to_owned(),
            namespace: namespace.to_owned(),
        }
    }
    /// Returns the unqualified name.
    pub fn base_name(&self) -> &str {
        &self.name
    }
    /// Returns `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            return self.name.clone();
        }
        format!("{}.{}", self.namespace, self.name)
    }
    /// Get the shortest unambiguous synonym of this name
    /// at a given point in the schema graph. If this name
    /// is in the same namespace as the enclosing node, this
    /// returns the short name; otherwise, it returns the fully qualified name.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if enclosing_ns == &self.namespace {
            Cow::Borrowed(&self.name)
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }
    /// Returns the namespace of the name
    pub fn namespace(&self) -> &str {
        &self.namespace
    }
}

impl fmt::Debug for FullName {
    /// Prints `namespace.name` (with a leading `.` when the namespace is empty).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.namespace, self.name)
    }
}
594
/// Represents documentation for complex Avro schemas.
///
/// Used for the `doc` of records, enums and record fields; `None` means no
/// documentation was provided.
pub type Documentation = Option<String>;
597
598impl Name {
599    /// Reports whether the given string is a valid Avro name.
600    ///
601    /// See: <https://avro.apache.org/docs/++version++/specification/#names>
602    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    /// Returns an error if the given name is invalid.
609    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    /// Rewrites `name` to be valid.
622    ///
623    /// Any non alphanumeric characters are replaced with underscores. If the
624    /// name begins with a number, it is prefixed with `_`.
625    ///
626    /// `make_valid` is not injective. Multiple invalid names are mapped to the
627    /// the same valid name.
628    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    /// Parse a `serde_json::map::Map` into a `Name`.
653    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    /// Parses a name from a simple string.
702    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    /// Return the `fullname` of this `Name`
709    ///
710    /// More information about fullnames can be found in the
711    /// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
712    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
/// A field that exists in the reader's record schema but not in the writer's,
/// and whose value is taken from the reader-specified default during resolution.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// The default value, already converted to an Avro value.
    pub default: types::Value,
    /// Order of the field.
    pub order: RecordFieldOrder,
    /// Position of the field — presumably within the reader record's fields; TODO confirm.
    pub position: usize,
}
725
/// A writer-schema record field after resolution against the reader schema.
///
/// Fields that could be resolved against a reader field are `Present`; the
/// rest are `Absent`.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// A writer field with no reader counterpart.
    /// NOTE(review): the wrapped `Schema` is presumably the writer's schema for
    /// the field, kept so its data can still be decoded — confirm against the decoder.
    Absent(Schema),
    /// A writer field matched with, and resolved against, a reader field.
    Present(RecordField),
}
731
/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Default value of the field, kept as the raw JSON value from the schema.
    /// This value will be used when reading Avro data if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: SchemaPieceOrNamed,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field within the `fields` of its parent record.
    pub position: usize,
}
752
/// Represents any valid order for a `field` in a `record` Avro schema.
///
/// Corresponds to the `order` attribute of a record field in the Avro
/// specification. (The empty `impl RecordField {}` that used to follow this
/// type contributed nothing and has been removed.)
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
762
/// The variants of an Avro `union` schema, together with lookup indexes.
///
/// Built by `UnionSchema::new`, which rejects a union directly nested in a
/// union as well as duplicate variants.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    // The variants, in the order they were given to `UnionSchema::new`.
    schemas: Vec<SchemaPieceOrNamed>,

    // Used to ensure uniqueness of anonymous schema inputs, and provide fast
    // (logarithmic-time, since this is a `BTreeMap`) finding of the schema index
    // given a value. Maps each anonymous variant's `SchemaKind` to its position
    // in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    // Same as above, for named input references: maps a named variant's index
    // (as stored in `SchemaPieceOrNamed::Named`) to its position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    /// Returns a slice to all variants of this schema.
812    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    /// Returns a mutable slice to all variants of this schema.
817    pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818        &mut self.schemas
819    }
820
821    /// Returns true if the first variant of this `UnionSchema` is `Null`.
822    pub fn is_nullable(&self) -> bool {
823        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824    }
825
826    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827        self.anon_variant_index
828            .get(&SchemaKind::from(sp))
829            .map(|idx| (*idx, &self.schemas[*idx]))
830    }
831
832    pub fn match_ref(
833        &self,
834        other: SchemaPieceRefOrNamed,
835        names_map: &BTreeMap<usize, usize>,
836    ) -> Option<(usize, &SchemaPieceOrNamed)> {
837        match other {
838            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839            SchemaPieceRefOrNamed::Named(idx) => names_map
840                .get(&idx)
841                .and_then(|idx| self.named_variant_index.get(idx))
842                .map(|idx| (*idx, &self.schemas[*idx])),
843        }
844    }
845
846    #[inline(always)]
847    pub fn match_(
848        &self,
849        other: &SchemaPieceOrNamed,
850        names_map: &BTreeMap<usize, usize>,
851    ) -> Option<(usize, &SchemaPieceOrNamed)> {
852        self.match_ref(other.as_ref(), names_map)
853    }
854
855    /// Like [`UnionSchema::match_`], but additionally honors Avro numeric
856    /// promotion. Here `self` is the *reader* union and `writer` is a writer
857    /// variant being resolved against it.
858    pub fn match_promote_writer(
859        &self,
860        writer: &SchemaPieceOrNamed,
861        names_map: &BTreeMap<usize, usize>,
862    ) -> Option<(usize, &SchemaPieceOrNamed)> {
863        self.match_ref_promote_writer(writer.as_ref(), names_map)
864    }
865
866    /// Like [`UnionSchema::match_ref`], but additionally honors Avro numeric
867    /// promotion. `self` is the *reader* union and `writer` is a concrete
868    /// writer piece. An exact match (by kind for anonymous pieces, by name for
869    /// named ones) is preferred; otherwise the first reader variant in
870    /// declaration order that `writer` can be promoted into is returned. See
871    /// `can_promote`.
872    pub fn match_ref_promote_writer(
873        &self,
874        writer: SchemaPieceRefOrNamed,
875        names_map: &BTreeMap<usize, usize>,
876    ) -> Option<(usize, &SchemaPieceOrNamed)> {
877        if let Some(hit) = self.match_ref(writer, names_map) {
878            return Some(hit);
879        }
880        let SchemaPieceRefOrNamed::Piece(wp) = writer else {
881            return None;
882        };
883        self.schemas
884            .iter()
885            .enumerate()
886            .find_map(|(idx, variant)| match variant {
887                SchemaPieceOrNamed::Piece(rp) if can_promote(wp, rp) => Some((idx, variant)),
888                _ => None,
889            })
890    }
891
892    /// Like [`UnionSchema::match_ref`], but additionally honors Avro numeric
893    /// promotion. `self` is the *writer* union and `reader` is a concrete
894    /// reader piece. An exact match is preferred; otherwise the first writer
895    /// variant in declaration order that can be promoted into `reader` is
896    /// returned. See `can_promote`.
897    pub fn match_ref_promote_reader(
898        &self,
899        reader: SchemaPieceRefOrNamed,
900        names_map: &BTreeMap<usize, usize>,
901    ) -> Option<(usize, &SchemaPieceOrNamed)> {
902        if let Some(hit) = self.match_ref(reader, names_map) {
903            return Some(hit);
904        }
905        let SchemaPieceRefOrNamed::Piece(rp) = reader else {
906            return None;
907        };
908        self.schemas
909            .iter()
910            .enumerate()
911            .find_map(|(idx, variant)| match variant {
912                SchemaPieceOrNamed::Piece(wp) if can_promote(wp, rp) => Some((idx, variant)),
913                _ => None,
914            })
915    }
916}
917
918/// Returns `true` if a value written with primitive schema `writer` can be read
919/// as primitive `reader` under Avro's numeric promotion rules: `int` →
920/// `long`/`float`/`double`, `long` → `float`/`double`, and `float` → `double`.
921///
922/// These are exactly the promotions performed during schema resolution in
923/// `reader.rs` (`ResolveIntLong`, `ResolveIntDouble`, ...). The two must be
924/// kept in sync: only return `true` here for a pair that resolution can
925/// actually decode, otherwise a union variant would match but fail to resolve.
926fn can_promote(writer: &SchemaPiece, reader: &SchemaPiece) -> bool {
927    use SchemaPiece::*;
928    matches!(
929        (writer, reader),
930        (Int, Long)
931            | (Int, Float)
932            | (Int, Double)
933            | (Long, Float)
934            | (Long, Double)
935            | (Float, Double)
936    )
937}
938
939// No need to compare variant_index, it is derivative of schemas.
940impl PartialEq for UnionSchema {
941    fn eq(&self, other: &UnionSchema) -> bool {
942        self.schemas.eq(&other.schemas)
943    }
944}
945
/// Mutable state threaded through the parse of a single schema document.
#[derive(Default)]
struct SchemaParser {
    /// Named types (records, enums, fixeds) in allocation order. `alloc_name`
    /// pushes a `None` placeholder before a type's body is parsed, and `insert`
    /// fills it afterwards. Because the name is registered before the body is
    /// parsed, the body can refer to its own type by name.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each registered full name to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
    /// Current `parse_inner` nesting depth, checked against `MAX_SCHEMA_DEPTH`.
    depth: usize,
}
952
/// Cap on nested-type depth while parsing an Avro schema JSON.
///
/// The parser recurses through `parse_inner` for every nested `record` field,
/// `array` item, `map` value, `union` variant and `{"type": {...}}` wrapper.
/// Without a limit, an untrusted schema such as `{"type":{"type":{"type":...}}}`
/// could overflow the stack. Every `parse_inner` call, including leaf type names,
/// counts as one level. Exceeding the cap is reported as a schema parse error.
const MAX_SCHEMA_DEPTH: usize = 128;
958
959impl SchemaParser {
960    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
961        let top = self.parse_inner("", value)?;
962        let SchemaParser { named, indices, .. } = self;
963        Ok(Schema {
964            named: named.into_iter().map(|o| o.unwrap()).collect(),
965            indices,
966            top,
967        })
968    }
969
970    fn parse_inner(
971        &mut self,
972        default_namespace: &str,
973        value: &Value,
974    ) -> Result<SchemaPieceOrNamed, AvroError> {
975        self.depth += 1;
976        if self.depth > MAX_SCHEMA_DEPTH {
977            self.depth -= 1;
978            return Err(ParseSchemaError::new(format!(
979                "schema nesting depth exceeds limit {MAX_SCHEMA_DEPTH}"
980            ))
981            .into());
982        }
983        let result = self.parse_inner_impl(default_namespace, value);
984        self.depth -= 1;
985        result
986    }
987
988    fn parse_inner_impl(
989        &mut self,
990        default_namespace: &str,
991        value: &Value,
992    ) -> Result<SchemaPieceOrNamed, AvroError> {
993        match *value {
994            Value::String(ref t) => {
995                let name = FullName::from_parts(t.as_str(), None, default_namespace);
996                if let Some(idx) = self.indices.get(&name) {
997                    Ok(SchemaPieceOrNamed::Named(*idx))
998                } else {
999                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
1000                        t.as_str(),
1001                    )?))
1002                }
1003            }
1004            Value::Object(ref data) => self.parse_complex(default_namespace, data),
1005            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
1006                self.parse_union(default_namespace, data)?,
1007            )),
1008            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
1009        }
1010    }
1011
1012    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
1013        let idx = match self.indices.entry(fullname) {
1014            Entry::Vacant(ve) => *ve.insert(self.named.len()),
1015            Entry::Occupied(oe) => {
1016                return Err(ParseSchemaError::new(format!(
1017                    "Sub-schema with name {:?} encountered multiple times",
1018                    oe.key()
1019                ))
1020                .into());
1021            }
1022        };
1023        self.named.push(None);
1024        Ok(idx)
1025    }
1026
1027    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
1028        assert_none!(self.named[index]);
1029        self.named[index] = Some(schema);
1030    }
1031
1032    fn parse_named_type(
1033        &mut self,
1034        type_name: &str,
1035        default_namespace: &str,
1036        complex: &Map<String, Value>,
1037    ) -> Result<usize, AvroError> {
1038        let name = Name::parse(complex)?;
1039        match name.name.as_str() {
1040            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
1041                return Err(ParseSchemaError::new(format!(
1042                    "{} may not be used as a custom type name",
1043                    name.name
1044                ))
1045                .into());
1046            }
1047            _ => {}
1048        };
1049        let fullname = name.fullname(default_namespace);
1050        let default_namespace = fullname.namespace.clone();
1051        let idx = self.alloc_name(fullname.clone())?;
1052        let piece = match type_name {
1053            "record" => self.parse_record(&default_namespace, complex),
1054            "enum" => self.parse_enum(complex),
1055            "fixed" => self.parse_fixed(&default_namespace, complex),
1056            _ => unreachable!("Unknown named type kind: {}", type_name),
1057        }?;
1058
1059        self.insert(
1060            idx,
1061            NamedSchemaPiece {
1062                name: fullname,
1063                piece,
1064            },
1065        );
1066
1067        Ok(idx)
1068    }
1069
1070    /// Parse a `serde_json::Value` representing a complex Avro type into a
1071    /// `Schema`.
1072    ///
1073    /// Avro supports "recursive" definition of types.
1074    /// e.g: {"type": {"type": "string"}}
1075    fn parse_complex(
1076        &mut self,
1077        default_namespace: &str,
1078        complex: &Map<String, Value>,
1079    ) -> Result<SchemaPieceOrNamed, AvroError> {
1080        match complex.get("type") {
1081            Some(&Value::String(ref t)) => Ok(match t.as_str() {
1082                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
1083                    t,
1084                    default_namespace,
1085                    complex,
1086                )?),
1087                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
1088                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
1089                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
1090                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
1091                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
1092                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
1093                other => {
1094                    let name = FullName {
1095                        name: other.into(),
1096                        namespace: default_namespace.into(),
1097                    };
1098                    if let Some(idx) = self.indices.get(&name) {
1099                        SchemaPieceOrNamed::Named(*idx)
1100                    } else {
1101                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
1102                    }
1103                }
1104            }),
1105            Some(&Value::Object(ref data)) => match data.get("type") {
1106                Some(value) => self.parse_inner(default_namespace, value),
1107                None => Err(
1108                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1109                ),
1110            },
1111            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1112        }
1113    }
1114
1115    /// Parse a `serde_json::Value` representing a Avro record type into a
1116    /// `Schema`.
1117    fn parse_record(
1118        &mut self,
1119        default_namespace: &str,
1120        complex: &Map<String, Value>,
1121    ) -> Result<SchemaPiece, AvroError> {
1122        let mut lookup = BTreeMap::new();
1123
1124        let fields: Vec<RecordField> = complex
1125            .get("fields")
1126            .and_then(|fields| fields.as_array())
1127            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1128            .and_then(|fields| {
1129                fields
1130                    .iter()
1131                    .filter_map(|field| field.as_object())
1132                    .enumerate()
1133                    .map(|(position, field)| {
1134                        self.parse_record_field(default_namespace, field, position)
1135                    })
1136                    .collect::<Result<_, _>>()
1137            })?;
1138
1139        for field in &fields {
1140            lookup.insert(field.name.clone(), field.position);
1141        }
1142
1143        Ok(SchemaPiece::Record {
1144            doc: complex.doc(),
1145            fields,
1146            lookup,
1147        })
1148    }
1149
1150    /// Parse a `serde_json::Value` into a `RecordField`.
1151    fn parse_record_field(
1152        &mut self,
1153        default_namespace: &str,
1154        field: &Map<String, Value>,
1155        position: usize,
1156    ) -> Result<RecordField, AvroError> {
1157        let name = field
1158            .name()
1159            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1160
1161        Name::validate(&name)?;
1162
1163        let schema = field
1164            .get("type")
1165            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1166            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1167
1168        let default = field.get("default").cloned();
1169
1170        let order = field
1171            .get("order")
1172            .and_then(|order| order.as_str())
1173            .and_then(|order| match order {
1174                "ascending" => Some(RecordFieldOrder::Ascending),
1175                "descending" => Some(RecordFieldOrder::Descending),
1176                "ignore" => Some(RecordFieldOrder::Ignore),
1177                _ => None,
1178            })
1179            .unwrap_or(RecordFieldOrder::Ascending);
1180
1181        Ok(RecordField {
1182            name,
1183            doc: field.doc(),
1184            default,
1185            schema,
1186            order,
1187            position,
1188        })
1189    }
1190
1191    /// Parse a `serde_json::Value` representing a Avro enum type into a
1192    /// `Schema`.
1193    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1194        let symbols: Vec<String> = complex
1195            .get("symbols")
1196            .and_then(|v| v.as_array())
1197            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1198            .and_then(|symbols| {
1199                symbols
1200                    .iter()
1201                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1202                    .collect::<Option<_>>()
1203                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1204            })?;
1205
1206        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1207        for symbol in symbols.iter() {
1208            if unique_symbols.contains(symbol) {
1209                return Err(ParseSchemaError::new(format!(
1210                    "Enum symbols must be unique, found multiple: {}",
1211                    symbol
1212                ))
1213                .into());
1214            } else {
1215                unique_symbols.insert(symbol);
1216            }
1217        }
1218
1219        let default_idx = if let Some(default) = complex.get("default") {
1220            let default_str = default.as_str().ok_or_else(|| {
1221                ParseSchemaError::new(format!(
1222                    "Enum default should be a string, got: {:?}",
1223                    default
1224                ))
1225            })?;
1226            let default_idx = symbols
1227                .iter()
1228                .position(|x| x == default_str)
1229                .ok_or_else(|| {
1230                    ParseSchemaError::new(format!(
1231                        "Enum default not found in list of symbols: {}",
1232                        default_str
1233                    ))
1234                })?;
1235            Some(default_idx)
1236        } else {
1237            None
1238        };
1239
1240        Ok(SchemaPiece::Enum {
1241            doc: complex.doc(),
1242            symbols,
1243            default_idx,
1244        })
1245    }
1246
1247    /// Parse a `serde_json::Value` representing a Avro array type into a
1248    /// `Schema`.
1249    fn parse_array(
1250        &mut self,
1251        default_namespace: &str,
1252        complex: &Map<String, Value>,
1253    ) -> Result<SchemaPiece, AvroError> {
1254        complex
1255            .get("items")
1256            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1257            .and_then(|items| self.parse_inner(default_namespace, items))
1258            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1259    }
1260
1261    /// Parse a `serde_json::Value` representing a Avro map type into a
1262    /// `Schema`.
1263    fn parse_map(
1264        &mut self,
1265        default_namespace: &str,
1266        complex: &Map<String, Value>,
1267    ) -> Result<SchemaPiece, AvroError> {
1268        complex
1269            .get("values")
1270            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1271            .and_then(|items| self.parse_inner(default_namespace, items))
1272            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1273    }
1274
1275    /// Parse a `serde_json::Value` representing a Avro union type into a
1276    /// `Schema`.
1277    fn parse_union(
1278        &mut self,
1279        default_namespace: &str,
1280        items: &[Value],
1281    ) -> Result<SchemaPiece, AvroError> {
1282        items
1283            .iter()
1284            .map(|value| self.parse_inner(default_namespace, value))
1285            .collect::<Result<Vec<_>, _>>()
1286            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1287    }
1288
1289    /// Parse a `serde_json::Value` representing a logical decimal type into a
1290    /// `Schema`.
1291    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1292        let precision = complex
1293            .get("precision")
1294            .and_then(|v| v.as_i64())
1295            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1296
1297        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1298
1299        if scale < 0 {
1300            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1301        }
1302
1303        if precision < 0 {
1304            return Err(
1305                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1306            );
1307        }
1308
1309        if scale > precision {
1310            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1311        }
1312
1313        Ok((precision as usize, scale as usize))
1314    }
1315
1316    /// Parse a `serde_json::Value` representing an Avro bytes type into a
1317    /// `Schema`.
1318    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1319        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1320
1321        if let Some("decimal") = logical_type {
1322            match Self::parse_decimal(complex) {
1323                Ok((precision, scale)) => {
1324                    return Ok(SchemaPiece::Decimal {
1325                        precision,
1326                        scale,
1327                        fixed_size: None,
1328                    });
1329                }
1330                Err(e) => warn!(
1331                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1332                    complex, e
1333                ),
1334            }
1335        }
1336
1337        Ok(SchemaPiece::Bytes)
1338    }
1339
1340    /// Parse a [`serde_json::Value`] representing an Avro Int type
1341    ///
1342    /// If the complex type has a `connect.name` tag (as [emitted by
1343    /// Debezium][1]) that matches a `Date` tag, we specify that the correct
1344    /// schema to use is `Date`.
1345    ///
1346    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1347    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1348        const AVRO_DATE: &str = "date";
1349        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1350        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1351        if let Some(name) = complex.get("connect.name") {
1352            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1353                if name == KAFKA_DATE {
1354                    warn!("using deprecated debezium date format");
1355                }
1356                return Ok(SchemaPiece::Date);
1357            }
1358        }
1359        // Put this after the custom semantic types so that the debezium
1360        // warning is emitted, since the logicalType tag shows up in the
1361        // deprecated debezium format :-/
1362        if let Some(name) = complex.get("logicalType") {
1363            if name == AVRO_DATE {
1364                return Ok(SchemaPiece::Date);
1365            }
1366        }
1367        if !complex.is_empty() {
1368            debug!("parsing complex type as regular int: {:?}", complex);
1369        }
1370        Ok(SchemaPiece::Int)
1371    }
1372
1373    /// Parse a [`serde_json::Value`] representing an Avro Int64/Long type
1374    ///
1375    /// The debezium/kafka types are document at [the debezium site][1], and the
1376    /// avro ones are documented at [Avro][2].
1377    ///
1378    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1379    /// [2]: https://avro.apache.org/docs/++version++/specification/
1380    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1381        const AVRO_MILLI_TS: &str = "timestamp-millis";
1382        const AVRO_MICRO_TS: &str = "timestamp-micros";
1383
1384        const CONNECT_MILLI_TS: &[&str] = &[
1385            "io.debezium.time.Timestamp",
1386            "org.apache.kafka.connect.data.Timestamp",
1387        ];
1388        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1389
1390        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1391            if CONNECT_MILLI_TS.contains(&&**name) {
1392                return Ok(SchemaPiece::TimestampMilli);
1393            }
1394            if name == CONNECT_MICRO_TS {
1395                return Ok(SchemaPiece::TimestampMicro);
1396            }
1397        }
1398        if let Some(name) = complex.get("logicalType") {
1399            if name == AVRO_MILLI_TS {
1400                return Ok(SchemaPiece::TimestampMilli);
1401            }
1402            if name == AVRO_MICRO_TS {
1403                return Ok(SchemaPiece::TimestampMicro);
1404            }
1405        }
1406        if !complex.is_empty() {
1407            debug!("parsing complex type as regular long: {:?}", complex);
1408        }
1409        Ok(SchemaPiece::Long)
1410    }
1411
1412    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1413        const CONNECT_JSON: &str = "io.debezium.data.Json";
1414
1415        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1416            if CONNECT_JSON == name.as_str() {
1417                return SchemaPiece::Json;
1418            }
1419        }
1420        if let Some(name) = complex.get("logicalType") {
1421            if name == "uuid" {
1422                return SchemaPiece::Uuid;
1423            }
1424        }
1425        debug!("parsing complex type as regular string: {:?}", complex);
1426        SchemaPiece::String
1427    }
1428
1429    /// Parse a `serde_json::Value` representing a Avro fixed type into a
1430    /// `Schema`.
1431    fn parse_fixed(
1432        &self,
1433        _default_namespace: &str,
1434        complex: &Map<String, Value>,
1435    ) -> Result<SchemaPiece, AvroError> {
1436        let _name = Name::parse(complex)?;
1437
1438        let size = complex
1439            .get("size")
1440            .and_then(|v| v.as_i64())
1441            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1442        if size <= 0 {
1443            return Err(ParseSchemaError::new(format!(
1444                "Fixed values require a positive size attribute, found: {}",
1445                size
1446            ))
1447            .into());
1448        }
1449
1450        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1451
1452        if let Some("decimal") = logical_type {
1453            match Self::parse_decimal(complex) {
1454                Ok((precision, scale)) => {
1455                    let max = ((8 * size - 1) as f64 * 2_f64.log10()).floor() as usize;
1456                    if precision > max {
1457                        warn!(
1458                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1459                            precision, size
1460                        );
1461                    } else {
1462                        return Ok(SchemaPiece::Decimal {
1463                            precision,
1464                            scale,
1465                            fixed_size: Some(size as usize),
1466                        });
1467                    }
1468                }
1469                Err(e) => warn!(
1470                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1471                    complex, e
1472                ),
1473            }
1474        }
1475
1476        Ok(SchemaPiece::Fixed {
1477            size: size as usize,
1478        })
1479    }
1480}
1481
1482impl Schema {
1483    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
1484    /// schema.
1485    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1486        Self::parse_with_references(value, &[])
1487    }
1488
1489    /// Parse an JSON Avro schema with referenced named types.
1490    ///
1491    /// This is used when parsing a schema that references types defined in other
1492    /// schemas (Confluent Schema Registry schema references). Referenced schemas
1493    /// should be provided in dependency order (dependencies first).
1494    ///
1495    /// # Arguments
1496    ///
1497    /// * `primary` - The primary schema JSON value to parse
1498    /// * `reference_schemas` - Schemas whose named types should be available during parsing
1499    pub fn parse_with_references(
1500        primary: &Value,
1501        reference_schemas: &[Schema],
1502    ) -> Result<Self, AvroError> {
1503        // Collect and remap named types from all referenced schemas
1504        let (named, indices) = Self::collect_named_types(reference_schemas);
1505
1506        // Create parser with pre-populated named types
1507        let p = SchemaParser {
1508            named: named.into_iter().map(Some).collect(),
1509            indices,
1510            depth: 0,
1511        };
1512        p.parse(primary)
1513    }
1514
1515    /// Collect all named types from a list of schemas, remapping indices as needed.
1516    ///
1517    /// When combining named types from multiple schemas, each schema's internal
1518    /// indices need to be remapped to account for duplicates and new positions.
1519    fn collect_named_types(
1520        schemas: &[Schema],
1521    ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1522        let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1523        let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1524
1525        for schema in schemas {
1526            // First pass: Build the index_map from this schema's indices to combined indices.
1527            // For types that already exist: map to existing combined index
1528            // For new types: map to their future position in combined_named
1529            let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1530            let mut new_type_offset = combined_named.len();
1531
1532            for named_piece in &schema.named {
1533                if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1534                    // Type already exists, use existing index
1535                    index_map.push(existing_idx);
1536                } else {
1537                    // New type, assign next available index
1538                    index_map.push(new_type_offset);
1539                    new_type_offset += 1;
1540                }
1541            }
1542
1543            // Second pass: Add new types with proper remapping
1544            for named_piece in &schema.named {
1545                if combined_indices.contains_key(&named_piece.name) {
1546                    continue;
1547                }
1548
1549                let mut remapped = named_piece.clone();
1550                Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1551
1552                let new_idx = combined_named.len();
1553                combined_indices.insert(remapped.name.clone(), new_idx);
1554                combined_named.push(remapped);
1555            }
1556        }
1557
1558        (combined_named, combined_indices)
1559    }
1560
1561    /// Recursively remap Named indices in a SchemaPiece using an index map.
1562    fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1563        match piece {
1564            SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1565            SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1566            SchemaPiece::Union(union) => {
1567                for variant in union.variants_mut() {
1568                    Self::remap_indices_with_map(variant, index_map);
1569                }
1570            }
1571            SchemaPiece::Record { fields, .. } => {
1572                for field in fields {
1573                    Self::remap_indices_with_map(&mut field.schema, index_map);
1574                }
1575            }
1576            _ => {}
1577        }
1578    }
1579
1580    /// Remap a single SchemaPieceOrNamed using an index map.
1581    fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1582        match item {
1583            SchemaPieceOrNamed::Named(idx) => {
1584                if let Some(&new_idx) = index_map.get(*idx) {
1585                    *idx = new_idx;
1586                }
1587            }
1588            SchemaPieceOrNamed::Piece(piece) => {
1589                Self::remap_indices_in_piece_with_map(piece, index_map)
1590            }
1591        }
1592    }
1593
1594    /// Converts `self` into its [Parsing Canonical Form].
1595    ///
1596    /// [Parsing Canonical Form]:
1597    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1598    pub fn canonical_form(&self) -> String {
1599        let json = serde_json::to_value(self).unwrap();
1600        parsing_canonical_form(&json)
1601    }
1602
1603    /// Generate fingerprint of Schema's [Parsing Canonical Form].
1604    ///
1605    /// [Parsing Canonical Form]:
1606    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1607    pub fn fingerprint(&self, algorithm: &'static digest::Algorithm) -> SchemaFingerprint {
1608        let hash = digest::digest(algorithm, self.canonical_form().as_bytes());
1609        SchemaFingerprint {
1610            bytes: hash.as_ref().to_vec(),
1611        }
1612    }
1613
1614    /// Parse a `serde_json::Value` representing a primitive Avro type into a
1615    /// `Schema`.
1616    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1617        match primitive {
1618            "null" => Ok(SchemaPiece::Null),
1619            "boolean" => Ok(SchemaPiece::Boolean),
1620            "int" => Ok(SchemaPiece::Int),
1621            "long" => Ok(SchemaPiece::Long),
1622            "double" => Ok(SchemaPiece::Double),
1623            "float" => Ok(SchemaPiece::Float),
1624            "bytes" => Ok(SchemaPiece::Bytes),
1625            "string" => Ok(SchemaPiece::String),
1626            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1627        }
1628    }
1629}
1630
1631impl FromStr for Schema {
1632    type Err = AvroError;
1633
1634    /// Create a `Schema` from a string representing a JSON Avro schema.
1635    fn from_str(input: &str) -> Result<Self, AvroError> {
1636        let value = serde_json::from_str(input)
1637            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1638        Self::parse(&value)
1639    }
1640}
1641
/// A named type definition paired with its fully-qualified name, as stored in
/// a [`Schema`]'s list of named types.
///
/// Other schema pieces refer to an entry by its position in that list via
/// `SchemaPieceOrNamed::Named`.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// Fully-qualified (namespace + name) name of the type.
    pub name: FullName,
    /// The type's definition.
    pub piece: SchemaPiece,
}
1647
/// A resolved position in a schema tree.
///
/// `inner` is the schema piece at this position. `name` is the full name of the
/// named type `inner` was looked up from, or `None` when the position holds an
/// inline piece. `root` is kept so that child references can be resolved.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    pub root: &'a Schema,
    pub inner: &'a SchemaPiece,
    pub name: Option<&'a FullName>,
}
1654
1655#[derive(Copy, Clone, Debug)]
1656pub enum SchemaPieceRefOrNamed<'a> {
1657    Piece(&'a SchemaPiece),
1658    Named(usize),
1659}
1660
1661impl<'a> SchemaPieceRefOrNamed<'a> {
1662    pub fn get_human_name(&self, root: &Schema) -> String {
1663        match self {
1664            Self::Piece(piece) => format!("{:?}", piece),
1665            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1666        }
1667    }
1668
1669    #[inline(always)]
1670    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1671        match self {
1672            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1673            SchemaPieceRefOrNamed::Named(index) => {
1674                let named_piece = root.lookup(index);
1675                (&named_piece.piece, Some(&named_piece.name))
1676            }
1677        }
1678    }
1679}
1680
1681#[derive(Copy, Clone, Debug)]
1682pub struct SchemaNodeOrNamed<'a> {
1683    pub root: &'a Schema,
1684    pub inner: SchemaPieceRefOrNamed<'a>,
1685}
1686
1687impl<'a> SchemaNodeOrNamed<'a> {
1688    #[inline(always)]
1689    pub fn lookup(self) -> SchemaNode<'a> {
1690        let (inner, name) = self.inner.get_piece_and_name(self.root);
1691        SchemaNode {
1692            root: self.root,
1693            inner,
1694            name,
1695        }
1696    }
1697    #[inline(always)]
1698    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1699        self.step_ref(next.as_ref())
1700    }
1701    #[inline(always)]
1702    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1703        Self {
1704            root: self.root,
1705            inner: match next {
1706                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1707                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1708            },
1709        }
1710    }
1711
1712    pub fn to_schema(self) -> Schema {
1713        let mut cloner = SchemaSubtreeDeepCloner {
1714            old_root: self.root,
1715            old_to_new_names: Default::default(),
1716            named: Default::default(),
1717        };
1718        let piece = cloner.clone_piece_or_named(self.inner);
1719        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1720        let indices: BTreeMap<FullName, usize> = named
1721            .iter()
1722            .enumerate()
1723            .map(|(i, nsp)| (nsp.name.clone(), i))
1724            .collect();
1725        Schema {
1726            named,
1727            indices,
1728            top: piece,
1729        }
1730    }
1731
1732    pub fn namespace(self) -> Option<&'a str> {
1733        let SchemaNode { name, .. } = self.lookup();
1734        name.map(|FullName { namespace, .. }| namespace.as_str())
1735    }
1736}
1737
1738struct SchemaSubtreeDeepCloner<'a> {
1739    old_root: &'a Schema,
1740    old_to_new_names: BTreeMap<usize, usize>,
1741    named: Vec<Option<NamedSchemaPiece>>,
1742}
1743
1744impl<'a> SchemaSubtreeDeepCloner<'a> {
1745    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1746        match piece {
1747            SchemaPiece::Null => SchemaPiece::Null,
1748            SchemaPiece::Boolean => SchemaPiece::Boolean,
1749            SchemaPiece::Int => SchemaPiece::Int,
1750            SchemaPiece::Long => SchemaPiece::Long,
1751            SchemaPiece::Float => SchemaPiece::Float,
1752            SchemaPiece::Double => SchemaPiece::Double,
1753            SchemaPiece::Date => SchemaPiece::Date,
1754            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1755            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1756            SchemaPiece::Json => SchemaPiece::Json,
1757            SchemaPiece::Decimal {
1758                scale,
1759                precision,
1760                fixed_size,
1761            } => SchemaPiece::Decimal {
1762                scale: *scale,
1763                precision: *precision,
1764                fixed_size: *fixed_size,
1765            },
1766            SchemaPiece::Bytes => SchemaPiece::Bytes,
1767            SchemaPiece::String => SchemaPiece::String,
1768            SchemaPiece::Uuid => SchemaPiece::Uuid,
1769            SchemaPiece::Array(inner) => {
1770                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1771            }
1772            SchemaPiece::Map(inner) => {
1773                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1774            }
1775            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1776                schemas: us
1777                    .schemas
1778                    .iter()
1779                    .map(|s| self.clone_piece_or_named(s.as_ref()))
1780                    .collect(),
1781                anon_variant_index: us.anon_variant_index.clone(),
1782                named_variant_index: us.named_variant_index.clone(),
1783            }),
1784            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1785            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1786            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1787            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1788            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1789            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1790            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1791            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1792            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1793            SchemaPiece::ResolveConcreteUnion {
1794                index,
1795                inner,
1796                n_reader_variants,
1797                reader_null_variant,
1798            } => SchemaPiece::ResolveConcreteUnion {
1799                index: *index,
1800                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1801                n_reader_variants: *n_reader_variants,
1802                reader_null_variant: *reader_null_variant,
1803            },
1804            SchemaPiece::ResolveUnionUnion {
1805                permutation,
1806                n_reader_variants,
1807                reader_null_variant,
1808            } => SchemaPiece::ResolveUnionUnion {
1809                permutation: permutation
1810                    .clone()
1811                    .into_iter()
1812                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1813                    .collect(),
1814                n_reader_variants: *n_reader_variants,
1815                reader_null_variant: *reader_null_variant,
1816            },
1817            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1818                SchemaPiece::ResolveUnionConcrete {
1819                    index: *index,
1820                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1821                }
1822            }
1823            SchemaPiece::Record {
1824                doc,
1825                fields,
1826                lookup,
1827            } => SchemaPiece::Record {
1828                doc: doc.clone(),
1829                fields: fields
1830                    .iter()
1831                    .map(|rf| RecordField {
1832                        name: rf.name.clone(),
1833                        doc: rf.doc.clone(),
1834                        default: rf.default.clone(),
1835                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
1836                        order: rf.order,
1837                        position: rf.position,
1838                    })
1839                    .collect(),
1840                lookup: lookup.clone(),
1841            },
1842            SchemaPiece::Enum {
1843                doc,
1844                symbols,
1845                default_idx,
1846            } => SchemaPiece::Enum {
1847                doc: doc.clone(),
1848                symbols: symbols.clone(),
1849                default_idx: *default_idx,
1850            },
1851            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1852            SchemaPiece::ResolveRecord {
1853                defaults,
1854                fields,
1855                n_reader_fields,
1856            } => SchemaPiece::ResolveRecord {
1857                defaults: defaults.clone(),
1858                fields: fields
1859                    .iter()
1860                    .map(|rf| match rf {
1861                        ResolvedRecordField::Present(rf) => {
1862                            ResolvedRecordField::Present(RecordField {
1863                                name: rf.name.clone(),
1864                                doc: rf.doc.clone(),
1865                                default: rf.default.clone(),
1866                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
1867                                order: rf.order,
1868                                position: rf.position,
1869                            })
1870                        }
1871                        ResolvedRecordField::Absent(writer_schema) => {
1872                            ResolvedRecordField::Absent(writer_schema.clone())
1873                        }
1874                    })
1875                    .collect(),
1876                n_reader_fields: *n_reader_fields,
1877            },
1878            SchemaPiece::ResolveEnum {
1879                doc,
1880                symbols,
1881                default,
1882            } => SchemaPiece::ResolveEnum {
1883                doc: doc.clone(),
1884                symbols: symbols.clone(),
1885                default: default.clone(),
1886            },
1887        }
1888    }
1889    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1890        match piece {
1891            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1892            SchemaPieceRefOrNamed::Named(index) => {
1893                let new_index = match self.old_to_new_names.entry(index) {
1894                    Entry::Vacant(ve) => {
1895                        let new_index = self.named.len();
1896                        self.named.push(None);
1897                        ve.insert(new_index);
1898                        let old_named_piece = self.old_root.lookup(index);
1899                        let new_named_piece = NamedSchemaPiece {
1900                            name: old_named_piece.name.clone(),
1901                            piece: self.clone_piece(&old_named_piece.piece),
1902                        };
1903                        self.named[new_index] = Some(new_named_piece);
1904                        new_index
1905                    }
1906                    Entry::Occupied(oe) => *oe.get(),
1907                };
1908                SchemaPieceOrNamed::Named(new_index)
1909            }
1910        }
1911    }
1912}
1913
1914impl<'a> SchemaNode<'a> {
1915    #[inline(always)]
1916    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1917        let (inner, name) = next.get_piece_and_name(self.root);
1918        Self {
1919            root: self.root,
1920            inner,
1921            name,
1922        }
1923    }
1924
1925    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1926        use serde_json::Value::*;
1927        let val = match (json, self.inner) {
1928            // A default value always matches the first variant of a union
1929            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1930                Some(variant) => AvroValue::Union {
1931                    index: 0,
1932                    inner: Box::new(self.step(variant).json_to_value(json)?),
1933                    n_variants: us.schemas.len(),
1934                    null_variant: us
1935                        .schemas
1936                        .iter()
1937                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1938                },
1939                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1940            },
1941            (Null, SchemaPiece::Null) => AvroValue::Null,
1942            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1943            (Number(n), piece) => {
1944                match piece {
1945                    piece if piece.is_underlying_int() => {
1946                        let i =
1947                            n.as_i64()
1948                                .and_then(|i| i32::try_from(i).ok())
1949                                .ok_or_else(|| {
1950                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1951                                })?;
1952                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1953                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1954                        })?
1955                    }
1956                    piece if piece.is_underlying_long() => {
1957                        let i = n.as_i64().ok_or_else(|| {
1958                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1959                        })?;
1960                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1961                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1962                        })?
1963                    }
1964                    SchemaPiece::Float => {
1965                        let f = n.as_f64().ok_or_else(|| {
1966                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1967                        })?;
1968                        AvroValue::Float(f as f32)
1969                    }
1970                    SchemaPiece::Double => {
1971                        let f = n.as_f64().ok_or_else(|| {
1972                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1973                        })?;
1974                        AvroValue::Double(f)
1975                    }
1976                    _ => {
1977                        return Err(ParseSchemaError(format!(
1978                            "Unexpected number in default: {}",
1979                            n
1980                        )));
1981                    }
1982                }
1983            }
1984            (String(s), piece)
1985                if s.eq_ignore_ascii_case("nan")
1986                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1987            {
1988                match piece {
1989                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1990                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1991                    _ => unreachable!(),
1992                }
1993            }
1994            (String(s), piece)
1995                if s.eq_ignore_ascii_case("infinity")
1996                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1997            {
1998                match piece {
1999                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
2000                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
2001                    _ => unreachable!(),
2002                }
2003            }
2004            (String(s), piece)
2005                if s.eq_ignore_ascii_case("-infinity")
2006                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
2007            {
2008                match piece {
2009                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
2010                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
2011                    _ => unreachable!(),
2012                }
2013            }
2014            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
2015            (
2016                String(s),
2017                SchemaPiece::Decimal {
2018                    precision, scale, ..
2019                },
2020            ) => AvroValue::Decimal(DecimalValue {
2021                precision: *precision,
2022                scale: *scale,
2023                unscaled: s.clone().into_bytes(),
2024            }),
2025            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
2026            (Object(map), SchemaPiece::Record { fields, .. }) => {
2027                let field_values = fields
2028                    .iter()
2029                    .map(|rf| {
2030                        let jval = map.get(&rf.name).ok_or_else(|| {
2031                            ParseSchemaError(format!(
2032                                "Field not found in default value: {}",
2033                                rf.name
2034                            ))
2035                        })?;
2036                        let value = self.step(&rf.schema).json_to_value(jval)?;
2037                        Ok((rf.name.clone(), value))
2038                    })
2039                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
2040                AvroValue::Record(field_values)
2041            }
2042            (String(s), SchemaPiece::Enum { symbols, .. }) => {
2043                match symbols.iter().find_position(|sym| s == *sym) {
2044                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
2045                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
2046                }
2047            }
2048            (Array(vals), SchemaPiece::Array(inner)) => {
2049                let node = self.step(&**inner);
2050                let vals = vals
2051                    .iter()
2052                    .map(|val| node.json_to_value(val))
2053                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
2054                AvroValue::Array(vals)
2055            }
2056            (Object(map), SchemaPiece::Map(inner)) => {
2057                let node = self.step(&**inner);
2058                let map = map
2059                    .iter()
2060                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
2061                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
2062                AvroValue::Map(map)
2063            }
2064            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
2065                AvroValue::Fixed(*size, s.clone().into_bytes())
2066            }
2067            _ => {
2068                return Err(ParseSchemaError(format!(
2069                    "Json default value {} does not match schema",
2070                    json
2071                )));
2072            }
2073        };
2074        Ok(val)
2075    }
2076}
2077
2078#[derive(Clone)]
2079struct SchemaSerContext<'a> {
2080    node: SchemaNodeOrNamed<'a>,
2081    // This does not logically need Rc<RefCell<_>> semantics --
2082    // it is only ever mutated in one stack frame at a time.
2083    // But AFAICT serde doesn't expose a way to
2084    // provide some mutable context to every node in the tree...
2085    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
2086    /// The namespace of this node's parent, or "" by default
2087    enclosing_ns: &'a str,
2088}
2089
2090#[derive(Clone)]
2091struct RecordFieldSerContext<'a> {
2092    outer: &'a SchemaSerContext<'a>,
2093    inner: &'a RecordField,
2094}
2095
2096impl<'a> SchemaSerContext<'a> {
2097    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
2098        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
2099        Self {
2100            node: self.node.step_ref(next),
2101            seen_named: Rc::clone(&self.seen_named),
2102            enclosing_ns: ns,
2103        }
2104    }
2105}
2106
2107impl<'a> Serialize for SchemaSerContext<'a> {
2108    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2109    where
2110        S: Serializer,
2111    {
2112        match self.node.inner {
2113            SchemaPieceRefOrNamed::Piece(piece) => match piece {
2114                SchemaPiece::Null => serializer.serialize_str("null"),
2115                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2116                SchemaPiece::Int => serializer.serialize_str("int"),
2117                SchemaPiece::Long => serializer.serialize_str("long"),
2118                SchemaPiece::Float => serializer.serialize_str("float"),
2119                SchemaPiece::Double => serializer.serialize_str("double"),
2120                SchemaPiece::Date => {
2121                    let mut map = serializer.serialize_map(Some(2))?;
2122                    map.serialize_entry("type", "int")?;
2123                    map.serialize_entry("logicalType", "date")?;
2124                    map.end()
2125                }
2126                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2127                    let mut map = serializer.serialize_map(Some(2))?;
2128                    map.serialize_entry("type", "long")?;
2129                    if piece == &SchemaPiece::TimestampMilli {
2130                        map.serialize_entry("logicalType", "timestamp-millis")?;
2131                    } else {
2132                        map.serialize_entry("logicalType", "timestamp-micros")?;
2133                    }
2134                    map.end()
2135                }
2136                SchemaPiece::Decimal {
2137                    precision,
2138                    scale,
2139                    fixed_size: None,
2140                } => {
2141                    let mut map = serializer.serialize_map(Some(4))?;
2142                    map.serialize_entry("type", "bytes")?;
2143                    map.serialize_entry("precision", precision)?;
2144                    map.serialize_entry("scale", scale)?;
2145                    map.serialize_entry("logicalType", "decimal")?;
2146                    map.end()
2147                }
2148                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2149                SchemaPiece::String => serializer.serialize_str("string"),
2150                SchemaPiece::Array(inner) => {
2151                    let mut map = serializer.serialize_map(Some(2))?;
2152                    map.serialize_entry("type", "array")?;
2153                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2154                    map.end()
2155                }
2156                SchemaPiece::Map(inner) => {
2157                    let mut map = serializer.serialize_map(Some(2))?;
2158                    map.serialize_entry("type", "map")?;
2159                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2160                    map.end()
2161                }
2162                SchemaPiece::Union(inner) => {
2163                    let variants = inner.variants();
2164                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2165                    for v in variants {
2166                        seq.serialize_element(&self.step(v.as_ref()))?;
2167                    }
2168                    seq.end()
2169                }
2170                SchemaPiece::Json => {
2171                    let mut map = serializer.serialize_map(Some(2))?;
2172                    map.serialize_entry("type", "string")?;
2173                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2174                    map.end()
2175                }
2176                SchemaPiece::Uuid => {
2177                    let mut map = serializer.serialize_map(Some(4))?;
2178                    map.serialize_entry("type", "string")?;
2179                    map.serialize_entry("logicalType", "uuid")?;
2180                    map.end()
2181                }
2182                SchemaPiece::Record { .. }
2183                | SchemaPiece::Decimal {
2184                    fixed_size: Some(_),
2185                    ..
2186                }
2187                | SchemaPiece::Enum { .. }
2188                | SchemaPiece::Fixed { .. } => {
2189                    unreachable!("Unexpected named schema piece in anonymous schema position")
2190                }
2191                SchemaPiece::ResolveIntLong
2192                | SchemaPiece::ResolveDateTimestamp
2193                | SchemaPiece::ResolveIntFloat
2194                | SchemaPiece::ResolveIntDouble
2195                | SchemaPiece::ResolveLongFloat
2196                | SchemaPiece::ResolveLongDouble
2197                | SchemaPiece::ResolveFloatDouble
2198                | SchemaPiece::ResolveConcreteUnion { .. }
2199                | SchemaPiece::ResolveUnionUnion { .. }
2200                | SchemaPiece::ResolveUnionConcrete { .. }
2201                | SchemaPiece::ResolveRecord { .. }
2202                | SchemaPiece::ResolveIntTsMicro
2203                | SchemaPiece::ResolveIntTsMilli
2204                | SchemaPiece::ResolveEnum { .. } => {
2205                    panic!("Attempted to serialize resolved schema")
2206                }
2207            },
2208            SchemaPieceRefOrNamed::Named(index) => {
2209                let mut map = self.seen_named.borrow_mut();
2210                let named_piece = match map.get(&index) {
2211                    Some(name) => {
2212                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2213                    }
2214                    None => self.node.root.lookup(index),
2215                };
2216                let name = &named_piece.name;
2217                map.insert(index, name.clone());
2218                std::mem::drop(map);
2219                match &named_piece.piece {
2220                    SchemaPiece::Record { doc, fields, .. } => {
2221                        let mut map = serializer.serialize_map(None)?;
2222                        map.serialize_entry("type", "record")?;
2223                        map.serialize_entry("name", &name.name)?;
2224                        if self.enclosing_ns != &name.namespace {
2225                            map.serialize_entry("namespace", &name.namespace)?;
2226                        }
2227                        if let Some(docstr) = doc {
2228                            map.serialize_entry("doc", docstr)?;
2229                        }
2230                        // TODO (brennan) - serialize aliases
2231                        map.serialize_entry(
2232                            "fields",
2233                            &fields
2234                                .iter()
2235                                .map(|f| RecordFieldSerContext {
2236                                    outer: self,
2237                                    inner: f,
2238                                })
2239                                .collect::<Vec<_>>(),
2240                        )?;
2241                        map.end()
2242                    }
2243                    SchemaPiece::Enum {
2244                        symbols,
2245                        default_idx,
2246                        ..
2247                    } => {
2248                        let mut map = serializer.serialize_map(None)?;
2249                        map.serialize_entry("type", "enum")?;
2250                        map.serialize_entry("name", &name.name)?;
2251                        if self.enclosing_ns != &name.namespace {
2252                            map.serialize_entry("namespace", &name.namespace)?;
2253                        }
2254                        map.serialize_entry("symbols", symbols)?;
2255                        if let Some(default_idx) = *default_idx {
2256                            assert!(default_idx < symbols.len());
2257                            map.serialize_entry("default", &symbols[default_idx])?;
2258                        }
2259                        map.end()
2260                    }
2261                    SchemaPiece::Fixed { size } => {
2262                        let mut map = serializer.serialize_map(None)?;
2263                        map.serialize_entry("type", "fixed")?;
2264                        map.serialize_entry("name", &name.name)?;
2265                        if self.enclosing_ns != &name.namespace {
2266                            map.serialize_entry("namespace", &name.namespace)?;
2267                        }
2268                        map.serialize_entry("size", size)?;
2269                        map.end()
2270                    }
2271                    SchemaPiece::Decimal {
2272                        scale,
2273                        precision,
2274                        fixed_size: Some(size),
2275                    } => {
2276                        let mut map = serializer.serialize_map(Some(6))?;
2277                        map.serialize_entry("type", "fixed")?;
2278                        map.serialize_entry("logicalType", "decimal")?;
2279                        map.serialize_entry("name", &name.name)?;
2280                        if self.enclosing_ns != &name.namespace {
2281                            map.serialize_entry("namespace", &name.namespace)?;
2282                        }
2283                        map.serialize_entry("size", size)?;
2284                        map.serialize_entry("precision", precision)?;
2285                        map.serialize_entry("scale", scale)?;
2286                        map.end()
2287                    }
2288                    SchemaPiece::Null
2289                    | SchemaPiece::Boolean
2290                    | SchemaPiece::Int
2291                    | SchemaPiece::Long
2292                    | SchemaPiece::Float
2293                    | SchemaPiece::Double
2294                    | SchemaPiece::Date
2295                    | SchemaPiece::TimestampMilli
2296                    | SchemaPiece::TimestampMicro
2297                    | SchemaPiece::Decimal {
2298                        fixed_size: None, ..
2299                    }
2300                    | SchemaPiece::Bytes
2301                    | SchemaPiece::String
2302                    | SchemaPiece::Array(_)
2303                    | SchemaPiece::Map(_)
2304                    | SchemaPiece::Union(_)
2305                    | SchemaPiece::Uuid
2306                    | SchemaPiece::Json => {
2307                        unreachable!("Unexpected anonymous schema piece in named schema position")
2308                    }
2309                    SchemaPiece::ResolveIntLong
2310                    | SchemaPiece::ResolveDateTimestamp
2311                    | SchemaPiece::ResolveIntFloat
2312                    | SchemaPiece::ResolveIntDouble
2313                    | SchemaPiece::ResolveLongFloat
2314                    | SchemaPiece::ResolveLongDouble
2315                    | SchemaPiece::ResolveFloatDouble
2316                    | SchemaPiece::ResolveConcreteUnion { .. }
2317                    | SchemaPiece::ResolveUnionUnion { .. }
2318                    | SchemaPiece::ResolveUnionConcrete { .. }
2319                    | SchemaPiece::ResolveRecord { .. }
2320                    | SchemaPiece::ResolveIntTsMilli
2321                    | SchemaPiece::ResolveIntTsMicro
2322                    | SchemaPiece::ResolveEnum { .. } => {
2323                        panic!("Attempted to serialize resolved schema")
2324                    }
2325                }
2326            }
2327        }
2328    }
2329}
2330
2331impl Serialize for Schema {
2332    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2333    where
2334        S: Serializer,
2335    {
2336        let ctx = SchemaSerContext {
2337            node: SchemaNodeOrNamed {
2338                root: self,
2339                inner: self.top.as_ref(),
2340            },
2341            seen_named: Rc::new(RefCell::new(Default::default())),
2342            enclosing_ns: "",
2343        };
2344        ctx.serialize(serializer)
2345    }
2346}
2347
2348impl<'a> Serialize for RecordFieldSerContext<'a> {
2349    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2350    where
2351        S: Serializer,
2352    {
2353        let mut map = serializer.serialize_map(None)?;
2354        map.serialize_entry("name", &self.inner.name)?;
2355        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2356        if let Some(default) = &self.inner.default {
2357            map.serialize_entry("default", default)?;
2358        }
2359        if let Some(doc) = &self.inner.doc {
2360            map.serialize_entry("doc", doc)?;
2361        }
2362        map.end()
2363    }
2364}
2365
2366/// Parses a **valid** avro schema into the Parsing Canonical Form.
2367/// <https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas>
2368fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2369    pcf(schema, "", false)
2370}
2371
2372fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2373    match schema {
2374        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2375        serde_json::Value::String(s) => pcf_string(s),
2376        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2377        serde_json::Value::Number(n) => n.to_string(),
2378        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2379    }
2380}
2381
2382fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2383    // Look for the namespace variant up front.
2384    let default_ns = schema
2385        .get("namespace")
2386        .and_then(|v| v.as_str())
2387        .unwrap_or(enclosing_ns);
2388    let mut fields = Vec::new();
2389    let mut found_next_ns = None;
2390    let mut deferred_values = vec![];
2391    for (k, v) in schema {
2392        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
2393        if schema.len() == 1 && k == "type" {
2394            // Invariant: function is only callable from a valid schema, so this is acceptable.
2395            if let serde_json::Value::String(s) = v {
2396                return pcf_string(s);
2397            }
2398        }
2399
2400        // Strip out unused fields ([STRIP] rule)
2401        if field_ordering_position(k).is_none() {
2402            continue;
2403        }
2404
2405        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
2406        if k == "name" {
2407            // The `fields` stanza needs special handling, as it has "name"
2408            // fields that don't get canonicalized (since they are not type names).
2409            if in_fields {
2410                fields.push((
2411                    k,
2412                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2413                ));
2414                continue;
2415            }
2416            // Invariant: Only valid schemas. Must be a string.
2417            let name = v.as_str().unwrap();
2418            assert!(
2419                found_next_ns.is_none(),
2420                "`name` must not be specified multiple times"
2421            );
2422            let next_ns = match name.rsplit_once('.') {
2423                None => default_ns,
2424                Some((ns, _name)) => ns,
2425            };
2426            found_next_ns = Some(next_ns);
2427            let n = if next_ns.is_empty() {
2428                Cow::Borrowed(name)
2429            } else {
2430                Cow::Owned(format!("{next_ns}.{name}"))
2431            };
2432            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2433            continue;
2434        }
2435
2436        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
2437        if k == "size" {
2438            let i = match v.as_str() {
2439                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2440                None => v.as_i64().unwrap(),
2441            };
2442            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2443            continue;
2444        }
2445
2446        // For anything else, recursively process the result
2447        // (deferred until we know the namespace)
2448        deferred_values.push((k, v));
2449    }
2450
2451    let next_ns = found_next_ns.unwrap_or(default_ns);
2452    for (k, v) in deferred_values {
2453        fields.push((
2454            k,
2455            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2456        ));
2457    }
2458
2459    // Sort the fields by their canonical ordering ([ORDER] rule).
2460    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2461    let inter = fields
2462        .into_iter()
2463        .map(|(_, v)| v)
2464        .collect::<Vec<_>>()
2465        .join(",");
2466    format!("{{{}}}", inter)
2467}
2468
2469fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2470    let inter = arr
2471        .iter()
2472        .map(|s| pcf(s, enclosing_ns, in_fields))
2473        .collect::<Vec<String>>()
2474        .join(",");
2475    format!("[{}]", inter)
2476}
2477
/// Wraps `s` in double quotes to form a JSON string literal.
///
/// No escaping is performed. `s` is assumed not to contain `"` or `\`
/// (presumably guaranteed by schema validation of names and symbols; confirm).
fn pcf_string(s: &str) -> String {
    ["\"", s, "\""].concat()
}
2481
/// Returns the 1-based canonical position of a schema attribute, or `None` if
/// Parsing Canonical Form strips the attribute.
///
/// This drives both the [STRIP] rule (attributes with no position are dropped)
/// and the [ORDER] rule (kept attributes are emitted in ascending position).
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_ORDER: [&str; 7] = ["name", "type", "fields", "symbols", "items", "values", "size"];
    CANONICAL_ORDER
        .iter()
        .position(|&attr| attr == field)
        .map(|idx| idx + 1)
}
2497
2498#[cfg(test)]
2499mod tests {
2500    use mz_ore::{assert_err, assert_ok};
2501
2502    use crate::types::{Record, ToAvro};
2503
2504    use super::*;
2505
2506    fn check_schema(schema: &str, expected: SchemaPiece) {
2507        let schema = Schema::from_str(schema).unwrap();
2508        assert_eq!(&expected, schema.top_node().inner);
2509
2510        // Test serialization round trip
2511        let schema = serde_json::to_string(&schema).unwrap();
2512        let schema = Schema::from_str(&schema).unwrap();
2513        assert_eq!(&expected, schema.top_node().inner);
2514    }
2515
2516    #[mz_ore::test]
2517    fn test_primitive_schema() {
2518        check_schema("\"null\"", SchemaPiece::Null);
2519        check_schema("\"int\"", SchemaPiece::Int);
2520        check_schema("\"double\"", SchemaPiece::Double);
2521    }
2522
2523    #[mz_ore::test]
2524    fn test_array_schema() {
2525        check_schema(
2526            r#"{"type": "array", "items": "string"}"#,
2527            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2528        );
2529    }
2530
2531    #[mz_ore::test]
2532    fn test_map_schema() {
2533        check_schema(
2534            r#"{"type": "map", "values": "double"}"#,
2535            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2536        );
2537    }
2538
2539    #[mz_ore::test]
2540    fn test_union_schema() {
2541        check_schema(
2542            r#"["null", "int"]"#,
2543            SchemaPiece::Union(
2544                UnionSchema::new(vec![
2545                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2546                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2547                ])
2548                .unwrap(),
2549            ),
2550        );
2551    }
2552
2553    #[mz_ore::test]
2554    fn test_multi_union_schema() {
2555        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2556        assert_ok!(schema);
2557        let schema = schema.unwrap();
2558        let node = schema.top_node();
2559        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2560        let union_schema = match node.inner {
2561            SchemaPiece::Union(u) => u,
2562            _ => unreachable!(),
2563        };
2564        assert_eq!(union_schema.variants().len(), 5);
2565        let mut variants = union_schema.variants().iter();
2566        assert_eq!(
2567            SchemaKind::from(node.step(variants.next().unwrap())),
2568            SchemaKind::Null
2569        );
2570        assert_eq!(
2571            SchemaKind::from(node.step(variants.next().unwrap())),
2572            SchemaKind::Int
2573        );
2574        assert_eq!(
2575            SchemaKind::from(node.step(variants.next().unwrap())),
2576            SchemaKind::Float
2577        );
2578        assert_eq!(
2579            SchemaKind::from(node.step(variants.next().unwrap())),
2580            SchemaKind::String
2581        );
2582        assert_eq!(
2583            SchemaKind::from(node.step(variants.next().unwrap())),
2584            SchemaKind::Bytes
2585        );
2586        assert_eq!(variants.next(), None);
2587    }
2588
2589    #[mz_ore::test]
2590    fn test_record_schema() {
2591        let schema = r#"
2592                {
2593                    "type": "record",
2594                    "name": "test",
2595                    "doc": "record doc",
2596                    "fields": [
2597                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2598                        {"name": "b", "doc": "b doc", "type": "string"}
2599                    ]
2600                }
2601            "#;
2602
2603        let mut lookup = BTreeMap::new();
2604        lookup.insert("a".to_owned(), 0);
2605        lookup.insert("b".to_owned(), 1);
2606
2607        let expected = SchemaPiece::Record {
2608            doc: Some("record doc".to_string()),
2609            fields: vec![
2610                RecordField {
2611                    name: "a".to_string(),
2612                    doc: Some("a doc".to_string()),
2613                    default: Some(Value::Number(42i64.into())),
2614                    schema: SchemaPiece::Long.into(),
2615                    order: RecordFieldOrder::Ascending,
2616                    position: 0,
2617                },
2618                RecordField {
2619                    name: "b".to_string(),
2620                    doc: Some("b doc".to_string()),
2621                    default: None,
2622                    schema: SchemaPiece::String.into(),
2623                    order: RecordFieldOrder::Ascending,
2624                    position: 1,
2625                },
2626            ],
2627            lookup,
2628        };
2629
2630        check_schema(schema, expected);
2631    }
2632
2633    #[mz_ore::test]
2634    fn test_enum_schema() {
2635        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2636
2637        let expected = SchemaPiece::Enum {
2638            doc: None,
2639            symbols: vec![
2640                "diamonds".to_owned(),
2641                "spades".to_owned(),
2642                "jokers".to_owned(),
2643                "clubs".to_owned(),
2644                "hearts".to_owned(),
2645            ],
2646            default_idx: Some(2),
2647        };
2648
2649        check_schema(schema, expected);
2650
2651        let bad_schema = Schema::from_str(
2652            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2653        );
2654
2655        assert_err!(bad_schema);
2656    }
2657
2658    #[mz_ore::test]
2659    fn test_fixed_schema() {
2660        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2661
2662        let expected = SchemaPiece::Fixed { size: 16usize };
2663
2664        check_schema(schema, expected);
2665    }
2666
2667    #[mz_ore::test]
2668    fn test_date_schema() {
2669        let kinds = &[
2670            r#"{
2671                    "type": "int",
2672                    "name": "datish",
2673                    "logicalType": "date"
2674                }"#,
2675            r#"{
2676                    "type": "int",
2677                    "name": "datish",
2678                    "connect.name": "io.debezium.time.Date"
2679                }"#,
2680            r#"{
2681                    "type": "int",
2682                    "name": "datish",
2683                    "connect.name": "org.apache.kafka.connect.data.Date"
2684                }"#,
2685        ];
2686        for kind in kinds {
2687            check_schema(*kind, SchemaPiece::Date);
2688
2689            let schema = Schema::from_str(*kind).unwrap();
2690            assert_eq!(
2691                serde_json::to_string(&schema).unwrap(),
2692                r#"{"type":"int","logicalType":"date"}"#
2693            );
2694        }
2695    }
2696
2697    #[mz_ore::test]
2698    fn new_field_in_middle() {
2699        let reader = r#"{
2700            "type": "record",
2701            "name": "MyRecord",
2702            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2703        }"#;
2704        let writer = r#"{
2705            "type": "record",
2706            "name": "MyRecord",
2707            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2708        }"#;
2709        let reader = Schema::from_str(reader).unwrap();
2710        let writer = Schema::from_str(writer).unwrap();
2711
2712        let mut record = Record::new(writer.top_node()).unwrap();
2713        record.put("f1", 1);
2714        record.put("f2", 2);
2715        record.put("f_interposed", 42);
2716
2717        let value = record.avro();
2718
2719        let mut buf = vec![];
2720        crate::encode::encode(&value, &writer, &mut buf);
2721
2722        let resolved = resolve_schemas(&writer, &reader).unwrap();
2723
2724        let reader = &mut &buf[..];
2725        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2726        let expected = crate::types::Value::Record(vec![
2727            ("f1".to_string(), crate::types::Value::Int(1)),
2728            ("f2".to_string(), crate::types::Value::Int(2)),
2729        ]);
2730        assert_eq!(reader_value, expected);
2731        assert!(reader.is_empty()); // all bytes should have been consumed
2732    }
2733
2734    #[mz_ore::test]
2735    fn new_field_at_end() {
2736        let reader = r#"{
2737            "type": "record",
2738            "name": "MyRecord",
2739            "fields": [{"name": "f1", "type": "int"}]
2740        }"#;
2741        let writer = r#"{
2742            "type": "record",
2743            "name": "MyRecord",
2744            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2745        }"#;
2746        let reader = Schema::from_str(reader).unwrap();
2747        let writer = Schema::from_str(writer).unwrap();
2748
2749        let mut record = Record::new(writer.top_node()).unwrap();
2750        record.put("f1", 1);
2751        record.put("f2", 2);
2752
2753        let value = record.avro();
2754
2755        let mut buf = vec![];
2756        crate::encode::encode(&value, &writer, &mut buf);
2757
2758        let resolved = resolve_schemas(&writer, &reader).unwrap();
2759
2760        let reader = &mut &buf[..];
2761        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2762        let expected =
2763            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2764        assert_eq!(reader_value, expected);
2765        assert!(reader.is_empty()); // all bytes should have been consumed
2766    }
2767
2768    /// Union variant matching decides on its own which numeric promotions are
2769    /// legal, separately from the schema resolver that actually decodes a
2770    /// writer value into a reader type. The two must agree: if matching accepts
2771    /// a promotion the resolver rejects, a union variant can match but then
2772    /// fail to decode at read time. This test pins that agreement over the
2773    /// primitive types so they cannot drift apart.
2774    #[mz_ore::test]
2775    fn test_union_promotion_agrees_with_resolution() {
2776        // Resolving two bare primitives drives exactly the resolution path that
2777        // union matching consults, so it is the right oracle.
2778        fn resolves(writer: &str, reader: &str) -> bool {
2779            let writer = Schema::from_str(&format!("\"{writer}\"")).unwrap();
2780            let reader = Schema::from_str(&format!("\"{reader}\"")).unwrap();
2781            resolve_schemas(&writer, &reader).is_ok()
2782        }
2783        fn promotes(writer: &str, reader: &str) -> bool {
2784            can_promote(
2785                &Schema::parse_primitive(writer).unwrap(),
2786                &Schema::parse_primitive(reader).unwrap(),
2787            )
2788        }
2789
2790        // The dangerous direction, over every primitive pair: any promotion
2791        // matching accepts must actually be decodable by resolution.
2792        let primitives = [
2793            "null", "boolean", "int", "long", "float", "double", "bytes", "string",
2794        ];
2795        for w in primitives {
2796            for r in primitives {
2797                if promotes(w, r) {
2798                    assert!(
2799                        resolves(w, r),
2800                        "{w} -> {r} is accepted for matching but cannot be resolved",
2801                    );
2802                }
2803            }
2804        }
2805
2806        // Over the numeric kinds, for distinct kinds the two must agree
2807        // exactly. (Identical kinds resolve trivially but are not promotions,
2808        // so matching reports false for them; exclude `w == r`.)
2809        let numeric = ["int", "long", "float", "double"];
2810        for w in numeric {
2811            for r in numeric {
2812                if w != r {
2813                    assert_eq!(
2814                        promotes(w, r),
2815                        resolves(w, r),
2816                        "matching and resolution disagree on {w} -> {r}",
2817                    );
2818                }
2819            }
2820        }
2821    }
2822
2823    #[mz_ore::test]
2824    fn default_non_nums() {
2825        let reader = r#"{
2826            "type": "record",
2827            "name": "MyRecord",
2828            "fields": [
2829                {"name": "f1", "type": "double", "default": "NaN"},
2830                {"name": "f2", "type": "double", "default": "Infinity"},
2831                {"name": "f3", "type": "double", "default": "-Infinity"}
2832            ]
2833        }
2834        "#;
2835        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2836
2837        let writer_schema = Schema::from_str(writer).unwrap();
2838        let reader_schema = Schema::from_str(reader).unwrap();
2839        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2840
2841        let record = Record::new(writer_schema.top_node()).unwrap();
2842
2843        let value = record.avro();
2844        let mut buf = vec![];
2845        crate::encode::encode(&value, &writer_schema, &mut buf);
2846
2847        let reader = &mut &buf[..];
2848        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2849        let expected = crate::types::Value::Record(vec![
2850            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2851            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2852            (
2853                "f3".to_string(),
2854                crate::types::Value::Double(f64::NEG_INFINITY),
2855            ),
2856        ]);
2857
2858        #[derive(Debug)]
2859        struct NanEq(crate::types::Value);
2860        impl std::cmp::PartialEq for NanEq {
2861            fn eq(&self, other: &Self) -> bool {
2862                match (self, other) {
2863                    (
2864                        NanEq(crate::types::Value::Double(x)),
2865                        NanEq(crate::types::Value::Double(y)),
2866                    ) if x.is_nan() && y.is_nan() => true,
2867                    (
2868                        NanEq(crate::types::Value::Float(x)),
2869                        NanEq(crate::types::Value::Float(y)),
2870                    ) if x.is_nan() && y.is_nan() => true,
2871                    (
2872                        NanEq(crate::types::Value::Record(xs)),
2873                        NanEq(crate::types::Value::Record(ys)),
2874                    ) => {
2875                        let xs = xs
2876                            .iter()
2877                            .cloned()
2878                            .map(|(k, v)| (k, NanEq(v)))
2879                            .collect::<Vec<_>>();
2880                        let ys = ys
2881                            .iter()
2882                            .cloned()
2883                            .map(|(k, v)| (k, NanEq(v)))
2884                            .collect::<Vec<_>>();
2885
2886                        xs == ys
2887                    }
2888                    (NanEq(x), NanEq(y)) => x == y,
2889                }
2890            }
2891        }
2892
2893        assert_eq!(NanEq(reader_value), NanEq(expected));
2894        assert!(reader.is_empty());
2895    }
2896
2897    #[mz_ore::test]
2898    fn test_decimal_schemas() {
2899        let schema = r#"{
2900                "type": "fixed",
2901                "name": "dec",
2902                "size": 8,
2903                "logicalType": "decimal",
2904                "precision": 12,
2905                "scale": 5
2906            }"#;
2907        let expected = SchemaPiece::Decimal {
2908            precision: 12,
2909            scale: 5,
2910            fixed_size: Some(8),
2911        };
2912        check_schema(schema, expected);
2913
2914        let schema = r#"{
2915                "type": "bytes",
2916                "logicalType": "decimal",
2917                "precision": 12,
2918                "scale": 5
2919            }"#;
2920        let expected = SchemaPiece::Decimal {
2921            precision: 12,
2922            scale: 5,
2923            fixed_size: None,
2924        };
2925        check_schema(schema, expected);
2926
2927        let res = Schema::from_str(
2928            r#"["bytes", {
2929                "type": "bytes",
2930                "logicalType": "decimal",
2931                "precision": 12,
2932                "scale": 5
2933            }]"#,
2934        );
2935        assert_eq!(
2936            res.unwrap_err().to_string(),
2937            "Schema parse error: Unions cannot contain duplicate types"
2938        );
2939
2940        let writer_schema = Schema::from_str(
2941            r#"["null", {
2942                "type": "bytes"
2943            }]"#,
2944        )
2945        .unwrap();
2946        let reader_schema = Schema::from_str(
2947            r#"["null", {
2948                "type": "bytes",
2949                "logicalType": "decimal",
2950                "precision": 12,
2951                "scale": 5
2952            }]"#,
2953        )
2954        .unwrap();
2955        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2956
2957        let expected = SchemaPiece::ResolveUnionUnion {
2958            permutation: vec![
2959                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2960                Ok((
2961                    1,
2962                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2963                        precision: 12,
2964                        scale: 5,
2965                        fixed_size: None,
2966                    }),
2967                )),
2968            ],
2969            n_reader_variants: 2,
2970            reader_null_variant: Some(0),
2971        };
2972        assert_eq!(resolved.top_node().inner, &expected);
2973    }
2974
2975    #[mz_ore::test]
2976    fn test_no_documentation() {
2977        let schema =
2978            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2979                .unwrap();
2980
2981        let doc = match schema.top_node().inner {
2982            SchemaPiece::Enum { doc, .. } => doc.clone(),
2983            _ => panic!(),
2984        };
2985
2986        assert_none!(doc);
2987    }
2988
2989    #[mz_ore::test]
2990    fn test_documentation() {
2991        let schema = Schema::from_str(
2992                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2993            ).unwrap();
2994
2995        let doc = match schema.top_node().inner {
2996            SchemaPiece::Enum { doc, .. } => doc.clone(),
2997            _ => None,
2998        };
2999
3000        assert_eq!("Some documentation".to_owned(), doc.unwrap());
3001    }
3002
3003    #[mz_ore::test]
3004    fn test_namespaces_and_names() {
3005        // When name and namespace specified, full name should contain both.
3006        let schema = Schema::from_str(
3007            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
3008        )
3009        .unwrap();
3010        assert_eq!(schema.named.len(), 1);
3011        assert_eq!(
3012            schema.named[0].name,
3013            FullName {
3014                name: "name".into(),
3015                namespace: "namespace".into()
3016            }
3017        );
3018
3019        // When name contains dots, parse the dot-separated name as the namespace.
3020        let schema =
3021            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
3022                .unwrap();
3023        assert_eq!(schema.named.len(), 1);
3024        assert_eq!(
3025            schema.named[0].name,
3026            FullName {
3027                name: "dots".into(),
3028                namespace: "name.has".into()
3029            }
3030        );
3031
3032        // Same as above, ignore any provided namespace.
3033        let schema = Schema::from_str(
3034            r#"{"type": "enum", "namespace": "namespace",
3035            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
3036        )
3037        .unwrap();
3038        assert_eq!(schema.named.len(), 1);
3039        assert_eq!(
3040            schema.named[0].name,
3041            FullName {
3042                name: "dots".into(),
3043                namespace: "name.has".into()
3044            }
3045        );
3046
3047        // Use default namespace when namespace is not provided.
3048        // Materialize uses "" as the default namespace.
3049        let schema = Schema::from_str(
3050            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
3051            "fields": [{"name": "name", "type": "string"}]}"#,
3052        )
3053        .unwrap();
3054        assert_eq!(schema.named.len(), 1);
3055        assert_eq!(
3056            schema.named[0].name,
3057            FullName {
3058                name: "TestDoc".into(),
3059                namespace: "".into()
3060            }
3061        );
3062
3063        // Empty namespace strings should be allowed.
3064        let schema = Schema::from_str(
3065            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
3066            "fields": [{"name": "name", "type": "string"}]}"#,
3067        )
3068        .unwrap();
3069        assert_eq!(schema.named.len(), 1);
3070        assert_eq!(
3071            schema.named[0].name,
3072            FullName {
3073                name: "TestDoc".into(),
3074                namespace: "".into()
3075            }
3076        );
3077
3078        // Equality of names is defined on the FullName and is case-sensitive.
3079        let first = Schema::from_str(
3080            r#"{"type": "fixed", "namespace": "namespace",
3081            "name": "name", "size": 1}"#,
3082        )
3083        .unwrap();
3084        let second = Schema::from_str(
3085            r#"{"type": "fixed", "name": "namespace.name",
3086            "size": 1}"#,
3087        )
3088        .unwrap();
3089        assert_eq!(first.named[0].name, second.named[0].name);
3090
3091        let first = Schema::from_str(
3092            r#"{"type": "fixed", "namespace": "namespace",
3093            "name": "name", "size": 1}"#,
3094        )
3095        .unwrap();
3096        let second = Schema::from_str(
3097            r#"{"type": "fixed", "name": "namespace.Name",
3098            "size": 1}"#,
3099        )
3100        .unwrap();
3101        assert_ne!(first.named[0].name, second.named[0].name);
3102
3103        let first = Schema::from_str(
3104            r#"{"type": "fixed", "namespace": "Namespace",
3105            "name": "name", "size": 1}"#,
3106        )
3107        .unwrap();
3108        let second = Schema::from_str(
3109            r#"{"type": "fixed", "namespace": "namespace",
3110            "name": "name", "size": 1}"#,
3111        )
3112        .unwrap();
3113        assert_ne!(first.named[0].name, second.named[0].name);
3114
3115        // The name portion of a fullname, record field names, and enum symbols must:
3116        // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
3117        assert!(
3118            Schema::from_str(
3119                r#"{"type": "record", "name": "99 problems but a name aint one",
3120            "fields": [{"name": "name", "type": "string"}]}"#
3121            )
3122            .is_err()
3123        );
3124
3125        assert!(
3126            Schema::from_str(
3127                r#"{"type": "record", "name": "!!!",
3128            "fields": [{"name": "name", "type": "string"}]}"#
3129            )
3130            .is_err()
3131        );
3132
3133        assert!(
3134            Schema::from_str(
3135                r#"{"type": "record", "name": "_valid_until_©",
3136            "fields": [{"name": "name", "type": "string"}]}"#
3137            )
3138            .is_err()
3139        );
3140
3141        // Use previously defined names and namespaces as type.
3142        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
3143              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
3144              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
3145              {"name": "f3", "type": "MyEnum"},
3146              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
3147              {"name": "f5", "type": "other.namespace.OtherEnum"},
3148              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
3149              {"name": "f7", "type": "some.other.ThirdEnum"}
3150             ]}"#).unwrap();
3151        assert_eq!(schema.named.len(), 4);
3152
3153        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3154            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // f1
3155            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // f2
3156            assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); // f3
3157            assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); // f4
3158            assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); // f5
3159            assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); // f6
3160            assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); // f7
3161        } else {
3162            panic!("Expected SchemaPiece::Record, found something else");
3163        }
3164
3165        let schema = Schema::from_str(
3166            r#"{"type": "record", "name": "x.Y", "fields": [
3167              {"name": "e", "type":
3168                {"type": "record", "name": "Z", "fields": [
3169                  {"name": "f", "type": "x.Y"},
3170                  {"name": "g", "type": "x.Z"}
3171                ]}
3172              }
3173            ]}"#,
3174        )
3175        .unwrap();
3176        assert_eq!(schema.named.len(), 2);
3177
3178        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3179            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // e
3180        } else {
3181            panic!("Expected SchemaPiece::Record, found something else");
3182        }
3183
3184        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
3185            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); // f
3186            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // g
3187        } else {
3188            panic!("Expected SchemaPiece::Record, found something else");
3189        }
3190
3191        let schema = Schema::from_str(
3192            r#"{"type": "record", "name": "R", "fields": [
3193              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
3194                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
3195                 "symbols": ["Foo", "Bar"]}
3196                }
3197              ]}},
3198              {"name": "t", "type": "Z"}
3199            ]}"#,
3200        )
3201        .unwrap();
3202        assert_eq!(schema.named.len(), 3);
3203
3204        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3205            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // s
3206            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); // t - refers to "".Z
3207        } else {
3208            panic!("Expected SchemaPiece::Record, found something else");
3209        }
3210    }
3211
3212    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
3213    // compile, they pass.
3214    #[mz_ore::test]
3215    fn test_schema_is_send() {
3216        fn send<S: Send>(_s: S) {}
3217
3218        let schema = Schema {
3219            named: vec![],
3220            indices: Default::default(),
3221            top: SchemaPiece::Null.into(),
3222        };
3223        send(schema);
3224    }
3225
3226    #[mz_ore::test]
3227    fn test_schema_is_sync() {
3228        fn sync<S: Sync>(_s: S) {}
3229
3230        let schema = Schema {
3231            named: vec![],
3232            indices: Default::default(),
3233            top: SchemaPiece::Null.into(),
3234        };
3235        sync(&schema);
3236        sync(schema);
3237    }
3238
3239    #[mz_ore::test]
3240    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
3241    fn test_schema_fingerprint() {
3242        let raw_schema = r#"
3243        {
3244            "type": "record",
3245            "name": "test",
3246            "fields": [
3247                {"name": "a", "type": "long", "default": 42},
3248                {"name": "b", "type": "string"}
3249            ]
3250        }
3251    "#;
3252        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3253        let schema = Schema::from_str(raw_schema).unwrap();
3254        assert_eq!(&schema.canonical_form(), expected_canonical);
3255        let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3256            .as_ref()
3257            .iter()
3258            .map(|b| format!("{b:02x}"))
3259            .collect::<String>();
3260        assert_eq!(
3261            format!("{}", schema.fingerprint(&digest::SHA256)),
3262            expected_fingerprint
3263        );
3264
3265        let raw_schema = r#"
3266{
3267  "type": "record",
3268  "name": "ns.r1",
3269  "namespace": "ignored",
3270  "fields": [
3271    {
3272      "name": "f1",
3273      "type": {
3274        "type": "fixed",
3275        "name": "r2",
3276        "size": 1
3277      }
3278    }
3279  ]
3280}
3281"#;
3282        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3283        let schema = Schema::from_str(raw_schema).unwrap();
3284        assert_eq!(&schema.canonical_form(), expected_canonical);
3285        let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3286            .as_ref()
3287            .iter()
3288            .map(|b| format!("{b:02x}"))
3289            .collect::<String>();
3290        assert_eq!(
3291            format!("{}", schema.fingerprint(&digest::SHA256)),
3292            expected_fingerprint
3293        );
3294    }
3295
3296    #[mz_ore::test]
3297    fn test_make_valid() {
3298        for (input, expected) in [
3299            ("foo", "foo"),
3300            ("az99", "az99"),
3301            ("99az", "_99az"),
3302            ("is,bad", "is_bad"),
3303            ("@#$%", "____"),
3304            ("i-amMisBehaved!", "i_amMisBehaved_"),
3305            ("", "_"),
3306        ] {
3307            let actual = Name::make_valid(input);
3308            assert_eq!(expected, actual, "Name::make_valid({input})")
3309        }
3310    }
3311
3312    #[mz_ore::test]
3313    fn test_parse_with_simple_reference() {
3314        // Schema A defines a User record
3315        let ref_schema_json = r#"{
3316            "type": "record",
3317            "name": "User",
3318            "namespace": "com.example",
3319            "fields": [{"name": "id", "type": "int"}]
3320        }"#;
3321
3322        // Schema B references User
3323        let primary_json = r#"{
3324            "type": "record",
3325            "name": "Event",
3326            "namespace": "com.example",
3327            "fields": [{"name": "user", "type": "com.example.User"}]
3328        }"#;
3329
3330        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3331        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3332
3333        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3334
3335        // Verify both Event and User types are in the schema
3336        let user_name = FullName {
3337            name: "User".to_string(),
3338            namespace: "com.example".to_string(),
3339        };
3340        let event_name = FullName {
3341            name: "Event".to_string(),
3342            namespace: "com.example".to_string(),
3343        };
3344
3345        assert!(
3346            schema.indices.contains_key(&user_name),
3347            "User type should be in schema indices"
3348        );
3349        assert!(
3350            schema.indices.contains_key(&event_name),
3351            "Event type should be in schema indices"
3352        );
3353
3354        // Verify Event's user field references User
3355        if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3356            let event_piece = &schema.named[*event_idx].piece;
3357            if let SchemaPiece::Record { fields, .. } = event_piece {
3358                assert_eq!(fields.len(), 1);
3359                assert_eq!(fields[0].name, "user");
3360                // The user field should reference the User type (Named)
3361                assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3362            } else {
3363                panic!("Expected Event to be a record");
3364            }
3365        } else {
3366            panic!("Expected top to be Named");
3367        }
3368    }
3369
3370    #[mz_ore::test]
3371    fn test_parse_with_nested_references() {
3372        // Schema A defines Address
3373        let schema_a = r#"{
3374            "type": "record",
3375            "name": "Address",
3376            "namespace": "com.example",
3377            "fields": [
3378                {"name": "street", "type": "string"},
3379                {"name": "city", "type": "string"}
3380            ]
3381        }"#;
3382
3383        // Schema B defines User with Address field (references A)
3384        let schema_b = r#"{
3385            "type": "record",
3386            "name": "User",
3387            "namespace": "com.example",
3388            "fields": [
3389                {"name": "id", "type": "int"},
3390                {"name": "address", "type": "com.example.Address"}
3391            ]
3392        }"#;
3393
3394        // Schema C defines Event with User field (references B, transitively A)
3395        let schema_c = r#"{
3396            "type": "record",
3397            "name": "Event",
3398            "namespace": "com.example",
3399            "fields": [
3400                {"name": "user", "type": "com.example.User"},
3401                {"name": "timestamp", "type": "long"}
3402            ]
3403        }"#;
3404
3405        // Parse A first
3406        let ref_schema_a = Schema::from_str(schema_a).unwrap();
3407
3408        // Parse B with reference to A
3409        let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3410        let ref_schema_b =
3411            Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3412                .unwrap();
3413
3414        // Parse C with references to A and B (in dependency order)
3415        let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3416        let final_schema =
3417            Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3418
3419        // Verify all three types are in the schema
3420        for name in ["Address", "User", "Event"] {
3421            let full_name = FullName {
3422                name: name.to_string(),
3423                namespace: "com.example".to_string(),
3424            };
3425            assert!(
3426                final_schema.indices.contains_key(&full_name),
3427                "{} type should be in schema indices",
3428                name
3429            );
3430        }
3431    }
3432
3433    #[mz_ore::test]
3434    fn test_parse_with_multiple_types_in_reference() {
3435        // Schema A defines both Address and PhoneNumber
3436        let ref_schema_json = r#"{
3437            "type": "record",
3438            "name": "ContactInfo",
3439            "namespace": "com.example",
3440            "fields": [
3441                {
3442                    "name": "address",
3443                    "type": {
3444                        "type": "record",
3445                        "name": "Address",
3446                        "fields": [{"name": "street", "type": "string"}]
3447                    }
3448                },
3449                {
3450                    "name": "phone",
3451                    "type": {
3452                        "type": "record",
3453                        "name": "PhoneNumber",
3454                        "fields": [{"name": "number", "type": "string"}]
3455                    }
3456                }
3457            ]
3458        }"#;
3459
3460        // Schema B references both Address and PhoneNumber
3461        let primary_json = r#"{
3462            "type": "record",
3463            "name": "User",
3464            "namespace": "com.example",
3465            "fields": [
3466                {"name": "id", "type": "int"},
3467                {"name": "home", "type": "com.example.Address"},
3468                {"name": "mobile", "type": "com.example.PhoneNumber"}
3469            ]
3470        }"#;
3471
3472        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3473        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3474
3475        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3476
3477        // Verify all types are in the schema
3478        for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3479            let full_name = FullName {
3480                name: name.to_string(),
3481                namespace: "com.example".to_string(),
3482            };
3483            assert!(
3484                schema.indices.contains_key(&full_name),
3485                "{} type should be in schema indices",
3486                name
3487            );
3488        }
3489    }
3490
3491    #[mz_ore::test]
3492    fn test_parse_with_no_references() {
3493        // When no references are provided, it should behave like regular parse
3494        let schema_json = r#"{
3495            "type": "record",
3496            "name": "Simple",
3497            "fields": [{"name": "id", "type": "int"}]
3498        }"#;
3499
3500        let value: Value = serde_json::from_str(schema_json).unwrap();
3501
3502        let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3503        let schema_normal = Schema::parse(&value).unwrap();
3504
3505        // Both should produce equivalent schemas
3506        assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3507        assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3508    }
3509
3510    #[mz_ore::test]
3511    fn test_parse_with_reference_in_array() {
3512        // Schema A defines an Item record
3513        let ref_schema_json = r#"{
3514            "type": "record",
3515            "name": "Item",
3516            "namespace": "com.example",
3517            "fields": [{"name": "name", "type": "string"}]
3518        }"#;
3519
3520        // Schema B has an array of Items
3521        let primary_json = r#"{
3522            "type": "record",
3523            "name": "Order",
3524            "namespace": "com.example",
3525            "fields": [
3526                {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3527            ]
3528        }"#;
3529
3530        let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3531        let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3532
3533        let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3534
3535        // Verify both types exist
3536        let item_name = FullName {
3537            name: "Item".to_string(),
3538            namespace: "com.example".to_string(),
3539        };
3540        assert!(schema.indices.contains_key(&item_name));
3541
3542        // Verify the array items type is a Named reference
3543        if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3544            let order_piece = &schema.named[*order_idx].piece;
3545            if let SchemaPiece::Record { fields, .. } = order_piece {
3546                if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3547                    assert!(
3548                        matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3549                        "Array items should be a Named reference to Item"
3550                    );
3551                } else {
3552                    panic!("Expected items field to be an array");
3553                }
3554            } else {
3555                panic!("Expected Order to be a record");
3556            }
3557        } else {
3558            panic!("Expected top to be Named");
3559        }
3560    }
3561}