mz_avro/
schema.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for parsing and interacting with schemas in Avro format.
25
26use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// Describes an error encountered while parsing an Avro schema.
///
/// The wrapped string is the message shown when the error is displayed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates a new error carrying the message `msg`.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for ParseSchemaError {
    /// Displays the wrapped message, honoring the formatter's options.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Fingerprint documentation](https://avro.apache.org/docs/current/spec.html#schema_fingerprints)
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The raw bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes the fingerprint as lowercase hexadecimal, two digits per byte.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter instead of allocating
        // one `String` per byte plus a `Vec` and a joined `String`.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
/// A single node of an Avro schema.
///
/// Besides the types that can appear in a parsed schema, this enum also has
/// `Resolve*` variants, which describe a writer type that has been resolved
/// against a (possibly different) reader type.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// A `null` Avro schema.
    Null,
    /// A `boolean` Avro schema.
    Boolean,
    /// An `int` Avro schema.
    Int,
    /// A `long` Avro schema.
    Long,
    /// A `float` Avro schema.
    Float,
    /// A `double` Avro schema.
    Double,
    /// An `int` Avro schema with a semantic type being days since the unix epoch.
    Date,
    /// A `long` Avro schema with a semantic type being milliseconds since the unix epoch.
    ///
    /// <https://avro.apache.org/docs/current/spec.html#Timestamp+%28millisecond+precision%29>
    TimestampMilli,
    /// A `long` Avro schema with a semantic type being microseconds since the unix epoch.
    ///
    /// <https://avro.apache.org/docs/current/spec.html#Timestamp+%28microsecond+precision%29>
    TimestampMicro,
    /// A `bytes` or `fixed` Avro schema with a logical type of `decimal` and
    /// the specified precision and scale.
    ///
    /// If the underlying type is `fixed`,
    /// the `fixed_size` field specifies the size.
    Decimal {
        /// The precision of the decimal.
        precision: usize,
        /// The scale of the decimal.
        scale: usize,
        /// `Some(size)` when the underlying type is `fixed`; `None` when it
        /// is `bytes`.
        fixed_size: Option<usize>,
    },
    /// A `bytes` Avro schema.
    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
    Bytes,
    /// A `string` Avro schema.
    /// `String` represents a unicode character sequence.
    String,
    /// A `string` Avro schema that is tagged as representing JSON data.
    Json,
    /// A `string` Avro schema with a logical type of `uuid`.
    Uuid,
    /// An `array` Avro schema. Avro arrays are required to have the same type for each element.
    /// This variant holds the schema node for the array element type.
    Array(Box<SchemaPieceOrNamed>),
    /// A `map` Avro schema.
    /// `Map` holds the schema node of its values, which must all be the same schema.
    /// `Map` keys are assumed to be `string`.
    Map(Box<SchemaPieceOrNamed>),
    /// A `union` Avro schema.
    Union(UnionSchema),
    /// A value written as `int` and read as `long`,
    /// for the timestamp-millis logicalType.
    ResolveIntTsMilli,
    /// A value written as `int` and read as `long`,
    /// for the timestamp-micros logicalType.
    ResolveIntTsMicro,
    /// A value written as an `int` with `date` logical type,
    /// and read as any timestamp type.
    ResolveDateTimestamp,
    /// A value written as `int` and read as `long`.
    ResolveIntLong,
    /// A value written as `int` and read as `float`.
    ResolveIntFloat,
    /// A value written as `int` and read as `double`.
    ResolveIntDouble,
    /// A value written as `long` and read as `float`.
    ResolveLongFloat,
    /// A value written as `long` and read as `double`.
    ResolveLongDouble,
    /// A value written as `float` and read as `double`.
    ResolveFloatDouble,
    /// A concrete (i.e., non-`union`) type in the writer,
    /// resolved against one specific variant of a `union` in the reader.
    ResolveConcreteUnion {
        /// The index of the variant in the reader.
        index: usize,
        /// The concrete type.
        inner: Box<SchemaPieceOrNamed>,
        /// NOTE(review): presumably the number of variants in the reader's
        /// union -- confirm against the resolver.
        n_reader_variants: usize,
        /// NOTE(review): presumably the index of the `null` variant in the
        /// reader's union, if it has one -- confirm against the resolver.
        reader_null_variant: Option<usize>,
    },
    /// A union in the writer, resolved against a union in the reader.
    /// The two schemas may have different variants and the variants may be in a different order.
    ResolveUnionUnion {
        /// A mapping of the fields in the writer to those in the reader.
        /// If the `i`th element is `Err(e)`, the `i`th field in the writer
        /// did not match any field in the reader (or even if it matched by name, resolution failed).
        /// If the `i`th element is `Ok((j, piece))`, then the `i`th field of the writer
        /// matched the `j`th field of the reader, and `piece` is their resolved node.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        /// NOTE(review): presumably the number of variants in the reader's
        /// union -- confirm against the resolver.
        n_reader_variants: usize,
        /// NOTE(review): presumably the index of the `null` variant in the
        /// reader's union, if it has one -- confirm against the resolver.
        reader_null_variant: Option<usize>,
    },
    /// The inverse of `ResolveConcreteUnion`.
    ResolveUnionConcrete {
        /// NOTE(review): by analogy with `ResolveConcreteUnion`, presumably
        /// the index of the matching variant in the writer's union -- confirm.
        index: usize,
        /// The resolved node for the matched variant.
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A `record` Avro schema.
    ///
    /// The `lookup` table maps field names to their position in the `Vec`
    /// of `fields`.
    Record {
        /// Documentation attached to the record, if any.
        doc: Documentation,
        /// The fields of the record.
        fields: Vec<RecordField>,
        /// Maps a field name to its position in `fields`.
        lookup: BTreeMap<String, usize>,
    },
    /// An `enum` Avro schema.
    Enum {
        /// Documentation attached to the enum, if any.
        doc: Documentation,
        /// The symbols of the enum.
        symbols: Vec<String>,
        /// The index of the default value.
        ///
        /// This is only used in schema resolution: it is the value that
        /// will be read by a reader when a writer writes a value that the reader
        /// does not expect.
        default_idx: Option<usize>,
    },
    /// A `fixed` Avro schema.
    Fixed {
        /// The size of the fixed value.
        size: usize,
    },
    /// A record in the writer, resolved against a record in the reader.
    /// The two schemas may have different fields and the fields may be in a different order.
    ResolveRecord {
        /// Fields that do not exist in the writer schema, but had a default
        /// value specified in the reader schema, which we use.
        defaults: Vec<ResolvedDefaultValueField>,
        /// Fields in the order of their appearance in the writer schema.
        /// `Present` if they could be resolved against a field in the reader schema;
        /// `Absent` otherwise.
        fields: Vec<ResolvedRecordField>,
        /// The size of `defaults`, plus the number of `Present` values in `fields`.
        n_reader_fields: usize,
    },
    /// An enum in the writer, resolved against an enum in the reader.
    /// The two schemas may have different values and the values may be in a different order.
    ResolveEnum {
        /// Documentation attached to the enum, if any.
        doc: Documentation,
        /// Symbols in order of the writer schema along with their index in the reader schema,
        /// or `Err(symbol_name)` if they don't exist in the reader schema.
        symbols: Vec<Result<(usize, String), String>>,
        /// The value to decode if the writer writes some value not expected by the reader.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320    /// Returns whether the schema node is "underlyingly" an Int (but possibly a logicalType typedef)
321    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    /// Returns whether the schema node is "underlyingly" an Int64 (but possibly a logicalType typedef)
325    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    /// Constructs an `avro::Value` if this is of underlying int type.
329    /// Guaranteed to be `Some` when `is_underlying_int` is `true`.
330    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            // TODO[btv] - should we bounds-check the date here? We
334            // don't elsewhere... maybe we should everywhere.
335            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    /// Constructs an `avro::Value` if this is of underlying long type.
340    /// Guaranteed to be `Some` when `is_underlying_long` is `true`.
341    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
/// Represents any valid Avro schema.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// The named pieces of the schema, addressed by index (see
    /// [`Schema::lookup`] and [`SchemaPieceOrNamed::Named`]).
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps a full name to its index in `named` (see
    /// [`Schema::try_lookup_name`]).
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// This type is used to simplify enum variant comparison between `Schema` and `types::Value`.
///
/// **NOTE** This type was introduced due to a limitation of `mem::discriminant` requiring a _value_
/// be constructed in order to get the discriminant, which makes it difficult to implement a
/// function that maps from `Discriminant<Schema> -> Discriminant<Value>`. Conversion into this
/// intermediate type should be especially fast, as the number of enum variants is small, which
/// _should_ compile into a jump-table for the conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    // Fixed-length types
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    // Variable-length types
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    // This can arise in resolved schemas, particularly when a union resolves to a non-union.
    // We would need to do a lookup to find the actual type.
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase name of this kind (e.g. `"int"`, `"record"`),
    /// or `"unknown"` for [`SchemaKind::Unknown`].
    pub fn name(self) -> &'static str {
        let label: &'static str = match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        };
        label
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s have a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The name part of the fullname.
    pub name: String,
    /// The namespace part of the fullname, if one is known.
    pub namespace: Option<String>,
    /// Alternative names for the schema, if any were given.
    pub aliases: Option<Vec<String>>,
}
537
/// A name together with its (possibly empty) namespace.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a `FullName` from its parts.
    ///
    /// When `namespace` is given, `name` and `namespace` are used verbatim.
    /// Otherwise `name` is split at its last `.`: the text before the dot
    /// becomes the namespace and the text after it the name. A `name`
    /// without any dot is placed in `default_namespace`.
    // [XXX] btv - what happens if `name` contains dots, _and_ `namespace` is `Some` ?
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (namespace, name) = match namespace {
            Some(ns) => (ns, name),
            None => match name.rsplit_once('.') {
                Some((ns, base)) => (ns, base),
                None => (default_namespace, name),
            },
        };
        FullName {
            name: name.to_owned(),
            namespace: namespace.to_owned(),
        }
    }
    /// Returns the name without its namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }
    /// Returns `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        match self.namespace.as_str() {
            "" => self.name.clone(),
            ns => format!("{}.{}", ns, self.name),
        }
    }
    /// Get the shortest unambiguous synonym of this name
    /// at a given point in the schema graph. If this name
    /// is in the same namespace as the enclosing node, this
    /// returns the short name; otherwise, it returns the fully qualified name.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace != enclosing_ns {
            return Cow::Owned(format!("{}.{}", self.namespace, self.name));
        }
        Cow::Borrowed(self.name.as_str())
    }
    /// Returns the namespace of the name
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

/// Debug-formats the name as `namespace.name`, even when the namespace is
/// empty.
impl fmt::Debug for FullName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, namespace } = self;
        write!(f, "{namespace}.{name}")
    }
}
594
/// Represents documentation for complex Avro schemas.
///
/// This is an alias for `Option<String>`, so a schema node without
/// documentation can be represented as `None`.
pub type Documentation = Option<String>;
597
598impl Name {
599    /// Reports whether the given string is a valid Avro name.
600    ///
601    /// See: <https://avro.apache.org/docs/1.11.1/specification/#names>
602    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    /// Returns an error if the given name is invalid.
609    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    /// Rewrites `name` to be valid.
622    ///
623    /// Any non alphanumeric characters are replaced with underscores. If the
624    /// name begins with a number, it is prefixed with `_`.
625    ///
626    /// `make_valid` is not injective. Multiple invalid names are mapped to the
627    /// the same valid name.
628    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    /// Parse a `serde_json::map::Map` into a `Name`.
653    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    /// Parses a name from a simple string.
702    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    /// Return the `fullname` of this `Name`
709    ///
710    /// More information about fullnames can be found in the
711    /// [Avro specification](https://avro.apache.org/docs/current/spec.html#names)
712    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
/// A record field filled in from a default value during schema resolution.
///
/// Used by `SchemaPiece::ResolveRecord` for fields that do not exist in the
/// writer schema but had a default value specified in the reader schema.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// The default value used for the field.
    pub default: types::Value,
    /// Order of the field.
    pub order: RecordFieldOrder,
    /// NOTE(review): presumably the position of the field within the
    /// reader's record -- confirm against the resolver.
    pub position: usize,
}
725
/// A writer record field after resolution against a reader record.
///
/// Per `SchemaPiece::ResolveRecord`, a field is `Present` if it could be
/// resolved against a field in the reader schema, and `Absent` otherwise.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// The field could not be resolved against the reader.
    /// NOTE(review): the payload is presumably the writer's schema for the
    /// field, kept so its data can be skipped -- confirm against the decoder.
    Absent(Schema),
    /// The field was resolved against a field in the reader schema.
    Present(RecordField),
}
731
/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Default value of the field, as JSON.
    /// This value will be used when reading Avro datum if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: SchemaPieceOrNamed,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field in the list of `field` of its parent `Schema`.
    pub position: usize,
}
752
/// Represents any valid order for a `field` in a `record` Avro schema.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    /// Ascending sort order.
    Ascending,
    /// Descending sort order.
    Descending,
    /// The field is ignored for ordering purposes.
    Ignore,
}
760
// `RecordField` has no inherent methods; this impl block is empty.
impl RecordField {}
762
/// A `union` Avro schema: an ordered list of variants plus indexes that map
/// a variant's kind (for anonymous variants) or named-type index (for named
/// variants) back to its position in the list.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    // The variants of the union, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    // Used to ensure uniqueness of anonymous schema inputs, and to look up the
    // schema index given a value without scanning `schemas`. Maps the kind of
    // each anonymous variant to its position in `schemas`; being a `BTreeMap`,
    // lookups are logarithmic in the number of variants.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    // Same as above, for named input references: maps the index of each named
    // variant to its position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    /// Returns a slice to all variants of this schema.
812    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    /// Returns true if the first variant of this `UnionSchema` is `Null`.
817    pub fn is_nullable(&self) -> bool {
818        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
819    }
820
821    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
822        self.anon_variant_index
823            .get(&SchemaKind::from(sp))
824            .map(|idx| (*idx, &self.schemas[*idx]))
825    }
826
827    pub fn match_ref(
828        &self,
829        other: SchemaPieceRefOrNamed,
830        names_map: &BTreeMap<usize, usize>,
831    ) -> Option<(usize, &SchemaPieceOrNamed)> {
832        match other {
833            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
834            SchemaPieceRefOrNamed::Named(idx) => names_map
835                .get(&idx)
836                .and_then(|idx| self.named_variant_index.get(idx))
837                .map(|idx| (*idx, &self.schemas[*idx])),
838        }
839    }
840
841    #[inline(always)]
842    pub fn match_(
843        &self,
844        other: &SchemaPieceOrNamed,
845        names_map: &BTreeMap<usize, usize>,
846    ) -> Option<(usize, &SchemaPieceOrNamed)> {
847        self.match_ref(other.as_ref(), names_map)
848    }
849}
850
851// No need to compare variant_index, it is derivative of schemas.
852impl PartialEq for UnionSchema {
853    fn eq(&self, other: &UnionSchema) -> bool {
854        self.schemas.eq(&other.schemas)
855    }
856}
857
858#[derive(Default)]
859struct SchemaParser {
860    named: Vec<Option<NamedSchemaPiece>>,
861    indices: BTreeMap<FullName, usize>,
862}
863
864impl SchemaParser {
865    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
866        let top = self.parse_inner("", value)?;
867        let SchemaParser { named, indices } = self;
868        Ok(Schema {
869            named: named.into_iter().map(|o| o.unwrap()).collect(),
870            indices,
871            top,
872        })
873    }
874
875    fn parse_inner(
876        &mut self,
877        default_namespace: &str,
878        value: &Value,
879    ) -> Result<SchemaPieceOrNamed, AvroError> {
880        match *value {
881            Value::String(ref t) => {
882                let name = FullName::from_parts(t.as_str(), None, default_namespace);
883                if let Some(idx) = self.indices.get(&name) {
884                    Ok(SchemaPieceOrNamed::Named(*idx))
885                } else {
886                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
887                        t.as_str(),
888                    )?))
889                }
890            }
891            Value::Object(ref data) => self.parse_complex(default_namespace, data),
892            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
893                self.parse_union(default_namespace, data)?,
894            )),
895            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
896        }
897    }
898
899    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
900        let idx = match self.indices.entry(fullname) {
901            Entry::Vacant(ve) => *ve.insert(self.named.len()),
902            Entry::Occupied(oe) => {
903                return Err(ParseSchemaError::new(format!(
904                    "Sub-schema with name {:?} encountered multiple times",
905                    oe.key()
906                ))
907                .into());
908            }
909        };
910        self.named.push(None);
911        Ok(idx)
912    }
913
914    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
915        assert_none!(self.named[index]);
916        self.named[index] = Some(schema);
917    }
918
919    fn parse_named_type(
920        &mut self,
921        type_name: &str,
922        default_namespace: &str,
923        complex: &Map<String, Value>,
924    ) -> Result<usize, AvroError> {
925        let name = Name::parse(complex)?;
926        match name.name.as_str() {
927            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
928                return Err(ParseSchemaError::new(format!(
929                    "{} may not be used as a custom type name",
930                    name.name
931                ))
932                .into());
933            }
934            _ => {}
935        };
936        let fullname = name.fullname(default_namespace);
937        let default_namespace = fullname.namespace.clone();
938        let idx = self.alloc_name(fullname.clone())?;
939        let piece = match type_name {
940            "record" => self.parse_record(&default_namespace, complex),
941            "enum" => self.parse_enum(complex),
942            "fixed" => self.parse_fixed(&default_namespace, complex),
943            _ => unreachable!("Unknown named type kind: {}", type_name),
944        }?;
945
946        self.insert(
947            idx,
948            NamedSchemaPiece {
949                name: fullname,
950                piece,
951            },
952        );
953
954        Ok(idx)
955    }
956
957    /// Parse a `serde_json::Value` representing a complex Avro type into a
958    /// `Schema`.
959    ///
960    /// Avro supports "recursive" definition of types.
961    /// e.g: {"type": {"type": "string"}}
962    fn parse_complex(
963        &mut self,
964        default_namespace: &str,
965        complex: &Map<String, Value>,
966    ) -> Result<SchemaPieceOrNamed, AvroError> {
967        match complex.get("type") {
968            Some(&Value::String(ref t)) => Ok(match t.as_str() {
969                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
970                    t,
971                    default_namespace,
972                    complex,
973                )?),
974                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
975                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
976                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
977                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
978                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
979                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
980                other => {
981                    let name = FullName {
982                        name: other.into(),
983                        namespace: default_namespace.into(),
984                    };
985                    if let Some(idx) = self.indices.get(&name) {
986                        SchemaPieceOrNamed::Named(*idx)
987                    } else {
988                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
989                    }
990                }
991            }),
992            Some(&Value::Object(ref data)) => match data.get("type") {
993                Some(value) => self.parse_inner(default_namespace, value),
994                None => Err(
995                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
996                ),
997            },
998            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
999        }
1000    }
1001
1002    /// Parse a `serde_json::Value` representing a Avro record type into a
1003    /// `Schema`.
1004    fn parse_record(
1005        &mut self,
1006        default_namespace: &str,
1007        complex: &Map<String, Value>,
1008    ) -> Result<SchemaPiece, AvroError> {
1009        let mut lookup = BTreeMap::new();
1010
1011        let fields: Vec<RecordField> = complex
1012            .get("fields")
1013            .and_then(|fields| fields.as_array())
1014            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1015            .and_then(|fields| {
1016                fields
1017                    .iter()
1018                    .filter_map(|field| field.as_object())
1019                    .enumerate()
1020                    .map(|(position, field)| {
1021                        self.parse_record_field(default_namespace, field, position)
1022                    })
1023                    .collect::<Result<_, _>>()
1024            })?;
1025
1026        for field in &fields {
1027            lookup.insert(field.name.clone(), field.position);
1028        }
1029
1030        Ok(SchemaPiece::Record {
1031            doc: complex.doc(),
1032            fields,
1033            lookup,
1034        })
1035    }
1036
1037    /// Parse a `serde_json::Value` into a `RecordField`.
1038    fn parse_record_field(
1039        &mut self,
1040        default_namespace: &str,
1041        field: &Map<String, Value>,
1042        position: usize,
1043    ) -> Result<RecordField, AvroError> {
1044        let name = field
1045            .name()
1046            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1047
1048        Name::validate(&name)?;
1049
1050        let schema = field
1051            .get("type")
1052            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1053            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1054
1055        let default = field.get("default").cloned();
1056
1057        let order = field
1058            .get("order")
1059            .and_then(|order| order.as_str())
1060            .and_then(|order| match order {
1061                "ascending" => Some(RecordFieldOrder::Ascending),
1062                "descending" => Some(RecordFieldOrder::Descending),
1063                "ignore" => Some(RecordFieldOrder::Ignore),
1064                _ => None,
1065            })
1066            .unwrap_or(RecordFieldOrder::Ascending);
1067
1068        Ok(RecordField {
1069            name,
1070            doc: field.doc(),
1071            default,
1072            schema,
1073            order,
1074            position,
1075        })
1076    }
1077
1078    /// Parse a `serde_json::Value` representing a Avro enum type into a
1079    /// `Schema`.
1080    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1081        let symbols: Vec<String> = complex
1082            .get("symbols")
1083            .and_then(|v| v.as_array())
1084            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1085            .and_then(|symbols| {
1086                symbols
1087                    .iter()
1088                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1089                    .collect::<Option<_>>()
1090                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1091            })?;
1092
1093        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1094        for symbol in symbols.iter() {
1095            if unique_symbols.contains(symbol) {
1096                return Err(ParseSchemaError::new(format!(
1097                    "Enum symbols must be unique, found multiple: {}",
1098                    symbol
1099                ))
1100                .into());
1101            } else {
1102                unique_symbols.insert(symbol);
1103            }
1104        }
1105
1106        let default_idx = if let Some(default) = complex.get("default") {
1107            let default_str = default.as_str().ok_or_else(|| {
1108                ParseSchemaError::new(format!(
1109                    "Enum default should be a string, got: {:?}",
1110                    default
1111                ))
1112            })?;
1113            let default_idx = symbols
1114                .iter()
1115                .position(|x| x == default_str)
1116                .ok_or_else(|| {
1117                    ParseSchemaError::new(format!(
1118                        "Enum default not found in list of symbols: {}",
1119                        default_str
1120                    ))
1121                })?;
1122            Some(default_idx)
1123        } else {
1124            None
1125        };
1126
1127        Ok(SchemaPiece::Enum {
1128            doc: complex.doc(),
1129            symbols,
1130            default_idx,
1131        })
1132    }
1133
1134    /// Parse a `serde_json::Value` representing a Avro array type into a
1135    /// `Schema`.
1136    fn parse_array(
1137        &mut self,
1138        default_namespace: &str,
1139        complex: &Map<String, Value>,
1140    ) -> Result<SchemaPiece, AvroError> {
1141        complex
1142            .get("items")
1143            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1144            .and_then(|items| self.parse_inner(default_namespace, items))
1145            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1146    }
1147
1148    /// Parse a `serde_json::Value` representing a Avro map type into a
1149    /// `Schema`.
1150    fn parse_map(
1151        &mut self,
1152        default_namespace: &str,
1153        complex: &Map<String, Value>,
1154    ) -> Result<SchemaPiece, AvroError> {
1155        complex
1156            .get("values")
1157            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1158            .and_then(|items| self.parse_inner(default_namespace, items))
1159            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1160    }
1161
1162    /// Parse a `serde_json::Value` representing a Avro union type into a
1163    /// `Schema`.
1164    fn parse_union(
1165        &mut self,
1166        default_namespace: &str,
1167        items: &[Value],
1168    ) -> Result<SchemaPiece, AvroError> {
1169        items
1170            .iter()
1171            .map(|value| self.parse_inner(default_namespace, value))
1172            .collect::<Result<Vec<_>, _>>()
1173            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1174    }
1175
1176    /// Parse a `serde_json::Value` representing a logical decimal type into a
1177    /// `Schema`.
1178    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1179        let precision = complex
1180            .get("precision")
1181            .and_then(|v| v.as_i64())
1182            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1183
1184        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1185
1186        if scale < 0 {
1187            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1188        }
1189
1190        if precision < 0 {
1191            return Err(
1192                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1193            );
1194        }
1195
1196        if scale > precision {
1197            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1198        }
1199
1200        Ok((precision as usize, scale as usize))
1201    }
1202
1203    /// Parse a `serde_json::Value` representing an Avro bytes type into a
1204    /// `Schema`.
1205    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1206        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1207
1208        if let Some("decimal") = logical_type {
1209            match Self::parse_decimal(complex) {
1210                Ok((precision, scale)) => {
1211                    return Ok(SchemaPiece::Decimal {
1212                        precision,
1213                        scale,
1214                        fixed_size: None,
1215                    });
1216                }
1217                Err(e) => warn!(
1218                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1219                    complex, e
1220                ),
1221            }
1222        }
1223
1224        Ok(SchemaPiece::Bytes)
1225    }
1226
1227    /// Parse a [`serde_json::Value`] representing an Avro Int type
1228    ///
1229    /// If the complex type has a `connect.name` tag (as [emitted by
1230    /// Debezium][1]) that matches a `Date` tag, we specify that the correct
1231    /// schema to use is `Date`.
1232    ///
1233    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1234    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1235        const AVRO_DATE: &str = "date";
1236        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1237        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1238        if let Some(name) = complex.get("connect.name") {
1239            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1240                if name == KAFKA_DATE {
1241                    warn!("using deprecated debezium date format");
1242                }
1243                return Ok(SchemaPiece::Date);
1244            }
1245        }
1246        // Put this after the custom semantic types so that the debezium
1247        // warning is emitted, since the logicalType tag shows up in the
1248        // deprecated debezium format :-/
1249        if let Some(name) = complex.get("logicalType") {
1250            if name == AVRO_DATE {
1251                return Ok(SchemaPiece::Date);
1252            }
1253        }
1254        if !complex.is_empty() {
1255            debug!("parsing complex type as regular int: {:?}", complex);
1256        }
1257        Ok(SchemaPiece::Int)
1258    }
1259
1260    /// Parse a [`serde_json::Value`] representing an Avro Int64/Long type
1261    ///
1262    /// The debezium/kafka types are document at [the debezium site][1], and the
1263    /// avro ones are documented at [Avro][2].
1264    ///
1265    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1266    /// [2]: https://avro.apache.org/docs/1.9.0/spec.html
1267    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1268        const AVRO_MILLI_TS: &str = "timestamp-millis";
1269        const AVRO_MICRO_TS: &str = "timestamp-micros";
1270
1271        const CONNECT_MILLI_TS: &[&str] = &[
1272            "io.debezium.time.Timestamp",
1273            "org.apache.kafka.connect.data.Timestamp",
1274        ];
1275        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1276
1277        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1278            if CONNECT_MILLI_TS.contains(&&**name) {
1279                return Ok(SchemaPiece::TimestampMilli);
1280            }
1281            if name == CONNECT_MICRO_TS {
1282                return Ok(SchemaPiece::TimestampMicro);
1283            }
1284        }
1285        if let Some(name) = complex.get("logicalType") {
1286            if name == AVRO_MILLI_TS {
1287                return Ok(SchemaPiece::TimestampMilli);
1288            }
1289            if name == AVRO_MICRO_TS {
1290                return Ok(SchemaPiece::TimestampMicro);
1291            }
1292        }
1293        if !complex.is_empty() {
1294            debug!("parsing complex type as regular long: {:?}", complex);
1295        }
1296        Ok(SchemaPiece::Long)
1297    }
1298
1299    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1300        const CONNECT_JSON: &str = "io.debezium.data.Json";
1301
1302        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1303            if CONNECT_JSON == name.as_str() {
1304                return SchemaPiece::Json;
1305            }
1306        }
1307        if let Some(name) = complex.get("logicalType") {
1308            if name == "uuid" {
1309                return SchemaPiece::Uuid;
1310            }
1311        }
1312        debug!("parsing complex type as regular string: {:?}", complex);
1313        SchemaPiece::String
1314    }
1315
1316    /// Parse a `serde_json::Value` representing a Avro fixed type into a
1317    /// `Schema`.
1318    fn parse_fixed(
1319        &self,
1320        _default_namespace: &str,
1321        complex: &Map<String, Value>,
1322    ) -> Result<SchemaPiece, AvroError> {
1323        let _name = Name::parse(complex)?;
1324
1325        let size = complex
1326            .get("size")
1327            .and_then(|v| v.as_i64())
1328            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1329        if size <= 0 {
1330            return Err(ParseSchemaError::new(format!(
1331                "Fixed values require a positive size attribute, found: {}",
1332                size
1333            ))
1334            .into());
1335        }
1336
1337        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1338
1339        if let Some("decimal") = logical_type {
1340            match Self::parse_decimal(complex) {
1341                Ok((precision, scale)) => {
1342                    let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1343                    if precision > max {
1344                        warn!(
1345                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1346                            precision, size
1347                        );
1348                    } else {
1349                        return Ok(SchemaPiece::Decimal {
1350                            precision,
1351                            scale,
1352                            fixed_size: Some(size as usize),
1353                        });
1354                    }
1355                }
1356                Err(e) => warn!(
1357                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1358                    complex, e
1359                ),
1360            }
1361        }
1362
1363        Ok(SchemaPiece::Fixed {
1364            size: size as usize,
1365        })
1366    }
1367}
1368
1369impl Schema {
1370    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
1371    /// schema.
1372    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1373        let p = SchemaParser {
1374            named: vec![],
1375            indices: Default::default(),
1376        };
1377        p.parse(value)
1378    }
1379
1380    /// Converts `self` into its [Parsing Canonical Form].
1381    ///
1382    /// [Parsing Canonical Form]:
1383    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
1384    pub fn canonical_form(&self) -> String {
1385        let json = serde_json::to_value(self).unwrap();
1386        parsing_canonical_form(&json)
1387    }
1388
1389    /// Generate [fingerprint] of Schema's [Parsing Canonical Form].
1390    ///
1391    /// [Parsing Canonical Form]:
1392    /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
1393    /// [fingerprint]:
1394    /// https://avro.apache.org/docs/current/spec.html#schema_fingerprints
1395    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1396        let mut d = D::new();
1397        d.update(self.canonical_form());
1398        SchemaFingerprint {
1399            bytes: d.finalize().to_vec(),
1400        }
1401    }
1402
1403    /// Parse a `serde_json::Value` representing a primitive Avro type into a
1404    /// `Schema`.
1405    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1406        match primitive {
1407            "null" => Ok(SchemaPiece::Null),
1408            "boolean" => Ok(SchemaPiece::Boolean),
1409            "int" => Ok(SchemaPiece::Int),
1410            "long" => Ok(SchemaPiece::Long),
1411            "double" => Ok(SchemaPiece::Double),
1412            "float" => Ok(SchemaPiece::Float),
1413            "bytes" => Ok(SchemaPiece::Bytes),
1414            "string" => Ok(SchemaPiece::String),
1415            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1416        }
1417    }
1418}
1419
1420impl FromStr for Schema {
1421    type Err = AvroError;
1422
1423    /// Create a `Schema` from a string representing a JSON Avro schema.
1424    fn from_str(input: &str) -> Result<Self, AvroError> {
1425        let value = serde_json::from_str(input)
1426            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1427        Self::parse(&value)
1428    }
1429}
1430
1431#[derive(Clone, Debug, PartialEq)]
1432pub struct NamedSchemaPiece {
1433    pub name: FullName,
1434    pub piece: SchemaPiece,
1435}
1436
1437#[derive(Copy, Clone, Debug)]
1438pub struct SchemaNode<'a> {
1439    pub root: &'a Schema,
1440    pub inner: &'a SchemaPiece,
1441    pub name: Option<&'a FullName>,
1442}
1443
1444#[derive(Copy, Clone, Debug)]
1445pub enum SchemaPieceRefOrNamed<'a> {
1446    Piece(&'a SchemaPiece),
1447    Named(usize),
1448}
1449
1450impl<'a> SchemaPieceRefOrNamed<'a> {
1451    pub fn get_human_name(&self, root: &Schema) -> String {
1452        match self {
1453            Self::Piece(piece) => format!("{:?}", piece),
1454            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1455        }
1456    }
1457
1458    #[inline(always)]
1459    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1460        match self {
1461            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1462            SchemaPieceRefOrNamed::Named(index) => {
1463                let named_piece = root.lookup(index);
1464                (&named_piece.piece, Some(&named_piece.name))
1465            }
1466        }
1467    }
1468}
1469
1470#[derive(Copy, Clone, Debug)]
1471pub struct SchemaNodeOrNamed<'a> {
1472    pub root: &'a Schema,
1473    pub inner: SchemaPieceRefOrNamed<'a>,
1474}
1475
1476impl<'a> SchemaNodeOrNamed<'a> {
1477    #[inline(always)]
1478    pub fn lookup(self) -> SchemaNode<'a> {
1479        let (inner, name) = self.inner.get_piece_and_name(self.root);
1480        SchemaNode {
1481            root: self.root,
1482            inner,
1483            name,
1484        }
1485    }
1486    #[inline(always)]
1487    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1488        self.step_ref(next.as_ref())
1489    }
1490    #[inline(always)]
1491    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1492        Self {
1493            root: self.root,
1494            inner: match next {
1495                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1496                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1497            },
1498        }
1499    }
1500
1501    pub fn to_schema(self) -> Schema {
1502        let mut cloner = SchemaSubtreeDeepCloner {
1503            old_root: self.root,
1504            old_to_new_names: Default::default(),
1505            named: Default::default(),
1506        };
1507        let piece = cloner.clone_piece_or_named(self.inner);
1508        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1509        let indices: BTreeMap<FullName, usize> = named
1510            .iter()
1511            .enumerate()
1512            .map(|(i, nsp)| (nsp.name.clone(), i))
1513            .collect();
1514        Schema {
1515            named,
1516            indices,
1517            top: piece,
1518        }
1519    }
1520
1521    pub fn namespace(self) -> Option<&'a str> {
1522        let SchemaNode { name, .. } = self.lookup();
1523        name.map(|FullName { namespace, .. }| namespace.as_str())
1524    }
1525}
1526
1527struct SchemaSubtreeDeepCloner<'a> {
1528    old_root: &'a Schema,
1529    old_to_new_names: BTreeMap<usize, usize>,
1530    named: Vec<Option<NamedSchemaPiece>>,
1531}
1532
1533impl<'a> SchemaSubtreeDeepCloner<'a> {
1534    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1535        match piece {
1536            SchemaPiece::Null => SchemaPiece::Null,
1537            SchemaPiece::Boolean => SchemaPiece::Boolean,
1538            SchemaPiece::Int => SchemaPiece::Int,
1539            SchemaPiece::Long => SchemaPiece::Long,
1540            SchemaPiece::Float => SchemaPiece::Float,
1541            SchemaPiece::Double => SchemaPiece::Double,
1542            SchemaPiece::Date => SchemaPiece::Date,
1543            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1544            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1545            SchemaPiece::Json => SchemaPiece::Json,
1546            SchemaPiece::Decimal {
1547                scale,
1548                precision,
1549                fixed_size,
1550            } => SchemaPiece::Decimal {
1551                scale: *scale,
1552                precision: *precision,
1553                fixed_size: *fixed_size,
1554            },
1555            SchemaPiece::Bytes => SchemaPiece::Bytes,
1556            SchemaPiece::String => SchemaPiece::String,
1557            SchemaPiece::Uuid => SchemaPiece::Uuid,
1558            SchemaPiece::Array(inner) => {
1559                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1560            }
1561            SchemaPiece::Map(inner) => {
1562                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1563            }
1564            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1565                schemas: us
1566                    .schemas
1567                    .iter()
1568                    .map(|s| self.clone_piece_or_named(s.as_ref()))
1569                    .collect(),
1570                anon_variant_index: us.anon_variant_index.clone(),
1571                named_variant_index: us.named_variant_index.clone(),
1572            }),
1573            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1574            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1575            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1576            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1577            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1578            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1579            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1580            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1581            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1582            SchemaPiece::ResolveConcreteUnion {
1583                index,
1584                inner,
1585                n_reader_variants,
1586                reader_null_variant,
1587            } => SchemaPiece::ResolveConcreteUnion {
1588                index: *index,
1589                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1590                n_reader_variants: *n_reader_variants,
1591                reader_null_variant: *reader_null_variant,
1592            },
1593            SchemaPiece::ResolveUnionUnion {
1594                permutation,
1595                n_reader_variants,
1596                reader_null_variant,
1597            } => SchemaPiece::ResolveUnionUnion {
1598                permutation: permutation
1599                    .clone()
1600                    .into_iter()
1601                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1602                    .collect(),
1603                n_reader_variants: *n_reader_variants,
1604                reader_null_variant: *reader_null_variant,
1605            },
1606            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1607                SchemaPiece::ResolveUnionConcrete {
1608                    index: *index,
1609                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1610                }
1611            }
1612            SchemaPiece::Record {
1613                doc,
1614                fields,
1615                lookup,
1616            } => SchemaPiece::Record {
1617                doc: doc.clone(),
1618                fields: fields
1619                    .iter()
1620                    .map(|rf| RecordField {
1621                        name: rf.name.clone(),
1622                        doc: rf.doc.clone(),
1623                        default: rf.default.clone(),
1624                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
1625                        order: rf.order,
1626                        position: rf.position,
1627                    })
1628                    .collect(),
1629                lookup: lookup.clone(),
1630            },
1631            SchemaPiece::Enum {
1632                doc,
1633                symbols,
1634                default_idx,
1635            } => SchemaPiece::Enum {
1636                doc: doc.clone(),
1637                symbols: symbols.clone(),
1638                default_idx: *default_idx,
1639            },
1640            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1641            SchemaPiece::ResolveRecord {
1642                defaults,
1643                fields,
1644                n_reader_fields,
1645            } => SchemaPiece::ResolveRecord {
1646                defaults: defaults.clone(),
1647                fields: fields
1648                    .iter()
1649                    .map(|rf| match rf {
1650                        ResolvedRecordField::Present(rf) => {
1651                            ResolvedRecordField::Present(RecordField {
1652                                name: rf.name.clone(),
1653                                doc: rf.doc.clone(),
1654                                default: rf.default.clone(),
1655                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
1656                                order: rf.order,
1657                                position: rf.position,
1658                            })
1659                        }
1660                        ResolvedRecordField::Absent(writer_schema) => {
1661                            ResolvedRecordField::Absent(writer_schema.clone())
1662                        }
1663                    })
1664                    .collect(),
1665                n_reader_fields: *n_reader_fields,
1666            },
1667            SchemaPiece::ResolveEnum {
1668                doc,
1669                symbols,
1670                default,
1671            } => SchemaPiece::ResolveEnum {
1672                doc: doc.clone(),
1673                symbols: symbols.clone(),
1674                default: default.clone(),
1675            },
1676        }
1677    }
1678    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1679        match piece {
1680            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1681            SchemaPieceRefOrNamed::Named(index) => {
1682                let new_index = match self.old_to_new_names.entry(index) {
1683                    Entry::Vacant(ve) => {
1684                        let new_index = self.named.len();
1685                        self.named.push(None);
1686                        ve.insert(new_index);
1687                        let old_named_piece = self.old_root.lookup(index);
1688                        let new_named_piece = NamedSchemaPiece {
1689                            name: old_named_piece.name.clone(),
1690                            piece: self.clone_piece(&old_named_piece.piece),
1691                        };
1692                        self.named[new_index] = Some(new_named_piece);
1693                        new_index
1694                    }
1695                    Entry::Occupied(oe) => *oe.get(),
1696                };
1697                SchemaPieceOrNamed::Named(new_index)
1698            }
1699        }
1700    }
1701}
1702
1703impl<'a> SchemaNode<'a> {
1704    #[inline(always)]
1705    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1706        let (inner, name) = next.get_piece_and_name(self.root);
1707        Self {
1708            root: self.root,
1709            inner,
1710            name,
1711        }
1712    }
1713
1714    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1715        use serde_json::Value::*;
1716        let val = match (json, self.inner) {
1717            // A default value always matches the first variant of a union
1718            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1719                Some(variant) => AvroValue::Union {
1720                    index: 0,
1721                    inner: Box::new(self.step(variant).json_to_value(json)?),
1722                    n_variants: us.schemas.len(),
1723                    null_variant: us
1724                        .schemas
1725                        .iter()
1726                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1727                },
1728                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1729            },
1730            (Null, SchemaPiece::Null) => AvroValue::Null,
1731            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1732            (Number(n), piece) => {
1733                match piece {
1734                    piece if piece.is_underlying_int() => {
1735                        let i =
1736                            n.as_i64()
1737                                .and_then(|i| i32::try_from(i).ok())
1738                                .ok_or_else(|| {
1739                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1740                                })?;
1741                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1742                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1743                        })?
1744                    }
1745                    piece if piece.is_underlying_long() => {
1746                        let i = n.as_i64().ok_or_else(|| {
1747                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1748                        })?;
1749                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1750                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1751                        })?
1752                    }
1753                    SchemaPiece::Float => {
1754                        let f = n.as_f64().ok_or_else(|| {
1755                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1756                        })?;
1757                        AvroValue::Float(f as f32)
1758                    }
1759                    SchemaPiece::Double => {
1760                        let f = n.as_f64().ok_or_else(|| {
1761                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1762                        })?;
1763                        AvroValue::Double(f)
1764                    }
1765                    _ => {
1766                        return Err(ParseSchemaError(format!(
1767                            "Unexpected number in default: {}",
1768                            n
1769                        )));
1770                    }
1771                }
1772            }
1773            (String(s), piece)
1774                if s.eq_ignore_ascii_case("nan")
1775                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1776            {
1777                match piece {
1778                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1779                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1780                    _ => unreachable!(),
1781                }
1782            }
1783            (String(s), piece)
1784                if s.eq_ignore_ascii_case("infinity")
1785                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1786            {
1787                match piece {
1788                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1789                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1790                    _ => unreachable!(),
1791                }
1792            }
1793            (String(s), piece)
1794                if s.eq_ignore_ascii_case("-infinity")
1795                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1796            {
1797                match piece {
1798                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1799                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1800                    _ => unreachable!(),
1801                }
1802            }
1803            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1804            (
1805                String(s),
1806                SchemaPiece::Decimal {
1807                    precision, scale, ..
1808                },
1809            ) => AvroValue::Decimal(DecimalValue {
1810                precision: *precision,
1811                scale: *scale,
1812                unscaled: s.clone().into_bytes(),
1813            }),
1814            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1815            (Object(map), SchemaPiece::Record { fields, .. }) => {
1816                let field_values = fields
1817                    .iter()
1818                    .map(|rf| {
1819                        let jval = map.get(&rf.name).ok_or_else(|| {
1820                            ParseSchemaError(format!(
1821                                "Field not found in default value: {}",
1822                                rf.name
1823                            ))
1824                        })?;
1825                        let value = self.step(&rf.schema).json_to_value(jval)?;
1826                        Ok((rf.name.clone(), value))
1827                    })
1828                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1829                AvroValue::Record(field_values)
1830            }
1831            (String(s), SchemaPiece::Enum { symbols, .. }) => {
1832                match symbols.iter().find_position(|sym| s == *sym) {
1833                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1834                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1835                }
1836            }
1837            (Array(vals), SchemaPiece::Array(inner)) => {
1838                let node = self.step(&**inner);
1839                let vals = vals
1840                    .iter()
1841                    .map(|val| node.json_to_value(val))
1842                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1843                AvroValue::Array(vals)
1844            }
1845            (Object(map), SchemaPiece::Map(inner)) => {
1846                let node = self.step(&**inner);
1847                let map = map
1848                    .iter()
1849                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1850                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1851                AvroValue::Map(map)
1852            }
1853            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1854                AvroValue::Fixed(*size, s.clone().into_bytes())
1855            }
1856            _ => {
1857                return Err(ParseSchemaError(format!(
1858                    "Json default value {} does not match schema",
1859                    json
1860                )));
1861            }
1862        };
1863        Ok(val)
1864    }
1865}
1866
1867#[derive(Clone)]
1868struct SchemaSerContext<'a> {
1869    node: SchemaNodeOrNamed<'a>,
1870    // This does not logically need Rc<RefCell<_>> semantics --
1871    // it is only ever mutated in one stack frame at a time.
1872    // But AFAICT serde doesn't expose a way to
1873    // provide some mutable context to every node in the tree...
1874    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
1875    /// The namespace of this node's parent, or "" by default
1876    enclosing_ns: &'a str,
1877}
1878
1879#[derive(Clone)]
1880struct RecordFieldSerContext<'a> {
1881    outer: &'a SchemaSerContext<'a>,
1882    inner: &'a RecordField,
1883}
1884
1885impl<'a> SchemaSerContext<'a> {
1886    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1887        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1888        Self {
1889            node: self.node.step_ref(next),
1890            seen_named: Rc::clone(&self.seen_named),
1891            enclosing_ns: ns,
1892        }
1893    }
1894}
1895
1896impl<'a> Serialize for SchemaSerContext<'a> {
1897    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1898    where
1899        S: Serializer,
1900    {
1901        match self.node.inner {
1902            SchemaPieceRefOrNamed::Piece(piece) => match piece {
1903                SchemaPiece::Null => serializer.serialize_str("null"),
1904                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
1905                SchemaPiece::Int => serializer.serialize_str("int"),
1906                SchemaPiece::Long => serializer.serialize_str("long"),
1907                SchemaPiece::Float => serializer.serialize_str("float"),
1908                SchemaPiece::Double => serializer.serialize_str("double"),
1909                SchemaPiece::Date => {
1910                    let mut map = serializer.serialize_map(Some(2))?;
1911                    map.serialize_entry("type", "int")?;
1912                    map.serialize_entry("logicalType", "date")?;
1913                    map.end()
1914                }
1915                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
1916                    let mut map = serializer.serialize_map(Some(2))?;
1917                    map.serialize_entry("type", "long")?;
1918                    if piece == &SchemaPiece::TimestampMilli {
1919                        map.serialize_entry("logicalType", "timestamp-millis")?;
1920                    } else {
1921                        map.serialize_entry("logicalType", "timestamp-micros")?;
1922                    }
1923                    map.end()
1924                }
1925                SchemaPiece::Decimal {
1926                    precision,
1927                    scale,
1928                    fixed_size: None,
1929                } => {
1930                    let mut map = serializer.serialize_map(Some(4))?;
1931                    map.serialize_entry("type", "bytes")?;
1932                    map.serialize_entry("precision", precision)?;
1933                    map.serialize_entry("scale", scale)?;
1934                    map.serialize_entry("logicalType", "decimal")?;
1935                    map.end()
1936                }
1937                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
1938                SchemaPiece::String => serializer.serialize_str("string"),
1939                SchemaPiece::Array(inner) => {
1940                    let mut map = serializer.serialize_map(Some(2))?;
1941                    map.serialize_entry("type", "array")?;
1942                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
1943                    map.end()
1944                }
1945                SchemaPiece::Map(inner) => {
1946                    let mut map = serializer.serialize_map(Some(2))?;
1947                    map.serialize_entry("type", "map")?;
1948                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
1949                    map.end()
1950                }
1951                SchemaPiece::Union(inner) => {
1952                    let variants = inner.variants();
1953                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
1954                    for v in variants {
1955                        seq.serialize_element(&self.step(v.as_ref()))?;
1956                    }
1957                    seq.end()
1958                }
1959                SchemaPiece::Json => {
1960                    let mut map = serializer.serialize_map(Some(2))?;
1961                    map.serialize_entry("type", "string")?;
1962                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
1963                    map.end()
1964                }
1965                SchemaPiece::Uuid => {
1966                    let mut map = serializer.serialize_map(Some(4))?;
1967                    map.serialize_entry("type", "string")?;
1968                    map.serialize_entry("logicalType", "uuid")?;
1969                    map.end()
1970                }
1971                SchemaPiece::Record { .. }
1972                | SchemaPiece::Decimal {
1973                    fixed_size: Some(_),
1974                    ..
1975                }
1976                | SchemaPiece::Enum { .. }
1977                | SchemaPiece::Fixed { .. } => {
1978                    unreachable!("Unexpected named schema piece in anonymous schema position")
1979                }
1980                SchemaPiece::ResolveIntLong
1981                | SchemaPiece::ResolveDateTimestamp
1982                | SchemaPiece::ResolveIntFloat
1983                | SchemaPiece::ResolveIntDouble
1984                | SchemaPiece::ResolveLongFloat
1985                | SchemaPiece::ResolveLongDouble
1986                | SchemaPiece::ResolveFloatDouble
1987                | SchemaPiece::ResolveConcreteUnion { .. }
1988                | SchemaPiece::ResolveUnionUnion { .. }
1989                | SchemaPiece::ResolveUnionConcrete { .. }
1990                | SchemaPiece::ResolveRecord { .. }
1991                | SchemaPiece::ResolveIntTsMicro
1992                | SchemaPiece::ResolveIntTsMilli
1993                | SchemaPiece::ResolveEnum { .. } => {
1994                    panic!("Attempted to serialize resolved schema")
1995                }
1996            },
1997            SchemaPieceRefOrNamed::Named(index) => {
1998                let mut map = self.seen_named.borrow_mut();
1999                let named_piece = match map.get(&index) {
2000                    Some(name) => {
2001                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2002                    }
2003                    None => self.node.root.lookup(index),
2004                };
2005                let name = &named_piece.name;
2006                map.insert(index, name.clone());
2007                std::mem::drop(map);
2008                match &named_piece.piece {
2009                    SchemaPiece::Record { doc, fields, .. } => {
2010                        let mut map = serializer.serialize_map(None)?;
2011                        map.serialize_entry("type", "record")?;
2012                        map.serialize_entry("name", &name.name)?;
2013                        if self.enclosing_ns != &name.namespace {
2014                            map.serialize_entry("namespace", &name.namespace)?;
2015                        }
2016                        if let Some(docstr) = doc {
2017                            map.serialize_entry("doc", docstr)?;
2018                        }
2019                        // TODO (brennan) - serialize aliases
2020                        map.serialize_entry(
2021                            "fields",
2022                            &fields
2023                                .iter()
2024                                .map(|f| RecordFieldSerContext {
2025                                    outer: self,
2026                                    inner: f,
2027                                })
2028                                .collect::<Vec<_>>(),
2029                        )?;
2030                        map.end()
2031                    }
2032                    SchemaPiece::Enum {
2033                        symbols,
2034                        default_idx,
2035                        ..
2036                    } => {
2037                        let mut map = serializer.serialize_map(None)?;
2038                        map.serialize_entry("type", "enum")?;
2039                        map.serialize_entry("name", &name.name)?;
2040                        if self.enclosing_ns != &name.namespace {
2041                            map.serialize_entry("namespace", &name.namespace)?;
2042                        }
2043                        map.serialize_entry("symbols", symbols)?;
2044                        if let Some(default_idx) = *default_idx {
2045                            assert!(default_idx < symbols.len());
2046                            map.serialize_entry("default", &symbols[default_idx])?;
2047                        }
2048                        map.end()
2049                    }
2050                    SchemaPiece::Fixed { size } => {
2051                        let mut map = serializer.serialize_map(None)?;
2052                        map.serialize_entry("type", "fixed")?;
2053                        map.serialize_entry("name", &name.name)?;
2054                        if self.enclosing_ns != &name.namespace {
2055                            map.serialize_entry("namespace", &name.namespace)?;
2056                        }
2057                        map.serialize_entry("size", size)?;
2058                        map.end()
2059                    }
2060                    SchemaPiece::Decimal {
2061                        scale,
2062                        precision,
2063                        fixed_size: Some(size),
2064                    } => {
2065                        let mut map = serializer.serialize_map(Some(6))?;
2066                        map.serialize_entry("type", "fixed")?;
2067                        map.serialize_entry("logicalType", "decimal")?;
2068                        map.serialize_entry("name", &name.name)?;
2069                        if self.enclosing_ns != &name.namespace {
2070                            map.serialize_entry("namespace", &name.namespace)?;
2071                        }
2072                        map.serialize_entry("size", size)?;
2073                        map.serialize_entry("precision", precision)?;
2074                        map.serialize_entry("scale", scale)?;
2075                        map.end()
2076                    }
2077                    SchemaPiece::Null
2078                    | SchemaPiece::Boolean
2079                    | SchemaPiece::Int
2080                    | SchemaPiece::Long
2081                    | SchemaPiece::Float
2082                    | SchemaPiece::Double
2083                    | SchemaPiece::Date
2084                    | SchemaPiece::TimestampMilli
2085                    | SchemaPiece::TimestampMicro
2086                    | SchemaPiece::Decimal {
2087                        fixed_size: None, ..
2088                    }
2089                    | SchemaPiece::Bytes
2090                    | SchemaPiece::String
2091                    | SchemaPiece::Array(_)
2092                    | SchemaPiece::Map(_)
2093                    | SchemaPiece::Union(_)
2094                    | SchemaPiece::Uuid
2095                    | SchemaPiece::Json => {
2096                        unreachable!("Unexpected anonymous schema piece in named schema position")
2097                    }
2098                    SchemaPiece::ResolveIntLong
2099                    | SchemaPiece::ResolveDateTimestamp
2100                    | SchemaPiece::ResolveIntFloat
2101                    | SchemaPiece::ResolveIntDouble
2102                    | SchemaPiece::ResolveLongFloat
2103                    | SchemaPiece::ResolveLongDouble
2104                    | SchemaPiece::ResolveFloatDouble
2105                    | SchemaPiece::ResolveConcreteUnion { .. }
2106                    | SchemaPiece::ResolveUnionUnion { .. }
2107                    | SchemaPiece::ResolveUnionConcrete { .. }
2108                    | SchemaPiece::ResolveRecord { .. }
2109                    | SchemaPiece::ResolveIntTsMilli
2110                    | SchemaPiece::ResolveIntTsMicro
2111                    | SchemaPiece::ResolveEnum { .. } => {
2112                        panic!("Attempted to serialize resolved schema")
2113                    }
2114                }
2115            }
2116        }
2117    }
2118}
2119
2120impl Serialize for Schema {
2121    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2122    where
2123        S: Serializer,
2124    {
2125        let ctx = SchemaSerContext {
2126            node: SchemaNodeOrNamed {
2127                root: self,
2128                inner: self.top.as_ref(),
2129            },
2130            seen_named: Rc::new(RefCell::new(Default::default())),
2131            enclosing_ns: "",
2132        };
2133        ctx.serialize(serializer)
2134    }
2135}
2136
2137impl<'a> Serialize for RecordFieldSerContext<'a> {
2138    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2139    where
2140        S: Serializer,
2141    {
2142        let mut map = serializer.serialize_map(None)?;
2143        map.serialize_entry("name", &self.inner.name)?;
2144        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2145        if let Some(default) = &self.inner.default {
2146            map.serialize_entry("default", default)?;
2147        }
2148        if let Some(doc) = &self.inner.doc {
2149            map.serialize_entry("doc", doc)?;
2150        }
2151        map.end()
2152    }
2153}
2154
2155/// Parses a **valid** avro schema into the Parsing Canonical Form.
2156/// <https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas>
2157fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2158    pcf(schema, "", false)
2159}
2160
2161fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2162    match schema {
2163        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2164        serde_json::Value::String(s) => pcf_string(s),
2165        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2166        serde_json::Value::Number(n) => n.to_string(),
2167        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2168    }
2169}
2170
2171fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2172    // Look for the namespace variant up front.
2173    let default_ns = schema
2174        .get("namespace")
2175        .and_then(|v| v.as_str())
2176        .unwrap_or(enclosing_ns);
2177    let mut fields = Vec::new();
2178    let mut found_next_ns = None;
2179    let mut deferred_values = vec![];
2180    for (k, v) in schema {
2181        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
2182        if schema.len() == 1 && k == "type" {
2183            // Invariant: function is only callable from a valid schema, so this is acceptable.
2184            if let serde_json::Value::String(s) = v {
2185                return pcf_string(s);
2186            }
2187        }
2188
2189        // Strip out unused fields ([STRIP] rule)
2190        if field_ordering_position(k).is_none() {
2191            continue;
2192        }
2193
2194        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
2195        if k == "name" {
2196            // The `fields` stanza needs special handling, as it has "name"
2197            // fields that don't get canonicalized (since they are not type names).
2198            if in_fields {
2199                fields.push((
2200                    k,
2201                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2202                ));
2203                continue;
2204            }
2205            // Invariant: Only valid schemas. Must be a string.
2206            let name = v.as_str().unwrap();
2207            assert!(
2208                found_next_ns.is_none(),
2209                "`name` must not be specified multiple times"
2210            );
2211            let next_ns = match name.rsplit_once('.') {
2212                None => default_ns,
2213                Some((ns, _name)) => ns,
2214            };
2215            found_next_ns = Some(next_ns);
2216            let n = if next_ns.is_empty() {
2217                Cow::Borrowed(name)
2218            } else {
2219                Cow::Owned(format!("{next_ns}.{name}"))
2220            };
2221            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2222            continue;
2223        }
2224
2225        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
2226        if k == "size" {
2227            let i = match v.as_str() {
2228                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2229                None => v.as_i64().unwrap(),
2230            };
2231            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2232            continue;
2233        }
2234
2235        // For anything else, recursively process the result
2236        // (deferred until we know the namespace)
2237        deferred_values.push((k, v));
2238    }
2239
2240    let next_ns = found_next_ns.unwrap_or(default_ns);
2241    for (k, v) in deferred_values {
2242        fields.push((
2243            k,
2244            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2245        ));
2246    }
2247
2248    // Sort the fields by their canonical ordering ([ORDER] rule).
2249    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2250    let inter = fields
2251        .into_iter()
2252        .map(|(_, v)| v)
2253        .collect::<Vec<_>>()
2254        .join(",");
2255    format!("{{{}}}", inter)
2256}
2257
2258fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2259    let inter = arr
2260        .iter()
2261        .map(|s| pcf(s, enclosing_ns, in_fields))
2262        .collect::<Vec<String>>()
2263        .join(",");
2264    format!("[{}]", inter)
2265}
2266
/// Wraps `s` in double quotes. The contents are copied verbatim; nothing is
/// escaped.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2270
// Defines which attributes are kept in canonical form, and in what order:
// the 1-based rank of a kept attribute, or `None` for one that is stripped.
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
2286
2287#[cfg(test)]
2288mod tests {
2289    use mz_ore::{assert_err, assert_ok};
2290
2291    use crate::types::{Record, ToAvro};
2292
2293    use super::*;
2294
2295    fn check_schema(schema: &str, expected: SchemaPiece) {
2296        let schema = Schema::from_str(schema).unwrap();
2297        assert_eq!(&expected, schema.top_node().inner);
2298
2299        // Test serialization round trip
2300        let schema = serde_json::to_string(&schema).unwrap();
2301        let schema = Schema::from_str(&schema).unwrap();
2302        assert_eq!(&expected, schema.top_node().inner);
2303    }
2304
2305    #[mz_ore::test]
2306    fn test_primitive_schema() {
2307        check_schema("\"null\"", SchemaPiece::Null);
2308        check_schema("\"int\"", SchemaPiece::Int);
2309        check_schema("\"double\"", SchemaPiece::Double);
2310    }
2311
2312    #[mz_ore::test]
2313    fn test_array_schema() {
2314        check_schema(
2315            r#"{"type": "array", "items": "string"}"#,
2316            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2317        );
2318    }
2319
2320    #[mz_ore::test]
2321    fn test_map_schema() {
2322        check_schema(
2323            r#"{"type": "map", "values": "double"}"#,
2324            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2325        );
2326    }
2327
2328    #[mz_ore::test]
2329    fn test_union_schema() {
2330        check_schema(
2331            r#"["null", "int"]"#,
2332            SchemaPiece::Union(
2333                UnionSchema::new(vec![
2334                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2335                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2336                ])
2337                .unwrap(),
2338            ),
2339        );
2340    }
2341
2342    #[mz_ore::test]
2343    fn test_multi_union_schema() {
2344        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2345        assert_ok!(schema);
2346        let schema = schema.unwrap();
2347        let node = schema.top_node();
2348        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2349        let union_schema = match node.inner {
2350            SchemaPiece::Union(u) => u,
2351            _ => unreachable!(),
2352        };
2353        assert_eq!(union_schema.variants().len(), 5);
2354        let mut variants = union_schema.variants().iter();
2355        assert_eq!(
2356            SchemaKind::from(node.step(variants.next().unwrap())),
2357            SchemaKind::Null
2358        );
2359        assert_eq!(
2360            SchemaKind::from(node.step(variants.next().unwrap())),
2361            SchemaKind::Int
2362        );
2363        assert_eq!(
2364            SchemaKind::from(node.step(variants.next().unwrap())),
2365            SchemaKind::Float
2366        );
2367        assert_eq!(
2368            SchemaKind::from(node.step(variants.next().unwrap())),
2369            SchemaKind::String
2370        );
2371        assert_eq!(
2372            SchemaKind::from(node.step(variants.next().unwrap())),
2373            SchemaKind::Bytes
2374        );
2375        assert_eq!(variants.next(), None);
2376    }
2377
2378    #[mz_ore::test]
2379    fn test_record_schema() {
2380        let schema = r#"
2381                {
2382                    "type": "record",
2383                    "name": "test",
2384                    "doc": "record doc",
2385                    "fields": [
2386                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2387                        {"name": "b", "doc": "b doc", "type": "string"}
2388                    ]
2389                }
2390            "#;
2391
2392        let mut lookup = BTreeMap::new();
2393        lookup.insert("a".to_owned(), 0);
2394        lookup.insert("b".to_owned(), 1);
2395
2396        let expected = SchemaPiece::Record {
2397            doc: Some("record doc".to_string()),
2398            fields: vec![
2399                RecordField {
2400                    name: "a".to_string(),
2401                    doc: Some("a doc".to_string()),
2402                    default: Some(Value::Number(42i64.into())),
2403                    schema: SchemaPiece::Long.into(),
2404                    order: RecordFieldOrder::Ascending,
2405                    position: 0,
2406                },
2407                RecordField {
2408                    name: "b".to_string(),
2409                    doc: Some("b doc".to_string()),
2410                    default: None,
2411                    schema: SchemaPiece::String.into(),
2412                    order: RecordFieldOrder::Ascending,
2413                    position: 1,
2414                },
2415            ],
2416            lookup,
2417        };
2418
2419        check_schema(schema, expected);
2420    }
2421
2422    #[mz_ore::test]
2423    fn test_enum_schema() {
2424        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2425
2426        let expected = SchemaPiece::Enum {
2427            doc: None,
2428            symbols: vec![
2429                "diamonds".to_owned(),
2430                "spades".to_owned(),
2431                "jokers".to_owned(),
2432                "clubs".to_owned(),
2433                "hearts".to_owned(),
2434            ],
2435            default_idx: Some(2),
2436        };
2437
2438        check_schema(schema, expected);
2439
2440        let bad_schema = Schema::from_str(
2441            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2442        );
2443
2444        assert_err!(bad_schema);
2445    }
2446
2447    #[mz_ore::test]
2448    fn test_fixed_schema() {
2449        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2450
2451        let expected = SchemaPiece::Fixed { size: 16usize };
2452
2453        check_schema(schema, expected);
2454    }
2455
2456    #[mz_ore::test]
2457    fn test_date_schema() {
2458        let kinds = &[
2459            r#"{
2460                    "type": "int",
2461                    "name": "datish",
2462                    "logicalType": "date"
2463                }"#,
2464            r#"{
2465                    "type": "int",
2466                    "name": "datish",
2467                    "connect.name": "io.debezium.time.Date"
2468                }"#,
2469            r#"{
2470                    "type": "int",
2471                    "name": "datish",
2472                    "connect.name": "org.apache.kafka.connect.data.Date"
2473                }"#,
2474        ];
2475        for kind in kinds {
2476            check_schema(*kind, SchemaPiece::Date);
2477
2478            let schema = Schema::from_str(*kind).unwrap();
2479            assert_eq!(
2480                serde_json::to_string(&schema).unwrap(),
2481                r#"{"type":"int","logicalType":"date"}"#
2482            );
2483        }
2484    }
2485
2486    #[mz_ore::test]
2487    fn new_field_in_middle() {
2488        let reader = r#"{
2489            "type": "record",
2490            "name": "MyRecord",
2491            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2492        }"#;
2493        let writer = r#"{
2494            "type": "record",
2495            "name": "MyRecord",
2496            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2497        }"#;
2498        let reader = Schema::from_str(reader).unwrap();
2499        let writer = Schema::from_str(writer).unwrap();
2500
2501        let mut record = Record::new(writer.top_node()).unwrap();
2502        record.put("f1", 1);
2503        record.put("f2", 2);
2504        record.put("f_interposed", 42);
2505
2506        let value = record.avro();
2507
2508        let mut buf = vec![];
2509        crate::encode::encode(&value, &writer, &mut buf);
2510
2511        let resolved = resolve_schemas(&writer, &reader).unwrap();
2512
2513        let reader = &mut &buf[..];
2514        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2515        let expected = crate::types::Value::Record(vec![
2516            ("f1".to_string(), crate::types::Value::Int(1)),
2517            ("f2".to_string(), crate::types::Value::Int(2)),
2518        ]);
2519        assert_eq!(reader_value, expected);
2520        assert!(reader.is_empty()); // all bytes should have been consumed
2521    }
2522
2523    #[mz_ore::test]
2524    fn new_field_at_end() {
2525        let reader = r#"{
2526            "type": "record",
2527            "name": "MyRecord",
2528            "fields": [{"name": "f1", "type": "int"}]
2529        }"#;
2530        let writer = r#"{
2531            "type": "record",
2532            "name": "MyRecord",
2533            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2534        }"#;
2535        let reader = Schema::from_str(reader).unwrap();
2536        let writer = Schema::from_str(writer).unwrap();
2537
2538        let mut record = Record::new(writer.top_node()).unwrap();
2539        record.put("f1", 1);
2540        record.put("f2", 2);
2541
2542        let value = record.avro();
2543
2544        let mut buf = vec![];
2545        crate::encode::encode(&value, &writer, &mut buf);
2546
2547        let resolved = resolve_schemas(&writer, &reader).unwrap();
2548
2549        let reader = &mut &buf[..];
2550        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2551        let expected =
2552            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2553        assert_eq!(reader_value, expected);
2554        assert!(reader.is_empty()); // all bytes should have been consumed
2555    }
2556
2557    #[mz_ore::test]
2558    fn default_non_nums() {
2559        let reader = r#"{
2560            "type": "record",
2561            "name": "MyRecord",
2562            "fields": [
2563                {"name": "f1", "type": "double", "default": "NaN"},
2564                {"name": "f2", "type": "double", "default": "Infinity"},
2565                {"name": "f3", "type": "double", "default": "-Infinity"}
2566            ]
2567        }
2568        "#;
2569        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2570
2571        let writer_schema = Schema::from_str(writer).unwrap();
2572        let reader_schema = Schema::from_str(reader).unwrap();
2573        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2574
2575        let record = Record::new(writer_schema.top_node()).unwrap();
2576
2577        let value = record.avro();
2578        let mut buf = vec![];
2579        crate::encode::encode(&value, &writer_schema, &mut buf);
2580
2581        let reader = &mut &buf[..];
2582        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2583        let expected = crate::types::Value::Record(vec![
2584            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2585            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2586            (
2587                "f3".to_string(),
2588                crate::types::Value::Double(f64::NEG_INFINITY),
2589            ),
2590        ]);
2591
2592        #[derive(Debug)]
2593        struct NanEq(crate::types::Value);
2594        impl std::cmp::PartialEq for NanEq {
2595            fn eq(&self, other: &Self) -> bool {
2596                match (self, other) {
2597                    (
2598                        NanEq(crate::types::Value::Double(x)),
2599                        NanEq(crate::types::Value::Double(y)),
2600                    ) if x.is_nan() && y.is_nan() => true,
2601                    (
2602                        NanEq(crate::types::Value::Float(x)),
2603                        NanEq(crate::types::Value::Float(y)),
2604                    ) if x.is_nan() && y.is_nan() => true,
2605                    (
2606                        NanEq(crate::types::Value::Record(xs)),
2607                        NanEq(crate::types::Value::Record(ys)),
2608                    ) => {
2609                        let xs = xs
2610                            .iter()
2611                            .cloned()
2612                            .map(|(k, v)| (k, NanEq(v)))
2613                            .collect::<Vec<_>>();
2614                        let ys = ys
2615                            .iter()
2616                            .cloned()
2617                            .map(|(k, v)| (k, NanEq(v)))
2618                            .collect::<Vec<_>>();
2619
2620                        xs == ys
2621                    }
2622                    (NanEq(x), NanEq(y)) => x == y,
2623                }
2624            }
2625        }
2626
2627        assert_eq!(NanEq(reader_value), NanEq(expected));
2628        assert!(reader.is_empty());
2629    }
2630
2631    #[mz_ore::test]
2632    fn test_decimal_schemas() {
2633        let schema = r#"{
2634                "type": "fixed",
2635                "name": "dec",
2636                "size": 8,
2637                "logicalType": "decimal",
2638                "precision": 12,
2639                "scale": 5
2640            }"#;
2641        let expected = SchemaPiece::Decimal {
2642            precision: 12,
2643            scale: 5,
2644            fixed_size: Some(8),
2645        };
2646        check_schema(schema, expected);
2647
2648        let schema = r#"{
2649                "type": "bytes",
2650                "logicalType": "decimal",
2651                "precision": 12,
2652                "scale": 5
2653            }"#;
2654        let expected = SchemaPiece::Decimal {
2655            precision: 12,
2656            scale: 5,
2657            fixed_size: None,
2658        };
2659        check_schema(schema, expected);
2660
2661        let res = Schema::from_str(
2662            r#"["bytes", {
2663                "type": "bytes",
2664                "logicalType": "decimal",
2665                "precision": 12,
2666                "scale": 5
2667            }]"#,
2668        );
2669        assert_eq!(
2670            res.unwrap_err().to_string(),
2671            "Schema parse error: Unions cannot contain duplicate types"
2672        );
2673
2674        let writer_schema = Schema::from_str(
2675            r#"["null", {
2676                "type": "bytes"
2677            }]"#,
2678        )
2679        .unwrap();
2680        let reader_schema = Schema::from_str(
2681            r#"["null", {
2682                "type": "bytes",
2683                "logicalType": "decimal",
2684                "precision": 12,
2685                "scale": 5
2686            }]"#,
2687        )
2688        .unwrap();
2689        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2690
2691        let expected = SchemaPiece::ResolveUnionUnion {
2692            permutation: vec![
2693                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2694                Ok((
2695                    1,
2696                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2697                        precision: 12,
2698                        scale: 5,
2699                        fixed_size: None,
2700                    }),
2701                )),
2702            ],
2703            n_reader_variants: 2,
2704            reader_null_variant: Some(0),
2705        };
2706        assert_eq!(resolved.top_node().inner, &expected);
2707    }
2708
2709    #[mz_ore::test]
2710    fn test_no_documentation() {
2711        let schema =
2712            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2713                .unwrap();
2714
2715        let doc = match schema.top_node().inner {
2716            SchemaPiece::Enum { doc, .. } => doc.clone(),
2717            _ => panic!(),
2718        };
2719
2720        assert_none!(doc);
2721    }
2722
2723    #[mz_ore::test]
2724    fn test_documentation() {
2725        let schema = Schema::from_str(
2726                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2727            ).unwrap();
2728
2729        let doc = match schema.top_node().inner {
2730            SchemaPiece::Enum { doc, .. } => doc.clone(),
2731            _ => None,
2732        };
2733
2734        assert_eq!("Some documentation".to_owned(), doc.unwrap());
2735    }
2736
2737    #[mz_ore::test]
2738    fn test_namespaces_and_names() {
2739        // When name and namespace specified, full name should contain both.
2740        let schema = Schema::from_str(
2741            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2742        )
2743        .unwrap();
2744        assert_eq!(schema.named.len(), 1);
2745        assert_eq!(
2746            schema.named[0].name,
2747            FullName {
2748                name: "name".into(),
2749                namespace: "namespace".into()
2750            }
2751        );
2752
2753        // When name contains dots, parse the dot-separated name as the namespace.
2754        let schema =
2755            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2756                .unwrap();
2757        assert_eq!(schema.named.len(), 1);
2758        assert_eq!(
2759            schema.named[0].name,
2760            FullName {
2761                name: "dots".into(),
2762                namespace: "name.has".into()
2763            }
2764        );
2765
2766        // Same as above, ignore any provided namespace.
2767        let schema = Schema::from_str(
2768            r#"{"type": "enum", "namespace": "namespace",
2769            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2770        )
2771        .unwrap();
2772        assert_eq!(schema.named.len(), 1);
2773        assert_eq!(
2774            schema.named[0].name,
2775            FullName {
2776                name: "dots".into(),
2777                namespace: "name.has".into()
2778            }
2779        );
2780
2781        // Use default namespace when namespace is not provided.
2782        // Materialize uses "" as the default namespace.
2783        let schema = Schema::from_str(
2784            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2785            "fields": [{"name": "name", "type": "string"}]}"#,
2786        )
2787        .unwrap();
2788        assert_eq!(schema.named.len(), 1);
2789        assert_eq!(
2790            schema.named[0].name,
2791            FullName {
2792                name: "TestDoc".into(),
2793                namespace: "".into()
2794            }
2795        );
2796
2797        // Empty namespace strings should be allowed.
2798        let schema = Schema::from_str(
2799            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2800            "fields": [{"name": "name", "type": "string"}]}"#,
2801        )
2802        .unwrap();
2803        assert_eq!(schema.named.len(), 1);
2804        assert_eq!(
2805            schema.named[0].name,
2806            FullName {
2807                name: "TestDoc".into(),
2808                namespace: "".into()
2809            }
2810        );
2811
2812        // Equality of names is defined on the FullName and is case-sensitive.
2813        let first = Schema::from_str(
2814            r#"{"type": "fixed", "namespace": "namespace",
2815            "name": "name", "size": 1}"#,
2816        )
2817        .unwrap();
2818        let second = Schema::from_str(
2819            r#"{"type": "fixed", "name": "namespace.name",
2820            "size": 1}"#,
2821        )
2822        .unwrap();
2823        assert_eq!(first.named[0].name, second.named[0].name);
2824
2825        let first = Schema::from_str(
2826            r#"{"type": "fixed", "namespace": "namespace",
2827            "name": "name", "size": 1}"#,
2828        )
2829        .unwrap();
2830        let second = Schema::from_str(
2831            r#"{"type": "fixed", "name": "namespace.Name",
2832            "size": 1}"#,
2833        )
2834        .unwrap();
2835        assert_ne!(first.named[0].name, second.named[0].name);
2836
2837        let first = Schema::from_str(
2838            r#"{"type": "fixed", "namespace": "Namespace",
2839            "name": "name", "size": 1}"#,
2840        )
2841        .unwrap();
2842        let second = Schema::from_str(
2843            r#"{"type": "fixed", "namespace": "namespace",
2844            "name": "name", "size": 1}"#,
2845        )
2846        .unwrap();
2847        assert_ne!(first.named[0].name, second.named[0].name);
2848
2849        // The name portion of a fullname, record field names, and enum symbols must:
2850        // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
2851        assert!(
2852            Schema::from_str(
2853                r#"{"type": "record", "name": "99 problems but a name aint one",
2854            "fields": [{"name": "name", "type": "string"}]}"#
2855            )
2856            .is_err()
2857        );
2858
2859        assert!(
2860            Schema::from_str(
2861                r#"{"type": "record", "name": "!!!",
2862            "fields": [{"name": "name", "type": "string"}]}"#
2863            )
2864            .is_err()
2865        );
2866
2867        assert!(
2868            Schema::from_str(
2869                r#"{"type": "record", "name": "_valid_until_©",
2870            "fields": [{"name": "name", "type": "string"}]}"#
2871            )
2872            .is_err()
2873        );
2874
2875        // Use previously defined names and namespaces as type.
2876        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2877              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2878              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2879              {"name": "f3", "type": "MyEnum"},
2880              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2881              {"name": "f5", "type": "other.namespace.OtherEnum"},
2882              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2883              {"name": "f7", "type": "some.other.ThirdEnum"}
2884             ]}"#).unwrap();
2885        assert_eq!(schema.named.len(), 4);
2886
2887        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2888            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // f1
2889            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // f2
2890            assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); // f3
2891            assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); // f4
2892            assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); // f5
2893            assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); // f6
2894            assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); // f7
2895        } else {
2896            panic!("Expected SchemaPiece::Record, found something else");
2897        }
2898
2899        let schema = Schema::from_str(
2900            r#"{"type": "record", "name": "x.Y", "fields": [
2901              {"name": "e", "type":
2902                {"type": "record", "name": "Z", "fields": [
2903                  {"name": "f", "type": "x.Y"},
2904                  {"name": "g", "type": "x.Z"}
2905                ]}
2906              }
2907            ]}"#,
2908        )
2909        .unwrap();
2910        assert_eq!(schema.named.len(), 2);
2911
2912        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2913            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // e
2914        } else {
2915            panic!("Expected SchemaPiece::Record, found something else");
2916        }
2917
2918        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
2919            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); // f
2920            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // g
2921        } else {
2922            panic!("Expected SchemaPiece::Record, found something else");
2923        }
2924
2925        let schema = Schema::from_str(
2926            r#"{"type": "record", "name": "R", "fields": [
2927              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
2928                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
2929                 "symbols": ["Foo", "Bar"]}
2930                }
2931              ]}},
2932              {"name": "t", "type": "Z"}
2933            ]}"#,
2934        )
2935        .unwrap();
2936        assert_eq!(schema.named.len(), 3);
2937
2938        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2939            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // s
2940            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); // t - refers to "".Z
2941        } else {
2942            panic!("Expected SchemaPiece::Record, found something else");
2943        }
2944    }
2945
2946    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
2947    // compile, they pass.
2948    #[mz_ore::test]
2949    fn test_schema_is_send() {
2950        fn send<S: Send>(_s: S) {}
2951
2952        let schema = Schema {
2953            named: vec![],
2954            indices: Default::default(),
2955            top: SchemaPiece::Null.into(),
2956        };
2957        send(schema);
2958    }
2959
2960    #[mz_ore::test]
2961    fn test_schema_is_sync() {
2962        fn sync<S: Sync>(_s: S) {}
2963
2964        let schema = Schema {
2965            named: vec![],
2966            indices: Default::default(),
2967            top: SchemaPiece::Null.into(),
2968        };
2969        sync(&schema);
2970        sync(schema);
2971    }
2972
2973    #[mz_ore::test]
2974    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
2975    fn test_schema_fingerprint() {
2976        use sha2::Sha256;
2977
2978        let raw_schema = r#"
2979        {
2980            "type": "record",
2981            "name": "test",
2982            "fields": [
2983                {"name": "a", "type": "long", "default": 42},
2984                {"name": "b", "type": "string"}
2985            ]
2986        }
2987    "#;
2988        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
2989        let schema = Schema::from_str(raw_schema).unwrap();
2990        assert_eq!(&schema.canonical_form(), expected_canonical);
2991        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
2992        assert_eq!(
2993            format!("{}", schema.fingerprint::<Sha256>()),
2994            expected_fingerprint
2995        );
2996
2997        let raw_schema = r#"
2998{
2999  "type": "record",
3000  "name": "ns.r1",
3001  "namespace": "ignored",
3002  "fields": [
3003    {
3004      "name": "f1",
3005      "type": {
3006        "type": "fixed",
3007        "name": "r2",
3008        "size": 1
3009      }
3010    }
3011  ]
3012}
3013"#;
3014        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3015        let schema = Schema::from_str(raw_schema).unwrap();
3016        assert_eq!(&schema.canonical_form(), expected_canonical);
3017        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3018        assert_eq!(
3019            format!("{}", schema.fingerprint::<Sha256>()),
3020            expected_fingerprint
3021        );
3022    }
3023
3024    #[mz_ore::test]
3025    fn test_make_valid() {
3026        for (input, expected) in [
3027            ("foo", "foo"),
3028            ("az99", "az99"),
3029            ("99az", "_99az"),
3030            ("is,bad", "is_bad"),
3031            ("@#$%", "____"),
3032            ("i-amMisBehaved!", "i_amMisBehaved_"),
3033            ("", "_"),
3034        ] {
3035            let actual = Name::make_valid(input);
3036            assert_eq!(expected, actual, "Name::make_valid({input})")
3037        }
3038    }
3039}