mz_avro/
schema.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for parsing and interacting with schemas in Avro format.
25
26use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// Describes an error that occurred while parsing an Avro schema.
///
/// The wrapped string is the human-readable message, and is exactly what
/// the `Display` implementation prints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates a new error carrying `msg` as its message.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        Self(msg.into())
    }
}

impl fmt::Display for ParseSchemaError {
    /// Displays the wrapped message, honoring the formatter's flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// Represents an Avro schema fingerprint.
///
/// More information about Avro schema fingerprints can be found in the
/// [Avro Schema Resolution documentation](https://avro.apache.org/docs/++version++/specification/#schema-resolution)
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The raw bytes of the fingerprint.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write each byte straight into the formatter rather than building a
        // `String` per byte and joining them afterwards.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
/// A single node of an Avro schema.
///
/// Besides the plain Avro types, this enum also contains `Resolve*`
/// variants, which describe the outcome of resolving a writer's node
/// against a reader's node.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// A `null` Avro schema.
    Null,
    /// A `boolean` Avro schema.
    Boolean,
    /// An `int` Avro schema.
    Int,
    /// A `long` Avro schema.
    Long,
    /// A `float` Avro schema.
    Float,
    /// A `double` Avro schema.
    Double,
    /// An `int` Avro schema with a semantic type being days since the unix epoch.
    Date,
    /// A `long` Avro schema with a semantic type being milliseconds since the unix epoch.
    ///
    /// <https://avro.apache.org/docs/++version++/specification/#time_ms>
    TimestampMilli,
    /// A `long` Avro schema with a semantic type being microseconds since the unix epoch.
    ///
    /// <https://avro.apache.org/docs/++version++/specification/#time-microsecond-precision>
    TimestampMicro,
    /// A `bytes` or `fixed` Avro schema with a logical type of `decimal` and
    /// the specified precision and scale.
    ///
    /// If the underlying type is `fixed`,
    /// the `fixed_size` field specifies the size.
    Decimal {
        precision: usize,
        scale: usize,
        /// `Some(size)` when the underlying type is `fixed`; `None` when it is `bytes`.
        fixed_size: Option<usize>,
    },
    /// A `bytes` Avro schema.
    /// `Bytes` represents a sequence of 8-bit unsigned bytes.
    Bytes,
    /// A `string` Avro schema.
    /// `String` represents a unicode character sequence.
    String,
    /// A `string` Avro schema that is tagged as representing JSON data.
    Json,
    /// A `string` Avro schema with a logical type of `uuid`.
    Uuid,
    /// An `array` Avro schema. Avro arrays are required to have the same type for each element.
    /// This variant holds the `Schema` for the array element type.
    Array(Box<SchemaPieceOrNamed>),
    /// A `map` Avro schema.
    /// `Map` holds a pointer to the `Schema` of its values, which must all be the same schema.
    /// `Map` keys are assumed to be `string`.
    Map(Box<SchemaPieceOrNamed>),
    /// A `union` Avro schema.
    Union(UnionSchema),
    /// A value written as `int` and read as `long`,
    /// for the timestamp-millis logicalType.
    ResolveIntTsMilli,
    /// A value written as `int` and read as `long`,
    /// for the timestamp-micros logicalType.
    ResolveIntTsMicro,
    /// A value written as an `int` with `date` logical type,
    /// and read as any timestamp type.
    ResolveDateTimestamp,
    /// A value written as `int` and read as `long`.
    ResolveIntLong,
    /// A value written as `int` and read as `float`.
    ResolveIntFloat,
    /// A value written as `int` and read as `double`.
    ResolveIntDouble,
    /// A value written as `long` and read as `float`.
    ResolveLongFloat,
    /// A value written as `long` and read as `double`.
    ResolveLongDouble,
    /// A value written as `float` and read as `double`.
    ResolveFloatDouble,
    /// A concrete (i.e., non-`union`) type in the writer,
    /// resolved against one specific variant of a `union` in the reader.
    ResolveConcreteUnion {
        /// The index of the variant in the reader
        index: usize,
        /// The concrete type
        inner: Box<SchemaPieceOrNamed>,
        // NOTE(review): presumably the total number of variants in the
        // reader's union -- confirm against the resolver.
        n_reader_variants: usize,
        // NOTE(review): presumably the index of the reader union's `null`
        // variant, if it has one -- confirm against the resolver.
        reader_null_variant: Option<usize>,
    },
    /// A union in the writer, resolved against a union in the reader.
    /// The two schemas may have different variants and the variants may be in a different order.
    ResolveUnionUnion {
        /// A mapping of the fields in the writer to those in the reader.
        /// If the `i`th element is `Err(e)`, the `i`th field in the writer
        /// did not match any field in the reader (or even if it matched by name, resolution failed).
        /// If the `i`th element is `Ok((j, piece))`, then the `i`th field of the writer
        /// matched the `j`th field of the reader, and `piece` is their resolved node.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        // NOTE(review): presumably the total number of variants in the
        // reader's union -- confirm against the resolver.
        n_reader_variants: usize,
        // NOTE(review): presumably the index of the reader union's `null`
        // variant, if it has one -- confirm against the resolver.
        reader_null_variant: Option<usize>,
    },
    /// The inverse of `ResolveConcreteUnion`: a `union` in the writer,
    /// resolved against a concrete (non-`union`) type in the reader.
    ResolveUnionConcrete {
        // NOTE(review): presumably the index of the matching variant in the
        // writer's union -- confirm against the resolver.
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A `record` Avro schema.
    ///
    /// The `lookup` table maps field names to their position in the `Vec`
    /// of `fields`.
    Record {
        doc: Documentation,
        fields: Vec<RecordField>,
        lookup: BTreeMap<String, usize>,
    },
    /// An `enum` Avro schema.
    Enum {
        doc: Documentation,
        symbols: Vec<String>,
        /// The index of the default value.
        ///
        /// This is only used in schema resolution: it is the value that
        /// will be read by a reader when a writer writes a value that the reader
        /// does not expect.
        default_idx: Option<usize>,
    },
    /// A `fixed` Avro schema.
    Fixed { size: usize },
    /// A record in the writer, resolved against a record in the reader.
    /// The two schemas may have different fields and the fields may be in a different order.
    ResolveRecord {
        /// Fields that do not exist in the writer schema, but had a default
        /// value specified in the reader schema, which we use.
        defaults: Vec<ResolvedDefaultValueField>,
        /// Fields in the order of their appearance in the writer schema.
        /// `Present` if they could be resolved against a field in the reader schema;
        /// `Absent` otherwise.
        fields: Vec<ResolvedRecordField>,
        /// The size of `defaults`, plus the number of `Present` values in `fields`.
        n_reader_fields: usize,
    },
    /// An enum in the writer, resolved against an enum in the reader.
    /// The two schemas may have different values and the values may be in a different order.
    ResolveEnum {
        doc: Documentation,
        /// Symbols in order of the writer schema along with their index in the reader schema,
        /// or `Err(symbol_name)` if they don't exist in the reader schema.
        symbols: Vec<Result<(usize, String), String>>,
        /// The value to decode if the writer writes some value not expected by the reader.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320    /// Returns whether the schema node is "underlyingly" an Int (but possibly a logicalType typedef)
321    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    /// Returns whether the schema node is "underlyingly" an Int64 (but possibly a logicalType typedef)
325    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    /// Constructs an `avro::Value` if this is of underlying int type.
329    /// Guaranteed to be `Some` when `is_underlying_int` is `true`.
330    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            // TODO[btv] - should we bounds-check the date here? We
334            // don't elsewhere... maybe we should everywhere.
335            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    /// Constructs an `avro::Value` if this is of underlying long type.
340    /// Guaranteed to be `Some` when `is_underlying_long` is `true`.
341    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
/// Represents any valid Avro schema.
///
/// More information about Avro schemas can be found in the
/// [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-declaration)
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// The named schema pieces, addressed by index (see `lookup`).
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its index in `named` (see `try_lookup_name`).
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level node: either an inline piece or an index into `named`.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode<'_> {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// This type is used to simplify enum variant comparison between `Schema` and `types::Value`.
///
/// **NOTE** This type was introduced due to a limitation of `mem::discriminant` requiring a _value_
/// be constructed in order to get the discriminant, which makes it difficult to implement a
/// function that maps from `Discriminant<Schema> -> Discriminant<Value>`. Conversion into this
/// intermediate type should be especially fast, as the number of enum variants is small, which
/// _should_ compile into a jump-table for the conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    // Fixed-length types
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    // Variable-length types
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    // This can arise in resolved schemas, particularly when a union resolves to a non-union.
    // We would need to do a lookup to find the actual type.
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase name of this kind (e.g. `"null"`, `"record"`);
    /// `Unknown` yields `"unknown"`.
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// Represents names for `record`, `enum` and `fixed` Avro schemas.
///
/// Each of these `Schema`s has a `fullname` composed of two parts:
///   * a name
///   * a namespace
///
/// `aliases` can also be defined, to facilitate schema evolution.
///
/// More information about schema names can be found in the
/// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The name itself. When produced by `Name::parse` this never contains
    /// a dot: anything before the last dot is moved into `namespace`.
    pub name: String,
    /// The namespace, if one was given or derived from a dotted name.
    pub namespace: Option<String>,
    /// The aliases, if an `aliases` array consisting only of strings was given.
    pub aliases: Option<Vec<String>>,
}
537
/// A fully qualified name: a base name plus the namespace it lives in.
///
/// An empty `namespace` string stands for "no namespace".
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name from its parts.
    ///
    /// * If `namespace` is given, it is used as-is and `name` is kept
    ///   verbatim (even if it contains dots).
    /// * Otherwise, if `name` contains a dot, everything before the last dot
    ///   becomes the namespace and the remainder the base name.
    /// * Otherwise `default_namespace` is used.
    // [XXX] btv - what happens if `name` contains dots, _and_ `namespace` is `Some` ?
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (namespace, name) = match namespace {
            Some(ns) => (ns, name),
            None => match name.rsplit_once('.') {
                Some((ns, base)) => (ns, base),
                None => (default_namespace, name),
            },
        };
        FullName {
            name: name.to_owned(),
            namespace: namespace.to_owned(),
        }
    }

    /// Returns the name without its namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            self.name.clone()
        } else {
            format!("{}.{}", self.namespace, self.name)
        }
    }

    /// Get the shortest unambiguous synonym of this name
    /// at a given point in the schema graph. If this name
    /// is in the same namespace as the enclosing node, this
    /// returns the short name; otherwise, it returns the fully qualified name.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace == enclosing_ns {
            return Cow::Borrowed(self.name.as_str());
        }
        Cow::Owned(format!("{}.{}", self.namespace, self.name))
    }

    /// Returns the namespace of the name
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Always prints `namespace.name`, even when the namespace is empty
    /// (which yields a leading dot).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let FullName { name, namespace } = self;
        write!(f, "{}.{}", namespace, name)
    }
}
594
595/// Represents documentation for complex Avro schemas.
596pub type Documentation = Option<String>;
597
598impl Name {
599    /// Reports whether the given string is a valid Avro name.
600    ///
601    /// See: <https://avro.apache.org/docs/++version++/specification/#names>
602    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    /// Returns an error if the given name is invalid.
609    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    /// Rewrites `name` to be valid.
622    ///
623    /// Any non alphanumeric characters are replaced with underscores. If the
624    /// name begins with a number, it is prefixed with `_`.
625    ///
626    /// `make_valid` is not injective. Multiple invalid names are mapped to the
627    /// the same valid name.
628    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    /// Parse a `serde_json::map::Map` into a `Name`.
653    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    /// Parses a name from a simple string.
702    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    /// Return the `fullname` of this `Name`
709    ///
710    /// More information about fullnames can be found in the
711    /// [Avro specification](https://avro.apache.org/docs/++version++/specification/#names)
712    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
/// A record field described by a name and a default value.
///
/// Used for the `defaults` of `SchemaPiece::ResolveRecord`: fields that do
/// not exist in the writer schema but have a default in the reader schema.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// The default value, as an Avro value.
    pub default: types::Value,
    /// Order of the field.
    pub order: RecordFieldOrder,
    // NOTE(review): presumably the field's position in the reader's
    // record -- confirm against the resolver.
    pub position: usize,
}
725
/// A writer-side field of a resolved record
/// (see the `fields` of `SchemaPiece::ResolveRecord`).
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// The field could not be resolved against a field in the reader schema.
    // NOTE(review): the contained `Schema` is presumably the writer's schema
    // for the field, kept so the value can be skipped -- confirm.
    Absent(Schema),
    /// The field was resolved against a field in the reader schema.
    Present(RecordField),
}
731
/// Represents a `field` in a `record` Avro schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Name of the field.
    pub name: String,
    /// Documentation of the field.
    pub doc: Documentation,
    /// Default value of the field, kept as a JSON value.
    /// This value will be used when reading Avro datum if schema resolution
    /// is enabled.
    pub default: Option<Value>,
    /// Schema of the field.
    pub schema: SchemaPieceOrNamed,
    /// Order of the field.
    ///
    /// **NOTE** This currently has no effect.
    pub order: RecordFieldOrder,
    /// Position of the field in the list of `fields` of its parent `Schema`.
    pub position: usize,
}
752
/// Represents any valid order for a `field` in a `record` Avro schema.
///
/// Per the note on `RecordField::order`, this currently has no effect.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
760
// No inherent methods are defined for `RecordField` in this block.
impl RecordField {}
762
/// A `union` Avro schema: an ordered list of variants plus lookup tables
/// from a variant's identity to its position.
///
/// Equality only considers the variants; the lookup tables are derived
/// from them.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    // The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    // Used to ensure uniqueness of anonymous schema inputs, and provide fast lookup of the
    // schema index given a value. Maps the kind of an inline variant to its position
    // in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    // Same as above, for named input references: maps the index of a named
    // variant to its position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    /// Returns a slice to all variants of this schema.
812    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    /// Returns true if the first variant of this `UnionSchema` is `Null`.
817    pub fn is_nullable(&self) -> bool {
818        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
819    }
820
821    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
822        self.anon_variant_index
823            .get(&SchemaKind::from(sp))
824            .map(|idx| (*idx, &self.schemas[*idx]))
825    }
826
827    pub fn match_ref(
828        &self,
829        other: SchemaPieceRefOrNamed,
830        names_map: &BTreeMap<usize, usize>,
831    ) -> Option<(usize, &SchemaPieceOrNamed)> {
832        match other {
833            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
834            SchemaPieceRefOrNamed::Named(idx) => names_map
835                .get(&idx)
836                .and_then(|idx| self.named_variant_index.get(idx))
837                .map(|idx| (*idx, &self.schemas[*idx])),
838        }
839    }
840
841    #[inline(always)]
842    pub fn match_(
843        &self,
844        other: &SchemaPieceOrNamed,
845        names_map: &BTreeMap<usize, usize>,
846    ) -> Option<(usize, &SchemaPieceOrNamed)> {
847        self.match_ref(other.as_ref(), names_map)
848    }
849}
850
851// No need to compare variant_index, it is derivative of schemas.
852impl PartialEq for UnionSchema {
853    fn eq(&self, other: &UnionSchema) -> bool {
854        self.schemas.eq(&other.schemas)
855    }
856}
857
/// Mutable state accumulated while turning a JSON document into a `Schema`.
#[derive(Default)]
struct SchemaParser {
    /// One slot per named type, in allocation order. A slot is `None` from
    /// the moment its name is allocated until its definition has been parsed
    /// and stored.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps the full name of every allocated named type to its slot in
    /// `named`.
    indices: BTreeMap<FullName, usize>,
}
863
864impl SchemaParser {
865    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
866        let top = self.parse_inner("", value)?;
867        let SchemaParser { named, indices } = self;
868        Ok(Schema {
869            named: named.into_iter().map(|o| o.unwrap()).collect(),
870            indices,
871            top,
872        })
873    }
874
875    fn parse_inner(
876        &mut self,
877        default_namespace: &str,
878        value: &Value,
879    ) -> Result<SchemaPieceOrNamed, AvroError> {
880        match *value {
881            Value::String(ref t) => {
882                let name = FullName::from_parts(t.as_str(), None, default_namespace);
883                if let Some(idx) = self.indices.get(&name) {
884                    Ok(SchemaPieceOrNamed::Named(*idx))
885                } else {
886                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
887                        t.as_str(),
888                    )?))
889                }
890            }
891            Value::Object(ref data) => self.parse_complex(default_namespace, data),
892            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
893                self.parse_union(default_namespace, data)?,
894            )),
895            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
896        }
897    }
898
899    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
900        let idx = match self.indices.entry(fullname) {
901            Entry::Vacant(ve) => *ve.insert(self.named.len()),
902            Entry::Occupied(oe) => {
903                return Err(ParseSchemaError::new(format!(
904                    "Sub-schema with name {:?} encountered multiple times",
905                    oe.key()
906                ))
907                .into());
908            }
909        };
910        self.named.push(None);
911        Ok(idx)
912    }
913
914    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
915        assert_none!(self.named[index]);
916        self.named[index] = Some(schema);
917    }
918
919    fn parse_named_type(
920        &mut self,
921        type_name: &str,
922        default_namespace: &str,
923        complex: &Map<String, Value>,
924    ) -> Result<usize, AvroError> {
925        let name = Name::parse(complex)?;
926        match name.name.as_str() {
927            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
928                return Err(ParseSchemaError::new(format!(
929                    "{} may not be used as a custom type name",
930                    name.name
931                ))
932                .into());
933            }
934            _ => {}
935        };
936        let fullname = name.fullname(default_namespace);
937        let default_namespace = fullname.namespace.clone();
938        let idx = self.alloc_name(fullname.clone())?;
939        let piece = match type_name {
940            "record" => self.parse_record(&default_namespace, complex),
941            "enum" => self.parse_enum(complex),
942            "fixed" => self.parse_fixed(&default_namespace, complex),
943            _ => unreachable!("Unknown named type kind: {}", type_name),
944        }?;
945
946        self.insert(
947            idx,
948            NamedSchemaPiece {
949                name: fullname,
950                piece,
951            },
952        );
953
954        Ok(idx)
955    }
956
957    /// Parse a `serde_json::Value` representing a complex Avro type into a
958    /// `Schema`.
959    ///
960    /// Avro supports "recursive" definition of types.
961    /// e.g: {"type": {"type": "string"}}
962    fn parse_complex(
963        &mut self,
964        default_namespace: &str,
965        complex: &Map<String, Value>,
966    ) -> Result<SchemaPieceOrNamed, AvroError> {
967        match complex.get("type") {
968            Some(&Value::String(ref t)) => Ok(match t.as_str() {
969                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
970                    t,
971                    default_namespace,
972                    complex,
973                )?),
974                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
975                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
976                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
977                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
978                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
979                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
980                other => {
981                    let name = FullName {
982                        name: other.into(),
983                        namespace: default_namespace.into(),
984                    };
985                    if let Some(idx) = self.indices.get(&name) {
986                        SchemaPieceOrNamed::Named(*idx)
987                    } else {
988                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
989                    }
990                }
991            }),
992            Some(&Value::Object(ref data)) => match data.get("type") {
993                Some(value) => self.parse_inner(default_namespace, value),
994                None => Err(
995                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
996                ),
997            },
998            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
999        }
1000    }
1001
1002    /// Parse a `serde_json::Value` representing a Avro record type into a
1003    /// `Schema`.
1004    fn parse_record(
1005        &mut self,
1006        default_namespace: &str,
1007        complex: &Map<String, Value>,
1008    ) -> Result<SchemaPiece, AvroError> {
1009        let mut lookup = BTreeMap::new();
1010
1011        let fields: Vec<RecordField> = complex
1012            .get("fields")
1013            .and_then(|fields| fields.as_array())
1014            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1015            .and_then(|fields| {
1016                fields
1017                    .iter()
1018                    .filter_map(|field| field.as_object())
1019                    .enumerate()
1020                    .map(|(position, field)| {
1021                        self.parse_record_field(default_namespace, field, position)
1022                    })
1023                    .collect::<Result<_, _>>()
1024            })?;
1025
1026        for field in &fields {
1027            lookup.insert(field.name.clone(), field.position);
1028        }
1029
1030        Ok(SchemaPiece::Record {
1031            doc: complex.doc(),
1032            fields,
1033            lookup,
1034        })
1035    }
1036
1037    /// Parse a `serde_json::Value` into a `RecordField`.
1038    fn parse_record_field(
1039        &mut self,
1040        default_namespace: &str,
1041        field: &Map<String, Value>,
1042        position: usize,
1043    ) -> Result<RecordField, AvroError> {
1044        let name = field
1045            .name()
1046            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1047
1048        Name::validate(&name)?;
1049
1050        let schema = field
1051            .get("type")
1052            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1053            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1054
1055        let default = field.get("default").cloned();
1056
1057        let order = field
1058            .get("order")
1059            .and_then(|order| order.as_str())
1060            .and_then(|order| match order {
1061                "ascending" => Some(RecordFieldOrder::Ascending),
1062                "descending" => Some(RecordFieldOrder::Descending),
1063                "ignore" => Some(RecordFieldOrder::Ignore),
1064                _ => None,
1065            })
1066            .unwrap_or(RecordFieldOrder::Ascending);
1067
1068        Ok(RecordField {
1069            name,
1070            doc: field.doc(),
1071            default,
1072            schema,
1073            order,
1074            position,
1075        })
1076    }
1077
1078    /// Parse a `serde_json::Value` representing a Avro enum type into a
1079    /// `Schema`.
1080    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1081        let symbols: Vec<String> = complex
1082            .get("symbols")
1083            .and_then(|v| v.as_array())
1084            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1085            .and_then(|symbols| {
1086                symbols
1087                    .iter()
1088                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1089                    .collect::<Option<_>>()
1090                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1091            })?;
1092
1093        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1094        for symbol in symbols.iter() {
1095            if unique_symbols.contains(symbol) {
1096                return Err(ParseSchemaError::new(format!(
1097                    "Enum symbols must be unique, found multiple: {}",
1098                    symbol
1099                ))
1100                .into());
1101            } else {
1102                unique_symbols.insert(symbol);
1103            }
1104        }
1105
1106        let default_idx = if let Some(default) = complex.get("default") {
1107            let default_str = default.as_str().ok_or_else(|| {
1108                ParseSchemaError::new(format!(
1109                    "Enum default should be a string, got: {:?}",
1110                    default
1111                ))
1112            })?;
1113            let default_idx = symbols
1114                .iter()
1115                .position(|x| x == default_str)
1116                .ok_or_else(|| {
1117                    ParseSchemaError::new(format!(
1118                        "Enum default not found in list of symbols: {}",
1119                        default_str
1120                    ))
1121                })?;
1122            Some(default_idx)
1123        } else {
1124            None
1125        };
1126
1127        Ok(SchemaPiece::Enum {
1128            doc: complex.doc(),
1129            symbols,
1130            default_idx,
1131        })
1132    }
1133
1134    /// Parse a `serde_json::Value` representing a Avro array type into a
1135    /// `Schema`.
1136    fn parse_array(
1137        &mut self,
1138        default_namespace: &str,
1139        complex: &Map<String, Value>,
1140    ) -> Result<SchemaPiece, AvroError> {
1141        complex
1142            .get("items")
1143            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1144            .and_then(|items| self.parse_inner(default_namespace, items))
1145            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1146    }
1147
1148    /// Parse a `serde_json::Value` representing a Avro map type into a
1149    /// `Schema`.
1150    fn parse_map(
1151        &mut self,
1152        default_namespace: &str,
1153        complex: &Map<String, Value>,
1154    ) -> Result<SchemaPiece, AvroError> {
1155        complex
1156            .get("values")
1157            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1158            .and_then(|items| self.parse_inner(default_namespace, items))
1159            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1160    }
1161
1162    /// Parse a `serde_json::Value` representing a Avro union type into a
1163    /// `Schema`.
1164    fn parse_union(
1165        &mut self,
1166        default_namespace: &str,
1167        items: &[Value],
1168    ) -> Result<SchemaPiece, AvroError> {
1169        items
1170            .iter()
1171            .map(|value| self.parse_inner(default_namespace, value))
1172            .collect::<Result<Vec<_>, _>>()
1173            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1174    }
1175
1176    /// Parse a `serde_json::Value` representing a logical decimal type into a
1177    /// `Schema`.
1178    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1179        let precision = complex
1180            .get("precision")
1181            .and_then(|v| v.as_i64())
1182            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1183
1184        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1185
1186        if scale < 0 {
1187            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1188        }
1189
1190        if precision < 0 {
1191            return Err(
1192                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1193            );
1194        }
1195
1196        if scale > precision {
1197            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1198        }
1199
1200        Ok((precision as usize, scale as usize))
1201    }
1202
1203    /// Parse a `serde_json::Value` representing an Avro bytes type into a
1204    /// `Schema`.
1205    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1206        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1207
1208        if let Some("decimal") = logical_type {
1209            match Self::parse_decimal(complex) {
1210                Ok((precision, scale)) => {
1211                    return Ok(SchemaPiece::Decimal {
1212                        precision,
1213                        scale,
1214                        fixed_size: None,
1215                    });
1216                }
1217                Err(e) => warn!(
1218                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1219                    complex, e
1220                ),
1221            }
1222        }
1223
1224        Ok(SchemaPiece::Bytes)
1225    }
1226
1227    /// Parse a [`serde_json::Value`] representing an Avro Int type
1228    ///
1229    /// If the complex type has a `connect.name` tag (as [emitted by
1230    /// Debezium][1]) that matches a `Date` tag, we specify that the correct
1231    /// schema to use is `Date`.
1232    ///
1233    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1234    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1235        const AVRO_DATE: &str = "date";
1236        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1237        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1238        if let Some(name) = complex.get("connect.name") {
1239            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1240                if name == KAFKA_DATE {
1241                    warn!("using deprecated debezium date format");
1242                }
1243                return Ok(SchemaPiece::Date);
1244            }
1245        }
1246        // Put this after the custom semantic types so that the debezium
1247        // warning is emitted, since the logicalType tag shows up in the
1248        // deprecated debezium format :-/
1249        if let Some(name) = complex.get("logicalType") {
1250            if name == AVRO_DATE {
1251                return Ok(SchemaPiece::Date);
1252            }
1253        }
1254        if !complex.is_empty() {
1255            debug!("parsing complex type as regular int: {:?}", complex);
1256        }
1257        Ok(SchemaPiece::Int)
1258    }
1259
1260    /// Parse a [`serde_json::Value`] representing an Avro Int64/Long type
1261    ///
1262    /// The debezium/kafka types are document at [the debezium site][1], and the
1263    /// avro ones are documented at [Avro][2].
1264    ///
1265    /// [1]: https://debezium.io/docs/connectors/mysql/#temporal-values
1266    /// [2]: https://avro.apache.org/docs/++version++/specification/
1267    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1268        const AVRO_MILLI_TS: &str = "timestamp-millis";
1269        const AVRO_MICRO_TS: &str = "timestamp-micros";
1270
1271        const CONNECT_MILLI_TS: &[&str] = &[
1272            "io.debezium.time.Timestamp",
1273            "org.apache.kafka.connect.data.Timestamp",
1274        ];
1275        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1276
1277        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1278            if CONNECT_MILLI_TS.contains(&&**name) {
1279                return Ok(SchemaPiece::TimestampMilli);
1280            }
1281            if name == CONNECT_MICRO_TS {
1282                return Ok(SchemaPiece::TimestampMicro);
1283            }
1284        }
1285        if let Some(name) = complex.get("logicalType") {
1286            if name == AVRO_MILLI_TS {
1287                return Ok(SchemaPiece::TimestampMilli);
1288            }
1289            if name == AVRO_MICRO_TS {
1290                return Ok(SchemaPiece::TimestampMicro);
1291            }
1292        }
1293        if !complex.is_empty() {
1294            debug!("parsing complex type as regular long: {:?}", complex);
1295        }
1296        Ok(SchemaPiece::Long)
1297    }
1298
1299    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1300        const CONNECT_JSON: &str = "io.debezium.data.Json";
1301
1302        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1303            if CONNECT_JSON == name.as_str() {
1304                return SchemaPiece::Json;
1305            }
1306        }
1307        if let Some(name) = complex.get("logicalType") {
1308            if name == "uuid" {
1309                return SchemaPiece::Uuid;
1310            }
1311        }
1312        debug!("parsing complex type as regular string: {:?}", complex);
1313        SchemaPiece::String
1314    }
1315
1316    /// Parse a `serde_json::Value` representing a Avro fixed type into a
1317    /// `Schema`.
1318    fn parse_fixed(
1319        &self,
1320        _default_namespace: &str,
1321        complex: &Map<String, Value>,
1322    ) -> Result<SchemaPiece, AvroError> {
1323        let _name = Name::parse(complex)?;
1324
1325        let size = complex
1326            .get("size")
1327            .and_then(|v| v.as_i64())
1328            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1329        if size <= 0 {
1330            return Err(ParseSchemaError::new(format!(
1331                "Fixed values require a positive size attribute, found: {}",
1332                size
1333            ))
1334            .into());
1335        }
1336
1337        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1338
1339        if let Some("decimal") = logical_type {
1340            match Self::parse_decimal(complex) {
1341                Ok((precision, scale)) => {
1342                    let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1343                    if precision > max {
1344                        warn!(
1345                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1346                            precision, size
1347                        );
1348                    } else {
1349                        return Ok(SchemaPiece::Decimal {
1350                            precision,
1351                            scale,
1352                            fixed_size: Some(size as usize),
1353                        });
1354                    }
1355                }
1356                Err(e) => warn!(
1357                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1358                    complex, e
1359                ),
1360            }
1361        }
1362
1363        Ok(SchemaPiece::Fixed {
1364            size: size as usize,
1365        })
1366    }
1367}
1368
1369impl Schema {
1370    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
1371    /// schema.
1372    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1373        let p = SchemaParser {
1374            named: vec![],
1375            indices: Default::default(),
1376        };
1377        p.parse(value)
1378    }
1379
1380    /// Converts `self` into its [Parsing Canonical Form].
1381    ///
1382    /// [Parsing Canonical Form]:
1383    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1384    pub fn canonical_form(&self) -> String {
1385        let json = serde_json::to_value(self).unwrap();
1386        parsing_canonical_form(&json)
1387    }
1388
1389    /// Generate fingerprint of Schema's [Parsing Canonical Form].
1390    ///
1391    /// [Parsing Canonical Form]:
1392    /// https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas
1393    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1394        let mut d = D::new();
1395        d.update(self.canonical_form());
1396        SchemaFingerprint {
1397            bytes: d.finalize().to_vec(),
1398        }
1399    }
1400
1401    /// Parse a `serde_json::Value` representing a primitive Avro type into a
1402    /// `Schema`.
1403    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1404        match primitive {
1405            "null" => Ok(SchemaPiece::Null),
1406            "boolean" => Ok(SchemaPiece::Boolean),
1407            "int" => Ok(SchemaPiece::Int),
1408            "long" => Ok(SchemaPiece::Long),
1409            "double" => Ok(SchemaPiece::Double),
1410            "float" => Ok(SchemaPiece::Float),
1411            "bytes" => Ok(SchemaPiece::Bytes),
1412            "string" => Ok(SchemaPiece::String),
1413            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1414        }
1415    }
1416}
1417
1418impl FromStr for Schema {
1419    type Err = AvroError;
1420
1421    /// Create a `Schema` from a string representing a JSON Avro schema.
1422    fn from_str(input: &str) -> Result<Self, AvroError> {
1423        let value = serde_json::from_str(input)
1424            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1425        Self::parse(&value)
1426    }
1427}
1428
/// A schema piece together with the full name it was registered under.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// The full name of the named type.
    pub name: FullName,
    /// The definition of the named type.
    pub piece: SchemaPiece,
}
1434
/// A borrowed view of one piece inside a schema, with named references
/// already resolved.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The schema this node belongs to.
    pub root: &'a Schema,
    /// The piece this node points at.
    pub inner: &'a SchemaPiece,
    /// The full name of `inner`. When produced by
    /// `SchemaNodeOrNamed::lookup`, this is `Some` for a named reference and
    /// `None` for an anonymous piece.
    pub name: Option<&'a FullName>,
}
1441
/// Either a borrowed anonymous piece or the index of a named piece.
#[derive(Copy, Clone, Debug)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// A borrowed, anonymous schema piece.
    Piece(&'a SchemaPiece),
    /// The index of a named piece, to be resolved with `Schema::lookup` on
    /// the schema it belongs to.
    Named(usize),
}
1447
1448impl<'a> SchemaPieceRefOrNamed<'a> {
1449    pub fn get_human_name(&self, root: &Schema) -> String {
1450        match self {
1451            Self::Piece(piece) => format!("{:?}", piece),
1452            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1453        }
1454    }
1455
1456    #[inline(always)]
1457    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1458        match self {
1459            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1460            SchemaPieceRefOrNamed::Named(index) => {
1461                let named_piece = root.lookup(index);
1462                (&named_piece.piece, Some(&named_piece.name))
1463            }
1464        }
1465    }
1466}
1467
/// A position inside a schema whose named references have not yet been
/// resolved; see `lookup` for the resolved form.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema that `inner` belongs to and is resolved against.
    pub root: &'a Schema,
    /// The piece, or named reference, this node points at.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
1473
1474impl<'a> SchemaNodeOrNamed<'a> {
1475    #[inline(always)]
1476    pub fn lookup(self) -> SchemaNode<'a> {
1477        let (inner, name) = self.inner.get_piece_and_name(self.root);
1478        SchemaNode {
1479            root: self.root,
1480            inner,
1481            name,
1482        }
1483    }
1484    #[inline(always)]
1485    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1486        self.step_ref(next.as_ref())
1487    }
1488    #[inline(always)]
1489    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1490        Self {
1491            root: self.root,
1492            inner: match next {
1493                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1494                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1495            },
1496        }
1497    }
1498
1499    pub fn to_schema(self) -> Schema {
1500        let mut cloner = SchemaSubtreeDeepCloner {
1501            old_root: self.root,
1502            old_to_new_names: Default::default(),
1503            named: Default::default(),
1504        };
1505        let piece = cloner.clone_piece_or_named(self.inner);
1506        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1507        let indices: BTreeMap<FullName, usize> = named
1508            .iter()
1509            .enumerate()
1510            .map(|(i, nsp)| (nsp.name.clone(), i))
1511            .collect();
1512        Schema {
1513            named,
1514            indices,
1515            top: piece,
1516        }
1517    }
1518
1519    pub fn namespace(self) -> Option<&'a str> {
1520        let SchemaNode { name, .. } = self.lookup();
1521        name.map(|FullName { namespace, .. }| namespace.as_str())
1522    }
1523}
1524
/// Working state for deep-cloning a subtree of a schema into a new,
/// self-contained schema.
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema the subtree is being cloned out of.
    old_root: &'a Schema,
    /// Maps the index of a named piece in `old_root` to the slot assigned to
    /// it in `named`.
    old_to_new_names: BTreeMap<usize, usize>,
    /// Named pieces of the new schema. A slot is pushed as `None` when it is
    /// first assigned.
    named: Vec<Option<NamedSchemaPiece>>,
}
1530
1531impl<'a> SchemaSubtreeDeepCloner<'a> {
1532    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1533        match piece {
1534            SchemaPiece::Null => SchemaPiece::Null,
1535            SchemaPiece::Boolean => SchemaPiece::Boolean,
1536            SchemaPiece::Int => SchemaPiece::Int,
1537            SchemaPiece::Long => SchemaPiece::Long,
1538            SchemaPiece::Float => SchemaPiece::Float,
1539            SchemaPiece::Double => SchemaPiece::Double,
1540            SchemaPiece::Date => SchemaPiece::Date,
1541            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1542            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1543            SchemaPiece::Json => SchemaPiece::Json,
1544            SchemaPiece::Decimal {
1545                scale,
1546                precision,
1547                fixed_size,
1548            } => SchemaPiece::Decimal {
1549                scale: *scale,
1550                precision: *precision,
1551                fixed_size: *fixed_size,
1552            },
1553            SchemaPiece::Bytes => SchemaPiece::Bytes,
1554            SchemaPiece::String => SchemaPiece::String,
1555            SchemaPiece::Uuid => SchemaPiece::Uuid,
1556            SchemaPiece::Array(inner) => {
1557                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1558            }
1559            SchemaPiece::Map(inner) => {
1560                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1561            }
1562            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1563                schemas: us
1564                    .schemas
1565                    .iter()
1566                    .map(|s| self.clone_piece_or_named(s.as_ref()))
1567                    .collect(),
1568                anon_variant_index: us.anon_variant_index.clone(),
1569                named_variant_index: us.named_variant_index.clone(),
1570            }),
1571            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1572            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1573            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1574            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1575            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1576            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1577            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1578            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1579            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1580            SchemaPiece::ResolveConcreteUnion {
1581                index,
1582                inner,
1583                n_reader_variants,
1584                reader_null_variant,
1585            } => SchemaPiece::ResolveConcreteUnion {
1586                index: *index,
1587                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1588                n_reader_variants: *n_reader_variants,
1589                reader_null_variant: *reader_null_variant,
1590            },
1591            SchemaPiece::ResolveUnionUnion {
1592                permutation,
1593                n_reader_variants,
1594                reader_null_variant,
1595            } => SchemaPiece::ResolveUnionUnion {
1596                permutation: permutation
1597                    .clone()
1598                    .into_iter()
1599                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1600                    .collect(),
1601                n_reader_variants: *n_reader_variants,
1602                reader_null_variant: *reader_null_variant,
1603            },
1604            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1605                SchemaPiece::ResolveUnionConcrete {
1606                    index: *index,
1607                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1608                }
1609            }
1610            SchemaPiece::Record {
1611                doc,
1612                fields,
1613                lookup,
1614            } => SchemaPiece::Record {
1615                doc: doc.clone(),
1616                fields: fields
1617                    .iter()
1618                    .map(|rf| RecordField {
1619                        name: rf.name.clone(),
1620                        doc: rf.doc.clone(),
1621                        default: rf.default.clone(),
1622                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
1623                        order: rf.order,
1624                        position: rf.position,
1625                    })
1626                    .collect(),
1627                lookup: lookup.clone(),
1628            },
1629            SchemaPiece::Enum {
1630                doc,
1631                symbols,
1632                default_idx,
1633            } => SchemaPiece::Enum {
1634                doc: doc.clone(),
1635                symbols: symbols.clone(),
1636                default_idx: *default_idx,
1637            },
1638            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1639            SchemaPiece::ResolveRecord {
1640                defaults,
1641                fields,
1642                n_reader_fields,
1643            } => SchemaPiece::ResolveRecord {
1644                defaults: defaults.clone(),
1645                fields: fields
1646                    .iter()
1647                    .map(|rf| match rf {
1648                        ResolvedRecordField::Present(rf) => {
1649                            ResolvedRecordField::Present(RecordField {
1650                                name: rf.name.clone(),
1651                                doc: rf.doc.clone(),
1652                                default: rf.default.clone(),
1653                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
1654                                order: rf.order,
1655                                position: rf.position,
1656                            })
1657                        }
1658                        ResolvedRecordField::Absent(writer_schema) => {
1659                            ResolvedRecordField::Absent(writer_schema.clone())
1660                        }
1661                    })
1662                    .collect(),
1663                n_reader_fields: *n_reader_fields,
1664            },
1665            SchemaPiece::ResolveEnum {
1666                doc,
1667                symbols,
1668                default,
1669            } => SchemaPiece::ResolveEnum {
1670                doc: doc.clone(),
1671                symbols: symbols.clone(),
1672                default: default.clone(),
1673            },
1674        }
1675    }
1676    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1677        match piece {
1678            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1679            SchemaPieceRefOrNamed::Named(index) => {
1680                let new_index = match self.old_to_new_names.entry(index) {
1681                    Entry::Vacant(ve) => {
1682                        let new_index = self.named.len();
1683                        self.named.push(None);
1684                        ve.insert(new_index);
1685                        let old_named_piece = self.old_root.lookup(index);
1686                        let new_named_piece = NamedSchemaPiece {
1687                            name: old_named_piece.name.clone(),
1688                            piece: self.clone_piece(&old_named_piece.piece),
1689                        };
1690                        self.named[new_index] = Some(new_named_piece);
1691                        new_index
1692                    }
1693                    Entry::Occupied(oe) => *oe.get(),
1694                };
1695                SchemaPieceOrNamed::Named(new_index)
1696            }
1697        }
1698    }
1699}
1700
1701impl<'a> SchemaNode<'a> {
1702    #[inline(always)]
1703    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1704        let (inner, name) = next.get_piece_and_name(self.root);
1705        Self {
1706            root: self.root,
1707            inner,
1708            name,
1709        }
1710    }
1711
1712    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1713        use serde_json::Value::*;
1714        let val = match (json, self.inner) {
1715            // A default value always matches the first variant of a union
1716            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1717                Some(variant) => AvroValue::Union {
1718                    index: 0,
1719                    inner: Box::new(self.step(variant).json_to_value(json)?),
1720                    n_variants: us.schemas.len(),
1721                    null_variant: us
1722                        .schemas
1723                        .iter()
1724                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1725                },
1726                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1727            },
1728            (Null, SchemaPiece::Null) => AvroValue::Null,
1729            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1730            (Number(n), piece) => {
1731                match piece {
1732                    piece if piece.is_underlying_int() => {
1733                        let i =
1734                            n.as_i64()
1735                                .and_then(|i| i32::try_from(i).ok())
1736                                .ok_or_else(|| {
1737                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1738                                })?;
1739                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1740                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1741                        })?
1742                    }
1743                    piece if piece.is_underlying_long() => {
1744                        let i = n.as_i64().ok_or_else(|| {
1745                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1746                        })?;
1747                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1748                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1749                        })?
1750                    }
1751                    SchemaPiece::Float => {
1752                        let f = n.as_f64().ok_or_else(|| {
1753                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1754                        })?;
1755                        AvroValue::Float(f as f32)
1756                    }
1757                    SchemaPiece::Double => {
1758                        let f = n.as_f64().ok_or_else(|| {
1759                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1760                        })?;
1761                        AvroValue::Double(f)
1762                    }
1763                    _ => {
1764                        return Err(ParseSchemaError(format!(
1765                            "Unexpected number in default: {}",
1766                            n
1767                        )));
1768                    }
1769                }
1770            }
1771            (String(s), piece)
1772                if s.eq_ignore_ascii_case("nan")
1773                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1774            {
1775                match piece {
1776                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1777                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1778                    _ => unreachable!(),
1779                }
1780            }
1781            (String(s), piece)
1782                if s.eq_ignore_ascii_case("infinity")
1783                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1784            {
1785                match piece {
1786                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1787                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1788                    _ => unreachable!(),
1789                }
1790            }
1791            (String(s), piece)
1792                if s.eq_ignore_ascii_case("-infinity")
1793                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1794            {
1795                match piece {
1796                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1797                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1798                    _ => unreachable!(),
1799                }
1800            }
1801            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1802            (
1803                String(s),
1804                SchemaPiece::Decimal {
1805                    precision, scale, ..
1806                },
1807            ) => AvroValue::Decimal(DecimalValue {
1808                precision: *precision,
1809                scale: *scale,
1810                unscaled: s.clone().into_bytes(),
1811            }),
1812            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1813            (Object(map), SchemaPiece::Record { fields, .. }) => {
1814                let field_values = fields
1815                    .iter()
1816                    .map(|rf| {
1817                        let jval = map.get(&rf.name).ok_or_else(|| {
1818                            ParseSchemaError(format!(
1819                                "Field not found in default value: {}",
1820                                rf.name
1821                            ))
1822                        })?;
1823                        let value = self.step(&rf.schema).json_to_value(jval)?;
1824                        Ok((rf.name.clone(), value))
1825                    })
1826                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1827                AvroValue::Record(field_values)
1828            }
1829            (String(s), SchemaPiece::Enum { symbols, .. }) => {
1830                match symbols.iter().find_position(|sym| s == *sym) {
1831                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1832                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1833                }
1834            }
1835            (Array(vals), SchemaPiece::Array(inner)) => {
1836                let node = self.step(&**inner);
1837                let vals = vals
1838                    .iter()
1839                    .map(|val| node.json_to_value(val))
1840                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1841                AvroValue::Array(vals)
1842            }
1843            (Object(map), SchemaPiece::Map(inner)) => {
1844                let node = self.step(&**inner);
1845                let map = map
1846                    .iter()
1847                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1848                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1849                AvroValue::Map(map)
1850            }
1851            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1852                AvroValue::Fixed(*size, s.clone().into_bytes())
1853            }
1854            _ => {
1855                return Err(ParseSchemaError(format!(
1856                    "Json default value {} does not match schema",
1857                    json
1858                )));
1859            }
1860        };
1861        Ok(val)
1862    }
1863}
1864
/// State carried while serializing one node of a schema: the node itself, the
/// named types that have already been written, and the namespace in effect
/// around the node.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The schema node (anonymous piece or reference to a named piece) to serialize.
    node: SchemaNodeOrNamed<'a>,
    // This does not logically need Rc<RefCell<_>> semantics --
    // it is only ever mutated in one stack frame at a time.
    // But AFAICT serde doesn't expose a way to
    // provide some mutable context to every node in the tree...
    /// Names of the named pieces written so far, keyed by their index in the root
    /// schema. Shared by every context derived from the same root context.
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// The namespace of this node's parent, or "" by default
    enclosing_ns: &'a str,
}
1876
/// Pairs a record field with the serialization context of the record that
/// contains it, so the field can be serialized on its own.
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
    /// Context of the enclosing record; the field's type is serialized by stepping from it.
    outer: &'a SchemaSerContext<'a>,
    /// The record field being serialized.
    inner: &'a RecordField,
}
1882
1883impl<'a> SchemaSerContext<'a> {
1884    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1885        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1886        Self {
1887            node: self.node.step_ref(next),
1888            seen_named: Rc::clone(&self.seen_named),
1889            enclosing_ns: ns,
1890        }
1891    }
1892}
1893
1894impl<'a> Serialize for SchemaSerContext<'a> {
1895    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1896    where
1897        S: Serializer,
1898    {
1899        match self.node.inner {
1900            SchemaPieceRefOrNamed::Piece(piece) => match piece {
1901                SchemaPiece::Null => serializer.serialize_str("null"),
1902                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
1903                SchemaPiece::Int => serializer.serialize_str("int"),
1904                SchemaPiece::Long => serializer.serialize_str("long"),
1905                SchemaPiece::Float => serializer.serialize_str("float"),
1906                SchemaPiece::Double => serializer.serialize_str("double"),
1907                SchemaPiece::Date => {
1908                    let mut map = serializer.serialize_map(Some(2))?;
1909                    map.serialize_entry("type", "int")?;
1910                    map.serialize_entry("logicalType", "date")?;
1911                    map.end()
1912                }
1913                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
1914                    let mut map = serializer.serialize_map(Some(2))?;
1915                    map.serialize_entry("type", "long")?;
1916                    if piece == &SchemaPiece::TimestampMilli {
1917                        map.serialize_entry("logicalType", "timestamp-millis")?;
1918                    } else {
1919                        map.serialize_entry("logicalType", "timestamp-micros")?;
1920                    }
1921                    map.end()
1922                }
1923                SchemaPiece::Decimal {
1924                    precision,
1925                    scale,
1926                    fixed_size: None,
1927                } => {
1928                    let mut map = serializer.serialize_map(Some(4))?;
1929                    map.serialize_entry("type", "bytes")?;
1930                    map.serialize_entry("precision", precision)?;
1931                    map.serialize_entry("scale", scale)?;
1932                    map.serialize_entry("logicalType", "decimal")?;
1933                    map.end()
1934                }
1935                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
1936                SchemaPiece::String => serializer.serialize_str("string"),
1937                SchemaPiece::Array(inner) => {
1938                    let mut map = serializer.serialize_map(Some(2))?;
1939                    map.serialize_entry("type", "array")?;
1940                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
1941                    map.end()
1942                }
1943                SchemaPiece::Map(inner) => {
1944                    let mut map = serializer.serialize_map(Some(2))?;
1945                    map.serialize_entry("type", "map")?;
1946                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
1947                    map.end()
1948                }
1949                SchemaPiece::Union(inner) => {
1950                    let variants = inner.variants();
1951                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
1952                    for v in variants {
1953                        seq.serialize_element(&self.step(v.as_ref()))?;
1954                    }
1955                    seq.end()
1956                }
1957                SchemaPiece::Json => {
1958                    let mut map = serializer.serialize_map(Some(2))?;
1959                    map.serialize_entry("type", "string")?;
1960                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
1961                    map.end()
1962                }
1963                SchemaPiece::Uuid => {
1964                    let mut map = serializer.serialize_map(Some(4))?;
1965                    map.serialize_entry("type", "string")?;
1966                    map.serialize_entry("logicalType", "uuid")?;
1967                    map.end()
1968                }
1969                SchemaPiece::Record { .. }
1970                | SchemaPiece::Decimal {
1971                    fixed_size: Some(_),
1972                    ..
1973                }
1974                | SchemaPiece::Enum { .. }
1975                | SchemaPiece::Fixed { .. } => {
1976                    unreachable!("Unexpected named schema piece in anonymous schema position")
1977                }
1978                SchemaPiece::ResolveIntLong
1979                | SchemaPiece::ResolveDateTimestamp
1980                | SchemaPiece::ResolveIntFloat
1981                | SchemaPiece::ResolveIntDouble
1982                | SchemaPiece::ResolveLongFloat
1983                | SchemaPiece::ResolveLongDouble
1984                | SchemaPiece::ResolveFloatDouble
1985                | SchemaPiece::ResolveConcreteUnion { .. }
1986                | SchemaPiece::ResolveUnionUnion { .. }
1987                | SchemaPiece::ResolveUnionConcrete { .. }
1988                | SchemaPiece::ResolveRecord { .. }
1989                | SchemaPiece::ResolveIntTsMicro
1990                | SchemaPiece::ResolveIntTsMilli
1991                | SchemaPiece::ResolveEnum { .. } => {
1992                    panic!("Attempted to serialize resolved schema")
1993                }
1994            },
1995            SchemaPieceRefOrNamed::Named(index) => {
1996                let mut map = self.seen_named.borrow_mut();
1997                let named_piece = match map.get(&index) {
1998                    Some(name) => {
1999                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2000                    }
2001                    None => self.node.root.lookup(index),
2002                };
2003                let name = &named_piece.name;
2004                map.insert(index, name.clone());
2005                std::mem::drop(map);
2006                match &named_piece.piece {
2007                    SchemaPiece::Record { doc, fields, .. } => {
2008                        let mut map = serializer.serialize_map(None)?;
2009                        map.serialize_entry("type", "record")?;
2010                        map.serialize_entry("name", &name.name)?;
2011                        if self.enclosing_ns != &name.namespace {
2012                            map.serialize_entry("namespace", &name.namespace)?;
2013                        }
2014                        if let Some(docstr) = doc {
2015                            map.serialize_entry("doc", docstr)?;
2016                        }
2017                        // TODO (brennan) - serialize aliases
2018                        map.serialize_entry(
2019                            "fields",
2020                            &fields
2021                                .iter()
2022                                .map(|f| RecordFieldSerContext {
2023                                    outer: self,
2024                                    inner: f,
2025                                })
2026                                .collect::<Vec<_>>(),
2027                        )?;
2028                        map.end()
2029                    }
2030                    SchemaPiece::Enum {
2031                        symbols,
2032                        default_idx,
2033                        ..
2034                    } => {
2035                        let mut map = serializer.serialize_map(None)?;
2036                        map.serialize_entry("type", "enum")?;
2037                        map.serialize_entry("name", &name.name)?;
2038                        if self.enclosing_ns != &name.namespace {
2039                            map.serialize_entry("namespace", &name.namespace)?;
2040                        }
2041                        map.serialize_entry("symbols", symbols)?;
2042                        if let Some(default_idx) = *default_idx {
2043                            assert!(default_idx < symbols.len());
2044                            map.serialize_entry("default", &symbols[default_idx])?;
2045                        }
2046                        map.end()
2047                    }
2048                    SchemaPiece::Fixed { size } => {
2049                        let mut map = serializer.serialize_map(None)?;
2050                        map.serialize_entry("type", "fixed")?;
2051                        map.serialize_entry("name", &name.name)?;
2052                        if self.enclosing_ns != &name.namespace {
2053                            map.serialize_entry("namespace", &name.namespace)?;
2054                        }
2055                        map.serialize_entry("size", size)?;
2056                        map.end()
2057                    }
2058                    SchemaPiece::Decimal {
2059                        scale,
2060                        precision,
2061                        fixed_size: Some(size),
2062                    } => {
2063                        let mut map = serializer.serialize_map(Some(6))?;
2064                        map.serialize_entry("type", "fixed")?;
2065                        map.serialize_entry("logicalType", "decimal")?;
2066                        map.serialize_entry("name", &name.name)?;
2067                        if self.enclosing_ns != &name.namespace {
2068                            map.serialize_entry("namespace", &name.namespace)?;
2069                        }
2070                        map.serialize_entry("size", size)?;
2071                        map.serialize_entry("precision", precision)?;
2072                        map.serialize_entry("scale", scale)?;
2073                        map.end()
2074                    }
2075                    SchemaPiece::Null
2076                    | SchemaPiece::Boolean
2077                    | SchemaPiece::Int
2078                    | SchemaPiece::Long
2079                    | SchemaPiece::Float
2080                    | SchemaPiece::Double
2081                    | SchemaPiece::Date
2082                    | SchemaPiece::TimestampMilli
2083                    | SchemaPiece::TimestampMicro
2084                    | SchemaPiece::Decimal {
2085                        fixed_size: None, ..
2086                    }
2087                    | SchemaPiece::Bytes
2088                    | SchemaPiece::String
2089                    | SchemaPiece::Array(_)
2090                    | SchemaPiece::Map(_)
2091                    | SchemaPiece::Union(_)
2092                    | SchemaPiece::Uuid
2093                    | SchemaPiece::Json => {
2094                        unreachable!("Unexpected anonymous schema piece in named schema position")
2095                    }
2096                    SchemaPiece::ResolveIntLong
2097                    | SchemaPiece::ResolveDateTimestamp
2098                    | SchemaPiece::ResolveIntFloat
2099                    | SchemaPiece::ResolveIntDouble
2100                    | SchemaPiece::ResolveLongFloat
2101                    | SchemaPiece::ResolveLongDouble
2102                    | SchemaPiece::ResolveFloatDouble
2103                    | SchemaPiece::ResolveConcreteUnion { .. }
2104                    | SchemaPiece::ResolveUnionUnion { .. }
2105                    | SchemaPiece::ResolveUnionConcrete { .. }
2106                    | SchemaPiece::ResolveRecord { .. }
2107                    | SchemaPiece::ResolveIntTsMilli
2108                    | SchemaPiece::ResolveIntTsMicro
2109                    | SchemaPiece::ResolveEnum { .. } => {
2110                        panic!("Attempted to serialize resolved schema")
2111                    }
2112                }
2113            }
2114        }
2115    }
2116}
2117
2118impl Serialize for Schema {
2119    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2120    where
2121        S: Serializer,
2122    {
2123        let ctx = SchemaSerContext {
2124            node: SchemaNodeOrNamed {
2125                root: self,
2126                inner: self.top.as_ref(),
2127            },
2128            seen_named: Rc::new(RefCell::new(Default::default())),
2129            enclosing_ns: "",
2130        };
2131        ctx.serialize(serializer)
2132    }
2133}
2134
2135impl<'a> Serialize for RecordFieldSerContext<'a> {
2136    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2137    where
2138        S: Serializer,
2139    {
2140        let mut map = serializer.serialize_map(None)?;
2141        map.serialize_entry("name", &self.inner.name)?;
2142        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2143        if let Some(default) = &self.inner.default {
2144            map.serialize_entry("default", default)?;
2145        }
2146        if let Some(doc) = &self.inner.doc {
2147            map.serialize_entry("doc", doc)?;
2148        }
2149        map.end()
2150    }
2151}
2152
2153/// Parses a **valid** avro schema into the Parsing Canonical Form.
2154/// <https://avro.apache.org/docs/++version++/specification#parsing-canonical-form-for-schemas>
2155fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2156    pcf(schema, "", false)
2157}
2158
2159fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2160    match schema {
2161        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2162        serde_json::Value::String(s) => pcf_string(s),
2163        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2164        serde_json::Value::Number(n) => n.to_string(),
2165        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2166    }
2167}
2168
2169fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2170    // Look for the namespace variant up front.
2171    let default_ns = schema
2172        .get("namespace")
2173        .and_then(|v| v.as_str())
2174        .unwrap_or(enclosing_ns);
2175    let mut fields = Vec::new();
2176    let mut found_next_ns = None;
2177    let mut deferred_values = vec![];
2178    for (k, v) in schema {
2179        // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
2180        if schema.len() == 1 && k == "type" {
2181            // Invariant: function is only callable from a valid schema, so this is acceptable.
2182            if let serde_json::Value::String(s) = v {
2183                return pcf_string(s);
2184            }
2185        }
2186
2187        // Strip out unused fields ([STRIP] rule)
2188        if field_ordering_position(k).is_none() {
2189            continue;
2190        }
2191
2192        // Fully qualify the name, if it isn't already ([FULLNAMES] rule).
2193        if k == "name" {
2194            // The `fields` stanza needs special handling, as it has "name"
2195            // fields that don't get canonicalized (since they are not type names).
2196            if in_fields {
2197                fields.push((
2198                    k,
2199                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2200                ));
2201                continue;
2202            }
2203            // Invariant: Only valid schemas. Must be a string.
2204            let name = v.as_str().unwrap();
2205            assert!(
2206                found_next_ns.is_none(),
2207                "`name` must not be specified multiple times"
2208            );
2209            let next_ns = match name.rsplit_once('.') {
2210                None => default_ns,
2211                Some((ns, _name)) => ns,
2212            };
2213            found_next_ns = Some(next_ns);
2214            let n = if next_ns.is_empty() {
2215                Cow::Borrowed(name)
2216            } else {
2217                Cow::Owned(format!("{next_ns}.{name}"))
2218            };
2219            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2220            continue;
2221        }
2222
2223        // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
2224        if k == "size" {
2225            let i = match v.as_str() {
2226                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2227                None => v.as_i64().unwrap(),
2228            };
2229            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2230            continue;
2231        }
2232
2233        // For anything else, recursively process the result
2234        // (deferred until we know the namespace)
2235        deferred_values.push((k, v));
2236    }
2237
2238    let next_ns = found_next_ns.unwrap_or(default_ns);
2239    for (k, v) in deferred_values {
2240        fields.push((
2241            k,
2242            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2243        ));
2244    }
2245
2246    // Sort the fields by their canonical ordering ([ORDER] rule).
2247    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2248    let inter = fields
2249        .into_iter()
2250        .map(|(_, v)| v)
2251        .collect::<Vec<_>>()
2252        .join(",");
2253    format!("{{{}}}", inter)
2254}
2255
2256fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2257    let inter = arr
2258        .iter()
2259        .map(|s| pcf(s, enclosing_ns, in_fields))
2260        .collect::<Vec<String>>()
2261        .join(",");
2262    format!("[{}]", inter)
2263}
2264
/// Wraps `s` in double quotes. The contents are copied verbatim; nothing is escaped.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2268
// Used to define the ordering and inclusion of fields.
//
// Returns the 1-based canonical position of a schema key, or `None` for keys
// that are not part of the canonical form. Matching is case-sensitive.
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_KEYS: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_KEYS
        .iter()
        .position(|key| *key == field)
        .map(|index| index + 1)
}
2284
2285#[cfg(test)]
2286mod tests {
2287    use mz_ore::{assert_err, assert_ok};
2288
2289    use crate::types::{Record, ToAvro};
2290
2291    use super::*;
2292
2293    fn check_schema(schema: &str, expected: SchemaPiece) {
2294        let schema = Schema::from_str(schema).unwrap();
2295        assert_eq!(&expected, schema.top_node().inner);
2296
2297        // Test serialization round trip
2298        let schema = serde_json::to_string(&schema).unwrap();
2299        let schema = Schema::from_str(&schema).unwrap();
2300        assert_eq!(&expected, schema.top_node().inner);
2301    }
2302
2303    #[mz_ore::test]
2304    fn test_primitive_schema() {
2305        check_schema("\"null\"", SchemaPiece::Null);
2306        check_schema("\"int\"", SchemaPiece::Int);
2307        check_schema("\"double\"", SchemaPiece::Double);
2308    }
2309
2310    #[mz_ore::test]
2311    fn test_array_schema() {
2312        check_schema(
2313            r#"{"type": "array", "items": "string"}"#,
2314            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2315        );
2316    }
2317
2318    #[mz_ore::test]
2319    fn test_map_schema() {
2320        check_schema(
2321            r#"{"type": "map", "values": "double"}"#,
2322            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2323        );
2324    }
2325
2326    #[mz_ore::test]
2327    fn test_union_schema() {
2328        check_schema(
2329            r#"["null", "int"]"#,
2330            SchemaPiece::Union(
2331                UnionSchema::new(vec![
2332                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2333                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2334                ])
2335                .unwrap(),
2336            ),
2337        );
2338    }
2339
2340    #[mz_ore::test]
2341    fn test_multi_union_schema() {
2342        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2343        assert_ok!(schema);
2344        let schema = schema.unwrap();
2345        let node = schema.top_node();
2346        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2347        let union_schema = match node.inner {
2348            SchemaPiece::Union(u) => u,
2349            _ => unreachable!(),
2350        };
2351        assert_eq!(union_schema.variants().len(), 5);
2352        let mut variants = union_schema.variants().iter();
2353        assert_eq!(
2354            SchemaKind::from(node.step(variants.next().unwrap())),
2355            SchemaKind::Null
2356        );
2357        assert_eq!(
2358            SchemaKind::from(node.step(variants.next().unwrap())),
2359            SchemaKind::Int
2360        );
2361        assert_eq!(
2362            SchemaKind::from(node.step(variants.next().unwrap())),
2363            SchemaKind::Float
2364        );
2365        assert_eq!(
2366            SchemaKind::from(node.step(variants.next().unwrap())),
2367            SchemaKind::String
2368        );
2369        assert_eq!(
2370            SchemaKind::from(node.step(variants.next().unwrap())),
2371            SchemaKind::Bytes
2372        );
2373        assert_eq!(variants.next(), None);
2374    }
2375
2376    #[mz_ore::test]
2377    fn test_record_schema() {
2378        let schema = r#"
2379                {
2380                    "type": "record",
2381                    "name": "test",
2382                    "doc": "record doc",
2383                    "fields": [
2384                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2385                        {"name": "b", "doc": "b doc", "type": "string"}
2386                    ]
2387                }
2388            "#;
2389
2390        let mut lookup = BTreeMap::new();
2391        lookup.insert("a".to_owned(), 0);
2392        lookup.insert("b".to_owned(), 1);
2393
2394        let expected = SchemaPiece::Record {
2395            doc: Some("record doc".to_string()),
2396            fields: vec![
2397                RecordField {
2398                    name: "a".to_string(),
2399                    doc: Some("a doc".to_string()),
2400                    default: Some(Value::Number(42i64.into())),
2401                    schema: SchemaPiece::Long.into(),
2402                    order: RecordFieldOrder::Ascending,
2403                    position: 0,
2404                },
2405                RecordField {
2406                    name: "b".to_string(),
2407                    doc: Some("b doc".to_string()),
2408                    default: None,
2409                    schema: SchemaPiece::String.into(),
2410                    order: RecordFieldOrder::Ascending,
2411                    position: 1,
2412                },
2413            ],
2414            lookup,
2415        };
2416
2417        check_schema(schema, expected);
2418    }
2419
2420    #[mz_ore::test]
2421    fn test_enum_schema() {
2422        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2423
2424        let expected = SchemaPiece::Enum {
2425            doc: None,
2426            symbols: vec![
2427                "diamonds".to_owned(),
2428                "spades".to_owned(),
2429                "jokers".to_owned(),
2430                "clubs".to_owned(),
2431                "hearts".to_owned(),
2432            ],
2433            default_idx: Some(2),
2434        };
2435
2436        check_schema(schema, expected);
2437
2438        let bad_schema = Schema::from_str(
2439            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2440        );
2441
2442        assert_err!(bad_schema);
2443    }
2444
2445    #[mz_ore::test]
2446    fn test_fixed_schema() {
2447        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2448
2449        let expected = SchemaPiece::Fixed { size: 16usize };
2450
2451        check_schema(schema, expected);
2452    }
2453
2454    #[mz_ore::test]
2455    fn test_date_schema() {
2456        let kinds = &[
2457            r#"{
2458                    "type": "int",
2459                    "name": "datish",
2460                    "logicalType": "date"
2461                }"#,
2462            r#"{
2463                    "type": "int",
2464                    "name": "datish",
2465                    "connect.name": "io.debezium.time.Date"
2466                }"#,
2467            r#"{
2468                    "type": "int",
2469                    "name": "datish",
2470                    "connect.name": "org.apache.kafka.connect.data.Date"
2471                }"#,
2472        ];
2473        for kind in kinds {
2474            check_schema(*kind, SchemaPiece::Date);
2475
2476            let schema = Schema::from_str(*kind).unwrap();
2477            assert_eq!(
2478                serde_json::to_string(&schema).unwrap(),
2479                r#"{"type":"int","logicalType":"date"}"#
2480            );
2481        }
2482    }
2483
2484    #[mz_ore::test]
2485    fn new_field_in_middle() {
2486        let reader = r#"{
2487            "type": "record",
2488            "name": "MyRecord",
2489            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2490        }"#;
2491        let writer = r#"{
2492            "type": "record",
2493            "name": "MyRecord",
2494            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2495        }"#;
2496        let reader = Schema::from_str(reader).unwrap();
2497        let writer = Schema::from_str(writer).unwrap();
2498
2499        let mut record = Record::new(writer.top_node()).unwrap();
2500        record.put("f1", 1);
2501        record.put("f2", 2);
2502        record.put("f_interposed", 42);
2503
2504        let value = record.avro();
2505
2506        let mut buf = vec![];
2507        crate::encode::encode(&value, &writer, &mut buf);
2508
2509        let resolved = resolve_schemas(&writer, &reader).unwrap();
2510
2511        let reader = &mut &buf[..];
2512        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2513        let expected = crate::types::Value::Record(vec![
2514            ("f1".to_string(), crate::types::Value::Int(1)),
2515            ("f2".to_string(), crate::types::Value::Int(2)),
2516        ]);
2517        assert_eq!(reader_value, expected);
2518        assert!(reader.is_empty()); // all bytes should have been consumed
2519    }
2520
2521    #[mz_ore::test]
2522    fn new_field_at_end() {
2523        let reader = r#"{
2524            "type": "record",
2525            "name": "MyRecord",
2526            "fields": [{"name": "f1", "type": "int"}]
2527        }"#;
2528        let writer = r#"{
2529            "type": "record",
2530            "name": "MyRecord",
2531            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2532        }"#;
2533        let reader = Schema::from_str(reader).unwrap();
2534        let writer = Schema::from_str(writer).unwrap();
2535
2536        let mut record = Record::new(writer.top_node()).unwrap();
2537        record.put("f1", 1);
2538        record.put("f2", 2);
2539
2540        let value = record.avro();
2541
2542        let mut buf = vec![];
2543        crate::encode::encode(&value, &writer, &mut buf);
2544
2545        let resolved = resolve_schemas(&writer, &reader).unwrap();
2546
2547        let reader = &mut &buf[..];
2548        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2549        let expected =
2550            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2551        assert_eq!(reader_value, expected);
2552        assert!(reader.is_empty()); // all bytes should have been consumed
2553    }
2554
2555    #[mz_ore::test]
2556    fn default_non_nums() {
2557        let reader = r#"{
2558            "type": "record",
2559            "name": "MyRecord",
2560            "fields": [
2561                {"name": "f1", "type": "double", "default": "NaN"},
2562                {"name": "f2", "type": "double", "default": "Infinity"},
2563                {"name": "f3", "type": "double", "default": "-Infinity"}
2564            ]
2565        }
2566        "#;
2567        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2568
2569        let writer_schema = Schema::from_str(writer).unwrap();
2570        let reader_schema = Schema::from_str(reader).unwrap();
2571        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2572
2573        let record = Record::new(writer_schema.top_node()).unwrap();
2574
2575        let value = record.avro();
2576        let mut buf = vec![];
2577        crate::encode::encode(&value, &writer_schema, &mut buf);
2578
2579        let reader = &mut &buf[..];
2580        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2581        let expected = crate::types::Value::Record(vec![
2582            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2583            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2584            (
2585                "f3".to_string(),
2586                crate::types::Value::Double(f64::NEG_INFINITY),
2587            ),
2588        ]);
2589
2590        #[derive(Debug)]
2591        struct NanEq(crate::types::Value);
2592        impl std::cmp::PartialEq for NanEq {
2593            fn eq(&self, other: &Self) -> bool {
2594                match (self, other) {
2595                    (
2596                        NanEq(crate::types::Value::Double(x)),
2597                        NanEq(crate::types::Value::Double(y)),
2598                    ) if x.is_nan() && y.is_nan() => true,
2599                    (
2600                        NanEq(crate::types::Value::Float(x)),
2601                        NanEq(crate::types::Value::Float(y)),
2602                    ) if x.is_nan() && y.is_nan() => true,
2603                    (
2604                        NanEq(crate::types::Value::Record(xs)),
2605                        NanEq(crate::types::Value::Record(ys)),
2606                    ) => {
2607                        let xs = xs
2608                            .iter()
2609                            .cloned()
2610                            .map(|(k, v)| (k, NanEq(v)))
2611                            .collect::<Vec<_>>();
2612                        let ys = ys
2613                            .iter()
2614                            .cloned()
2615                            .map(|(k, v)| (k, NanEq(v)))
2616                            .collect::<Vec<_>>();
2617
2618                        xs == ys
2619                    }
2620                    (NanEq(x), NanEq(y)) => x == y,
2621                }
2622            }
2623        }
2624
2625        assert_eq!(NanEq(reader_value), NanEq(expected));
2626        assert!(reader.is_empty());
2627    }
2628
2629    #[mz_ore::test]
2630    fn test_decimal_schemas() {
2631        let schema = r#"{
2632                "type": "fixed",
2633                "name": "dec",
2634                "size": 8,
2635                "logicalType": "decimal",
2636                "precision": 12,
2637                "scale": 5
2638            }"#;
2639        let expected = SchemaPiece::Decimal {
2640            precision: 12,
2641            scale: 5,
2642            fixed_size: Some(8),
2643        };
2644        check_schema(schema, expected);
2645
2646        let schema = r#"{
2647                "type": "bytes",
2648                "logicalType": "decimal",
2649                "precision": 12,
2650                "scale": 5
2651            }"#;
2652        let expected = SchemaPiece::Decimal {
2653            precision: 12,
2654            scale: 5,
2655            fixed_size: None,
2656        };
2657        check_schema(schema, expected);
2658
2659        let res = Schema::from_str(
2660            r#"["bytes", {
2661                "type": "bytes",
2662                "logicalType": "decimal",
2663                "precision": 12,
2664                "scale": 5
2665            }]"#,
2666        );
2667        assert_eq!(
2668            res.unwrap_err().to_string(),
2669            "Schema parse error: Unions cannot contain duplicate types"
2670        );
2671
2672        let writer_schema = Schema::from_str(
2673            r#"["null", {
2674                "type": "bytes"
2675            }]"#,
2676        )
2677        .unwrap();
2678        let reader_schema = Schema::from_str(
2679            r#"["null", {
2680                "type": "bytes",
2681                "logicalType": "decimal",
2682                "precision": 12,
2683                "scale": 5
2684            }]"#,
2685        )
2686        .unwrap();
2687        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2688
2689        let expected = SchemaPiece::ResolveUnionUnion {
2690            permutation: vec![
2691                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2692                Ok((
2693                    1,
2694                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2695                        precision: 12,
2696                        scale: 5,
2697                        fixed_size: None,
2698                    }),
2699                )),
2700            ],
2701            n_reader_variants: 2,
2702            reader_null_variant: Some(0),
2703        };
2704        assert_eq!(resolved.top_node().inner, &expected);
2705    }
2706
2707    #[mz_ore::test]
2708    fn test_no_documentation() {
2709        let schema =
2710            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2711                .unwrap();
2712
2713        let doc = match schema.top_node().inner {
2714            SchemaPiece::Enum { doc, .. } => doc.clone(),
2715            _ => panic!(),
2716        };
2717
2718        assert_none!(doc);
2719    }
2720
2721    #[mz_ore::test]
2722    fn test_documentation() {
2723        let schema = Schema::from_str(
2724                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2725            ).unwrap();
2726
2727        let doc = match schema.top_node().inner {
2728            SchemaPiece::Enum { doc, .. } => doc.clone(),
2729            _ => None,
2730        };
2731
2732        assert_eq!("Some documentation".to_owned(), doc.unwrap());
2733    }
2734
2735    #[mz_ore::test]
2736    fn test_namespaces_and_names() {
2737        // When name and namespace specified, full name should contain both.
2738        let schema = Schema::from_str(
2739            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2740        )
2741        .unwrap();
2742        assert_eq!(schema.named.len(), 1);
2743        assert_eq!(
2744            schema.named[0].name,
2745            FullName {
2746                name: "name".into(),
2747                namespace: "namespace".into()
2748            }
2749        );
2750
2751        // When name contains dots, parse the dot-separated name as the namespace.
2752        let schema =
2753            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2754                .unwrap();
2755        assert_eq!(schema.named.len(), 1);
2756        assert_eq!(
2757            schema.named[0].name,
2758            FullName {
2759                name: "dots".into(),
2760                namespace: "name.has".into()
2761            }
2762        );
2763
2764        // Same as above, ignore any provided namespace.
2765        let schema = Schema::from_str(
2766            r#"{"type": "enum", "namespace": "namespace",
2767            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2768        )
2769        .unwrap();
2770        assert_eq!(schema.named.len(), 1);
2771        assert_eq!(
2772            schema.named[0].name,
2773            FullName {
2774                name: "dots".into(),
2775                namespace: "name.has".into()
2776            }
2777        );
2778
2779        // Use default namespace when namespace is not provided.
2780        // Materialize uses "" as the default namespace.
2781        let schema = Schema::from_str(
2782            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2783            "fields": [{"name": "name", "type": "string"}]}"#,
2784        )
2785        .unwrap();
2786        assert_eq!(schema.named.len(), 1);
2787        assert_eq!(
2788            schema.named[0].name,
2789            FullName {
2790                name: "TestDoc".into(),
2791                namespace: "".into()
2792            }
2793        );
2794
2795        // Empty namespace strings should be allowed.
2796        let schema = Schema::from_str(
2797            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2798            "fields": [{"name": "name", "type": "string"}]}"#,
2799        )
2800        .unwrap();
2801        assert_eq!(schema.named.len(), 1);
2802        assert_eq!(
2803            schema.named[0].name,
2804            FullName {
2805                name: "TestDoc".into(),
2806                namespace: "".into()
2807            }
2808        );
2809
2810        // Equality of names is defined on the FullName and is case-sensitive.
2811        let first = Schema::from_str(
2812            r#"{"type": "fixed", "namespace": "namespace",
2813            "name": "name", "size": 1}"#,
2814        )
2815        .unwrap();
2816        let second = Schema::from_str(
2817            r#"{"type": "fixed", "name": "namespace.name",
2818            "size": 1}"#,
2819        )
2820        .unwrap();
2821        assert_eq!(first.named[0].name, second.named[0].name);
2822
2823        let first = Schema::from_str(
2824            r#"{"type": "fixed", "namespace": "namespace",
2825            "name": "name", "size": 1}"#,
2826        )
2827        .unwrap();
2828        let second = Schema::from_str(
2829            r#"{"type": "fixed", "name": "namespace.Name",
2830            "size": 1}"#,
2831        )
2832        .unwrap();
2833        assert_ne!(first.named[0].name, second.named[0].name);
2834
2835        let first = Schema::from_str(
2836            r#"{"type": "fixed", "namespace": "Namespace",
2837            "name": "name", "size": 1}"#,
2838        )
2839        .unwrap();
2840        let second = Schema::from_str(
2841            r#"{"type": "fixed", "namespace": "namespace",
2842            "name": "name", "size": 1}"#,
2843        )
2844        .unwrap();
2845        assert_ne!(first.named[0].name, second.named[0].name);
2846
2847        // The name portion of a fullname, record field names, and enum symbols must:
2848        // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
2849        assert!(
2850            Schema::from_str(
2851                r#"{"type": "record", "name": "99 problems but a name aint one",
2852            "fields": [{"name": "name", "type": "string"}]}"#
2853            )
2854            .is_err()
2855        );
2856
2857        assert!(
2858            Schema::from_str(
2859                r#"{"type": "record", "name": "!!!",
2860            "fields": [{"name": "name", "type": "string"}]}"#
2861            )
2862            .is_err()
2863        );
2864
2865        assert!(
2866            Schema::from_str(
2867                r#"{"type": "record", "name": "_valid_until_©",
2868            "fields": [{"name": "name", "type": "string"}]}"#
2869            )
2870            .is_err()
2871        );
2872
2873        // Use previously defined names and namespaces as type.
2874        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2875              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2876              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2877              {"name": "f3", "type": "MyEnum"},
2878              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2879              {"name": "f5", "type": "other.namespace.OtherEnum"},
2880              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2881              {"name": "f7", "type": "some.other.ThirdEnum"}
2882             ]}"#).unwrap();
2883        assert_eq!(schema.named.len(), 4);
2884
2885        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2886            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // f1
2887            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // f2
2888            assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); // f3
2889            assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); // f4
2890            assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); // f5
2891            assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); // f6
2892            assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); // f7
2893        } else {
2894            panic!("Expected SchemaPiece::Record, found something else");
2895        }
2896
2897        let schema = Schema::from_str(
2898            r#"{"type": "record", "name": "x.Y", "fields": [
2899              {"name": "e", "type":
2900                {"type": "record", "name": "Z", "fields": [
2901                  {"name": "f", "type": "x.Y"},
2902                  {"name": "g", "type": "x.Z"}
2903                ]}
2904              }
2905            ]}"#,
2906        )
2907        .unwrap();
2908        assert_eq!(schema.named.len(), 2);
2909
2910        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2911            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // e
2912        } else {
2913            panic!("Expected SchemaPiece::Record, found something else");
2914        }
2915
2916        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
2917            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); // f
2918            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); // g
2919        } else {
2920            panic!("Expected SchemaPiece::Record, found something else");
2921        }
2922
2923        let schema = Schema::from_str(
2924            r#"{"type": "record", "name": "R", "fields": [
2925              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
2926                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
2927                 "symbols": ["Foo", "Bar"]}
2928                }
2929              ]}},
2930              {"name": "t", "type": "Z"}
2931            ]}"#,
2932        )
2933        .unwrap();
2934        assert_eq!(schema.named.len(), 3);
2935
2936        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2937            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); // s
2938            assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); // t - refers to "".Z
2939        } else {
2940            panic!("Expected SchemaPiece::Record, found something else");
2941        }
2942    }
2943
2944    // Tests to ensure Schema is Send + Sync. These tests don't need to _do_ anything, if they can
2945    // compile, they pass.
2946    #[mz_ore::test]
2947    fn test_schema_is_send() {
2948        fn send<S: Send>(_s: S) {}
2949
2950        let schema = Schema {
2951            named: vec![],
2952            indices: Default::default(),
2953            top: SchemaPiece::Null.into(),
2954        };
2955        send(schema);
2956    }
2957
2958    #[mz_ore::test]
2959    fn test_schema_is_sync() {
2960        fn sync<S: Sync>(_s: S) {}
2961
2962        let schema = Schema {
2963            named: vec![],
2964            indices: Default::default(),
2965            top: SchemaPiece::Null.into(),
2966        };
2967        sync(&schema);
2968        sync(schema);
2969    }
2970
2971    #[mz_ore::test]
2972    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
2973    fn test_schema_fingerprint() {
2974        use sha2::Sha256;
2975
2976        let raw_schema = r#"
2977        {
2978            "type": "record",
2979            "name": "test",
2980            "fields": [
2981                {"name": "a", "type": "long", "default": 42},
2982                {"name": "b", "type": "string"}
2983            ]
2984        }
2985    "#;
2986        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
2987        let schema = Schema::from_str(raw_schema).unwrap();
2988        assert_eq!(&schema.canonical_form(), expected_canonical);
2989        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
2990        assert_eq!(
2991            format!("{}", schema.fingerprint::<Sha256>()),
2992            expected_fingerprint
2993        );
2994
2995        let raw_schema = r#"
2996{
2997  "type": "record",
2998  "name": "ns.r1",
2999  "namespace": "ignored",
3000  "fields": [
3001    {
3002      "name": "f1",
3003      "type": {
3004        "type": "fixed",
3005        "name": "r2",
3006        "size": 1
3007      }
3008    }
3009  ]
3010}
3011"#;
3012        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3013        let schema = Schema::from_str(raw_schema).unwrap();
3014        assert_eq!(&schema.canonical_form(), expected_canonical);
3015        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3016        assert_eq!(
3017            format!("{}", schema.fingerprint::<Sha256>()),
3018            expected_fingerprint
3019        );
3020    }
3021
3022    #[mz_ore::test]
3023    fn test_make_valid() {
3024        for (input, expected) in [
3025            ("foo", "foo"),
3026            ("az99", "az99"),
3027            ("99az", "_99az"),
3028            ("is,bad", "is_bad"),
3029            ("@#$%", "____"),
3030            ("i-amMisBehaved!", "i_amMisBehaved_"),
3031            ("", "_"),
3032        ] {
3033            let actual = Name::make_valid(input);
3034            assert_eq!(expected, actual, "Name::make_valid({input})")
3035        }
3036    }
3037}