1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51    writer_schema: &Schema,
52    reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54    let r_indices = reader_schema.indices.clone();
55    let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56        writer_schema
57            .indices
58            .iter()
59            .flat_map(|(name, widx)| {
60                r_indices
61                    .get(name)
62                    .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63            })
64            .unzip();
65    let reader_fullnames = reader_schema
66        .indices
67        .iter()
68        .map(|(f, i)| (*i, f))
69        .collect::<BTreeMap<_, _>>();
70    let mut resolver = SchemaResolver {
71        named: Default::default(),
72        indices: Default::default(),
73        human_readable_field_path: Vec::new(),
74        current_human_readable_path_start: 0,
75        writer_to_reader_names,
76        reader_to_writer_names,
77        reader_to_resolved_names: Default::default(),
78        reader_fullnames,
79        reader_schema,
80    };
81    let writer_node = writer_schema.top_node_or_named();
82    let reader_node = reader_schema.top_node_or_named();
83    let inner = resolver.resolve(writer_node, reader_node)?;
84    let sch = Schema {
85        named: resolver.named.into_iter().map(Option::unwrap).collect(),
86        indices: resolver.indices,
87        top: inner,
88    };
89    Ok(sch)
90}
91
/// Error raised while parsing a schema; carries a human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error from anything convertible into a `String`.
    pub fn new<S: Into<String>>(msg: S) -> ParseSchemaError {
        Self(msg.into())
    }
}

impl fmt::Display for ParseSchemaError {
    /// Displays the wrapped message, honouring the formatter's flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// A schema fingerprint held as raw bytes.
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The fingerprint bytes, in the order they are rendered by `Display`.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators. An empty fingerprint renders as the empty string.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write each byte straight into the formatter rather than building a
        // `String` per byte and joining them afterwards.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{byte:02x}"))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137    Piece(SchemaPiece),
138    Named(usize),
139}
140impl SchemaPieceOrNamed {
141    pub fn get_human_name(&self, root: &Schema) -> String {
142        match self {
143            Self::Piece(piece) => format!("{:?}", piece),
144            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145        }
146    }
147    #[inline(always)]
148    pub fn get_piece_and_name<'a>(
149        &'a self,
150        root: &'a Schema,
151    ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152        self.as_ref().get_piece_and_name(root)
153    }
154
155    #[inline(always)]
156    pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157        match self {
158            SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159            SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160        }
161    }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165    #[inline(always)]
166    fn from(piece: SchemaPiece) -> Self {
167        Self::Piece(piece)
168    }
169}
170
/// One node of a schema.
///
/// Besides the plain schema shapes, this enum also contains `Resolve*`
/// variants. NOTE(review): these presumably describe how data written under
/// one schema is read under another (see `resolve_schemas`) — confirm against
/// `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// Reported as `SchemaKind::Null`.
    Null,
    /// Reported as `SchemaKind::Boolean`.
    Boolean,
    /// Reported as `SchemaKind::Int`.
    Int,
    /// Reported as `SchemaKind::Long`.
    Long,
    /// Reported as `SchemaKind::Float`.
    Float,
    /// Reported as `SchemaKind::Double`.
    Double,
    /// A date; reported as `SchemaKind::Int` and built from an `i32`.
    Date,
    /// A timestamp built from an `i64` using `TsUnit::Millis`; reported as
    /// `SchemaKind::Long`.
    TimestampMilli,
    /// A timestamp built from an `i64` using `TsUnit::Micros`; reported as
    /// `SchemaKind::Long`.
    TimestampMicro,
    /// A decimal; reported as `SchemaKind::Fixed` when `fixed_size` is set
    /// and as `SchemaKind::Bytes` otherwise.
    Decimal {
        precision: usize,
        scale: usize,
        fixed_size: Option<usize>,
    },
    /// Reported as `SchemaKind::Bytes`.
    Bytes,
    /// Reported as `SchemaKind::String`.
    String,
    /// Reported as `SchemaKind::String`.
    Json,
    /// Reported as `SchemaKind::String`.
    Uuid,
    /// An array; the boxed schema comes from the `items` attribute.
    Array(Box<SchemaPieceOrNamed>),
    /// A map; the boxed schema comes from the `values` attribute.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of variants.
    Union(UnionSchema),
    /// Reported as `SchemaKind::Long`.
    ResolveIntTsMilli,
    /// Reported as `SchemaKind::Long`.
    ResolveIntTsMicro,
    /// Reported as `SchemaKind::Long`.
    ResolveDateTimestamp,
    /// Reported as `SchemaKind::Long`.
    ResolveIntLong,
    /// Reported as `SchemaKind::Float`.
    ResolveIntFloat,
    /// Reported as `SchemaKind::Double`.
    ResolveIntDouble,
    /// Reported as `SchemaKind::Float`.
    ResolveLongFloat,
    /// Reported as `SchemaKind::Double`.
    ResolveLongDouble,
    /// Reported as `SchemaKind::Double`.
    ResolveFloatDouble,
    /// Reported as `SchemaKind::Union`.
    /// NOTE(review): presumably a non-union writer read into a reader union,
    /// with `index` selecting the reader variant — confirm.
    ResolveConcreteUnion {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// Reported as `SchemaKind::Union`.
    /// NOTE(review): presumably one `permutation` entry per writer variant,
    /// each either a reader index plus schema or the error for that variant
    /// — confirm.
    ResolveUnionUnion {
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// Reported as `SchemaKind::Unknown`.
    ResolveUnionConcrete {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A record; `lookup` maps each field name to that field's `position`.
    Record {
        doc: Documentation,
        fields: Vec<RecordField>,
        lookup: BTreeMap<String, usize>,
    },
    /// An enum; `default_idx`, when set, is the index in `symbols` of the
    /// default symbol.
    Enum {
        doc: Documentation,
        symbols: Vec<String>,
        default_idx: Option<usize>,
    },
    /// Reported as `SchemaKind::Fixed`.
    Fixed { size: usize },
    /// Reported as `SchemaKind::Record`.
    ResolveRecord {
        defaults: Vec<ResolvedDefaultValueField>,
        fields: Vec<ResolvedRecordField>,
        n_reader_fields: usize,
    },
    /// Reported as `SchemaKind::Enum`.
    ResolveEnum {
        doc: Documentation,
        symbols: Vec<Result<(usize, String), String>>,
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320    pub fn is_underlying_int(&self) -> bool {
322        self.try_make_int_value(0).is_some()
323    }
324    pub fn is_underlying_long(&self) -> bool {
326        self.try_make_long_value(0).is_some()
327    }
328    pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331        match self {
332            SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333            SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336            _ => None,
337        }
338    }
339    pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342        match self {
343            SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344            SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345            SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346            _ => None,
347        }
348    }
349}
350
/// A schema: a top-level node plus the table of named pieces it refers to.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Named pieces, addressed by the indices stored in
    /// `SchemaPieceOrNamed::Named` and in `indices`.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its position in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362    fn to_string(&self) -> String {
363        let json = serde_json::to_value(self).unwrap();
364        json.to_string()
365    }
366}
367
368impl std::fmt::Debug for Schema {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        if f.alternate() {
371            f.write_str(
372                &serde_json::to_string_pretty(self)
373                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374            )
375        } else {
376            f.write_str(
377                &serde_json::to_string(self)
378                    .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379            )
380        }
381    }
382}
383
384impl Schema {
385    pub fn top_node(&self) -> SchemaNode<'_> {
386        let (inner, name) = self.top.get_piece_and_name(self);
387        SchemaNode {
388            root: self,
389            inner,
390            name,
391        }
392    }
393    pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394        SchemaNodeOrNamed {
395            root: self,
396            inner: self.top.as_ref(),
397        }
398    }
399    pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400        &self.named[idx]
401    }
402    pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403        self.indices.get(name).map(|&idx| &self.named[idx])
404    }
405}
406
/// Coarse classification of a schema piece.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase name of this kind, e.g. `"null"` or `"record"`.
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460    #[inline(always)]
461    fn from(piece: &'a SchemaPiece) -> SchemaKind {
462        match piece {
463            SchemaPiece::Null => SchemaKind::Null,
464            SchemaPiece::Boolean => SchemaKind::Boolean,
465            SchemaPiece::Int => SchemaKind::Int,
466            SchemaPiece::Long => SchemaKind::Long,
467            SchemaPiece::Float => SchemaKind::Float,
468            SchemaPiece::Double => SchemaKind::Double,
469            SchemaPiece::Date => SchemaKind::Int,
470            SchemaPiece::TimestampMilli
471            | SchemaPiece::TimestampMicro
472            | SchemaPiece::ResolveIntTsMilli
473            | SchemaPiece::ResolveDateTimestamp
474            | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475            SchemaPiece::Decimal {
476                fixed_size: None, ..
477            } => SchemaKind::Bytes,
478            SchemaPiece::Decimal {
479                fixed_size: Some(_),
480                ..
481            } => SchemaKind::Fixed,
482            SchemaPiece::Bytes => SchemaKind::Bytes,
483            SchemaPiece::String => SchemaKind::String,
484            SchemaPiece::Array(_) => SchemaKind::Array,
485            SchemaPiece::Map(_) => SchemaKind::Map,
486            SchemaPiece::Union(_) => SchemaKind::Union,
487            SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488            SchemaPiece::ResolveIntLong => SchemaKind::Long,
489            SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490            SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491            SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492            SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493            SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494            SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495            SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496            SchemaPiece::Record { .. } => SchemaKind::Record,
497            SchemaPiece::Enum { .. } => SchemaKind::Enum,
498            SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499            SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500            SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501            SchemaPiece::Json => SchemaKind::String,
502            SchemaPiece::Uuid => SchemaKind::String,
503        }
504    }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508    #[inline(always)]
509    fn from(schema: SchemaNode<'a>) -> SchemaKind {
510        SchemaKind::from(schema.inner)
511    }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515    #[inline(always)]
516    fn from(schema: &'a Schema) -> SchemaKind {
517        Self::from(schema.top_node())
518    }
519}
520
/// The name attributes of a named schema, as produced by `Name::parse`.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The name; `Name::parse` stores only the part after the last `.`.
    pub name: String,
    /// The namespace, if one was given or could be derived from a dotted name.
    pub namespace: Option<String>,
    /// The `aliases` attribute, when present as an array of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A name split into its base name and its namespace.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name from its parts.
    ///
    /// When `namespace` is given, `name` is stored unchanged. Otherwise `name`
    /// is split at its last `.`: the prefix becomes the namespace, and
    /// `default_namespace` is used when there is no dot.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (base, ns) = match namespace {
            Some(ns) => (name, ns),
            None => match name.rsplit_once('.') {
                Some((ns, base)) => (base, ns),
                None => (name, default_namespace),
            },
        };
        FullName {
            name: base.to_owned(),
            namespace: ns.to_owned(),
        }
    }
    /// The name without its namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }
    /// The name as `namespace.name`, or just `name` when the namespace is
    /// empty.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            self.name.clone()
        } else {
            format!("{}.{}", self.namespace, self.name)
        }
    }
    /// The bare name when this name lives in `enclosing_ns`; otherwise the
    /// namespace and name joined with a `.`.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace == enclosing_ns {
            Cow::Borrowed(self.name.as_str())
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }
    /// The namespace part of the name.
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Writes `namespace.name`; the dot is written even when the namespace is
    /// empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.namespace)?;
        f.write_str(".")?;
        f.write_str(&self.name)
    }
}
594
/// Optional documentation string, used for the `doc` of records, enums and
/// record fields.
pub type Documentation = Option<String>;
597
598impl Name {
599    pub fn is_valid(name: &str) -> bool {
603        static MATCHER: LazyLock<Regex> =
604            LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605        MATCHER.is_match(name)
606    }
607
608    pub fn validate(name: &str) -> Result<(), AvroError> {
610        match Self::is_valid(name) {
611            true => Ok(()),
612            false => {
613                Err(ParseSchemaError::new(format!(
614                    "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615                    name
616                )).into())
617            }
618        }
619    }
620
621    pub fn make_valid(name: &str) -> String {
629        let mut out = String::new();
630        let mut chars = name.chars();
631        match chars.next() {
632            Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633            Some(ch @ '0'..='9') => {
634                out.push('_');
635                out.push(ch);
636            }
637            _ => out.push('_'),
638        }
639        for ch in chars {
640            match ch {
641                '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642                _ => out.push('_'),
643            }
644        }
645        debug_assert!(
646            Name::is_valid(&out),
647            "make_valid({name}) produced invalid name: {out}"
648        );
649        out
650    }
651
652    pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654        let name = complex
655            .name()
656            .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657        if name.is_empty() {
658            return Err(ParseSchemaError::new(format!(
659                "Name cannot be the empty string: {:?}",
660                complex
661            ))
662            .into());
663        }
664
665        let (namespace, name) = if let Some(index) = name.rfind('.') {
666            let computed_namespace = name[..index].to_owned();
667            let computed_name = name[index + 1..].to_owned();
668            if let Some(provided_namespace) = complex.string("namespace") {
669                if provided_namespace != computed_namespace {
670                    warn!(
671                        "Found dots in name {}, updating to namespace {} and name {}",
672                        name, computed_namespace, computed_name
673                    );
674                }
675            }
676            (Some(computed_namespace), computed_name)
677        } else {
678            (complex.string("namespace"), name)
679        };
680
681        Self::validate(&name)?;
682
683        let aliases: Option<Vec<String>> = complex
684            .get("aliases")
685            .and_then(|aliases| aliases.as_array())
686            .and_then(|aliases| {
687                aliases
688                    .iter()
689                    .map(|alias| alias.as_str())
690                    .map(|alias| alias.map(|a| a.to_string()))
691                    .collect::<Option<_>>()
692            });
693
694        Ok(Name {
695            name,
696            namespace,
697            aliases,
698        })
699    }
700
701    pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703        let mut map = serde_json::map::Map::new();
704        map.insert("name".into(), serde_json::Value::String(name.into()));
705        Self::parse(&map)
706    }
707
708    pub fn fullname(&self, default_namespace: &str) -> FullName {
713        FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714    }
715}
716
/// A record field paired with an already-built default value.
/// NOTE(review): presumably used for reader fields absent from the writer
/// (see `SchemaPiece::ResolveRecord::defaults`) — confirm.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    pub name: String,
    pub doc: Documentation,
    pub default: types::Value,
    pub order: RecordFieldOrder,
    pub position: usize,
}
725
/// One entry of `SchemaPiece::ResolveRecord::fields`.
/// NOTE(review): `Absent` presumably carries the schema of a field that has
/// no counterpart on the other side — confirm.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    Absent(Schema),
    Present(RecordField),
}
731
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// The field's name.
    pub name: String,
    /// The field's documentation, if any.
    pub doc: Documentation,
    /// The raw JSON `default` attribute of the field, if present.
    pub default: Option<Value>,
    /// The field's schema.
    pub schema: SchemaPieceOrNamed,
    /// The field's `order` attribute; `Ascending` when absent or unrecognized.
    pub order: RecordFieldOrder,
    /// The field's position, assigned while parsing the record's fields.
    pub position: usize,
}
752
/// The `order` attribute of a record field: `"ascending"`, `"descending"` or
/// `"ignore"` in the JSON schema.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
760
// Empty inherent impl: no methods are defined in this block.
impl RecordField {}
762
/// A union schema: its variants plus lookup tables for finding a variant.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    // The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    // For inline (`Piece`) variants: kind -> position in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    // For `Named` variants: named-piece index -> position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776    pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777        let mut avindex = BTreeMap::new();
778        let mut nvindex = BTreeMap::new();
779        for (i, schema) in schemas.iter().enumerate() {
780            match schema {
781                SchemaPieceOrNamed::Piece(sp) => {
782                    if let SchemaPiece::Union(_) = sp {
783                        return Err(ParseSchemaError::new(
784                            "Unions may not directly contain a union",
785                        )
786                        .into());
787                    }
788                    let kind = SchemaKind::from(sp);
789                    if avindex.insert(kind, i).is_some() {
790                        return Err(
791                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792                        );
793                    }
794                }
795                SchemaPieceOrNamed::Named(idx) => {
796                    if nvindex.insert(*idx, i).is_some() {
797                        return Err(
798                            ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799                        );
800                    }
801                }
802            }
803        }
804        Ok(UnionSchema {
805            schemas,
806            anon_variant_index: avindex,
807            named_variant_index: nvindex,
808        })
809    }
810
811    pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813        &self.schemas
814    }
815
816    pub fn is_nullable(&self) -> bool {
818        !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
819    }
820
821    pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
822        self.anon_variant_index
823            .get(&SchemaKind::from(sp))
824            .map(|idx| (*idx, &self.schemas[*idx]))
825    }
826
827    pub fn match_ref(
828        &self,
829        other: SchemaPieceRefOrNamed,
830        names_map: &BTreeMap<usize, usize>,
831    ) -> Option<(usize, &SchemaPieceOrNamed)> {
832        match other {
833            SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
834            SchemaPieceRefOrNamed::Named(idx) => names_map
835                .get(&idx)
836                .and_then(|idx| self.named_variant_index.get(idx))
837                .map(|idx| (*idx, &self.schemas[*idx])),
838        }
839    }
840
841    #[inline(always)]
842    pub fn match_(
843        &self,
844        other: &SchemaPieceOrNamed,
845        names_map: &BTreeMap<usize, usize>,
846    ) -> Option<(usize, &SchemaPieceOrNamed)> {
847        self.match_ref(other.as_ref(), names_map)
848    }
849}
850
851impl PartialEq for UnionSchema {
853    fn eq(&self, other: &UnionSchema) -> bool {
854        self.schemas.eq(&other.schemas)
855    }
856}
857
/// Working state used while parsing a JSON schema.
#[derive(Default)]
struct SchemaParser {
    // One slot per allocated name; a slot is `None` from the moment the name
    // is allocated until its piece has been parsed and inserted.
    named: Vec<Option<NamedSchemaPiece>>,
    // Maps each allocated full name to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
}
863
864impl SchemaParser {
865    fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
866        let top = self.parse_inner("", value)?;
867        let SchemaParser { named, indices } = self;
868        Ok(Schema {
869            named: named.into_iter().map(|o| o.unwrap()).collect(),
870            indices,
871            top,
872        })
873    }
874
875    fn parse_inner(
876        &mut self,
877        default_namespace: &str,
878        value: &Value,
879    ) -> Result<SchemaPieceOrNamed, AvroError> {
880        match *value {
881            Value::String(ref t) => {
882                let name = FullName::from_parts(t.as_str(), None, default_namespace);
883                if let Some(idx) = self.indices.get(&name) {
884                    Ok(SchemaPieceOrNamed::Named(*idx))
885                } else {
886                    Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
887                        t.as_str(),
888                    )?))
889                }
890            }
891            Value::Object(ref data) => self.parse_complex(default_namespace, data),
892            Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
893                self.parse_union(default_namespace, data)?,
894            )),
895            _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
896        }
897    }
898
899    fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
900        let idx = match self.indices.entry(fullname) {
901            Entry::Vacant(ve) => *ve.insert(self.named.len()),
902            Entry::Occupied(oe) => {
903                return Err(ParseSchemaError::new(format!(
904                    "Sub-schema with name {:?} encountered multiple times",
905                    oe.key()
906                ))
907                .into());
908            }
909        };
910        self.named.push(None);
911        Ok(idx)
912    }
913
    /// Stores `schema` in the slot at `index`.
    ///
    /// The slot is asserted to be empty first (via `assert_none!`), and
    /// indexing panics if `index` is out of bounds.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
918
919    fn parse_named_type(
920        &mut self,
921        type_name: &str,
922        default_namespace: &str,
923        complex: &Map<String, Value>,
924    ) -> Result<usize, AvroError> {
925        let name = Name::parse(complex)?;
926        match name.name.as_str() {
927            "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
928                return Err(ParseSchemaError::new(format!(
929                    "{} may not be used as a custom type name",
930                    name.name
931                ))
932                .into());
933            }
934            _ => {}
935        };
936        let fullname = name.fullname(default_namespace);
937        let default_namespace = fullname.namespace.clone();
938        let idx = self.alloc_name(fullname.clone())?;
939        let piece = match type_name {
940            "record" => self.parse_record(&default_namespace, complex),
941            "enum" => self.parse_enum(complex),
942            "fixed" => self.parse_fixed(&default_namespace, complex),
943            _ => unreachable!("Unknown named type kind: {}", type_name),
944        }?;
945
946        self.insert(
947            idx,
948            NamedSchemaPiece {
949                name: fullname,
950                piece,
951            },
952        );
953
954        Ok(idx)
955    }
956
957    fn parse_complex(
963        &mut self,
964        default_namespace: &str,
965        complex: &Map<String, Value>,
966    ) -> Result<SchemaPieceOrNamed, AvroError> {
967        match complex.get("type") {
968            Some(&Value::String(ref t)) => Ok(match t.as_str() {
969                "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
970                    t,
971                    default_namespace,
972                    complex,
973                )?),
974                "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
975                "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
976                "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
977                "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
978                "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
979                "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
980                other => {
981                    let name = FullName {
982                        name: other.into(),
983                        namespace: default_namespace.into(),
984                    };
985                    if let Some(idx) = self.indices.get(&name) {
986                        SchemaPieceOrNamed::Named(*idx)
987                    } else {
988                        SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
989                    }
990                }
991            }),
992            Some(&Value::Object(ref data)) => match data.get("type") {
993                Some(value) => self.parse_inner(default_namespace, value),
994                None => Err(
995                    ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
996                ),
997            },
998            _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
999        }
1000    }
1001
1002    fn parse_record(
1005        &mut self,
1006        default_namespace: &str,
1007        complex: &Map<String, Value>,
1008    ) -> Result<SchemaPiece, AvroError> {
1009        let mut lookup = BTreeMap::new();
1010
1011        let fields: Vec<RecordField> = complex
1012            .get("fields")
1013            .and_then(|fields| fields.as_array())
1014            .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1015            .and_then(|fields| {
1016                fields
1017                    .iter()
1018                    .filter_map(|field| field.as_object())
1019                    .enumerate()
1020                    .map(|(position, field)| {
1021                        self.parse_record_field(default_namespace, field, position)
1022                    })
1023                    .collect::<Result<_, _>>()
1024            })?;
1025
1026        for field in &fields {
1027            lookup.insert(field.name.clone(), field.position);
1028        }
1029
1030        Ok(SchemaPiece::Record {
1031            doc: complex.doc(),
1032            fields,
1033            lookup,
1034        })
1035    }
1036
1037    fn parse_record_field(
1039        &mut self,
1040        default_namespace: &str,
1041        field: &Map<String, Value>,
1042        position: usize,
1043    ) -> Result<RecordField, AvroError> {
1044        let name = field
1045            .name()
1046            .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1047
1048        Name::validate(&name)?;
1049
1050        let schema = field
1051            .get("type")
1052            .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1053            .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1054
1055        let default = field.get("default").cloned();
1056
1057        let order = field
1058            .get("order")
1059            .and_then(|order| order.as_str())
1060            .and_then(|order| match order {
1061                "ascending" => Some(RecordFieldOrder::Ascending),
1062                "descending" => Some(RecordFieldOrder::Descending),
1063                "ignore" => Some(RecordFieldOrder::Ignore),
1064                _ => None,
1065            })
1066            .unwrap_or(RecordFieldOrder::Ascending);
1067
1068        Ok(RecordField {
1069            name,
1070            doc: field.doc(),
1071            default,
1072            schema,
1073            order,
1074            position,
1075        })
1076    }
1077
1078    fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1081        let symbols: Vec<String> = complex
1082            .get("symbols")
1083            .and_then(|v| v.as_array())
1084            .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1085            .and_then(|symbols| {
1086                symbols
1087                    .iter()
1088                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1089                    .collect::<Option<_>>()
1090                    .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1091            })?;
1092
1093        let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1094        for symbol in symbols.iter() {
1095            if unique_symbols.contains(symbol) {
1096                return Err(ParseSchemaError::new(format!(
1097                    "Enum symbols must be unique, found multiple: {}",
1098                    symbol
1099                ))
1100                .into());
1101            } else {
1102                unique_symbols.insert(symbol);
1103            }
1104        }
1105
1106        let default_idx = if let Some(default) = complex.get("default") {
1107            let default_str = default.as_str().ok_or_else(|| {
1108                ParseSchemaError::new(format!(
1109                    "Enum default should be a string, got: {:?}",
1110                    default
1111                ))
1112            })?;
1113            let default_idx = symbols
1114                .iter()
1115                .position(|x| x == default_str)
1116                .ok_or_else(|| {
1117                    ParseSchemaError::new(format!(
1118                        "Enum default not found in list of symbols: {}",
1119                        default_str
1120                    ))
1121                })?;
1122            Some(default_idx)
1123        } else {
1124            None
1125        };
1126
1127        Ok(SchemaPiece::Enum {
1128            doc: complex.doc(),
1129            symbols,
1130            default_idx,
1131        })
1132    }
1133
1134    fn parse_array(
1137        &mut self,
1138        default_namespace: &str,
1139        complex: &Map<String, Value>,
1140    ) -> Result<SchemaPiece, AvroError> {
1141        complex
1142            .get("items")
1143            .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1144            .and_then(|items| self.parse_inner(default_namespace, items))
1145            .map(|schema| SchemaPiece::Array(Box::new(schema)))
1146    }
1147
1148    fn parse_map(
1151        &mut self,
1152        default_namespace: &str,
1153        complex: &Map<String, Value>,
1154    ) -> Result<SchemaPiece, AvroError> {
1155        complex
1156            .get("values")
1157            .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1158            .and_then(|items| self.parse_inner(default_namespace, items))
1159            .map(|schema| SchemaPiece::Map(Box::new(schema)))
1160    }
1161
1162    fn parse_union(
1165        &mut self,
1166        default_namespace: &str,
1167        items: &[Value],
1168    ) -> Result<SchemaPiece, AvroError> {
1169        items
1170            .iter()
1171            .map(|value| self.parse_inner(default_namespace, value))
1172            .collect::<Result<Vec<_>, _>>()
1173            .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1174    }
1175
1176    fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1179        let precision = complex
1180            .get("precision")
1181            .and_then(|v| v.as_i64())
1182            .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1183
1184        let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1185
1186        if scale < 0 {
1187            return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1188        }
1189
1190        if precision < 0 {
1191            return Err(
1192                ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1193            );
1194        }
1195
1196        if scale > precision {
1197            return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1198        }
1199
1200        Ok((precision as usize, scale as usize))
1201    }
1202
1203    fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1206        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1207
1208        if let Some("decimal") = logical_type {
1209            match Self::parse_decimal(complex) {
1210                Ok((precision, scale)) => {
1211                    return Ok(SchemaPiece::Decimal {
1212                        precision,
1213                        scale,
1214                        fixed_size: None,
1215                    });
1216                }
1217                Err(e) => warn!(
1218                    "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1219                    complex, e
1220                ),
1221            }
1222        }
1223
1224        Ok(SchemaPiece::Bytes)
1225    }
1226
1227    fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1235        const AVRO_DATE: &str = "date";
1236        const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1237        const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1238        if let Some(name) = complex.get("connect.name") {
1239            if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1240                if name == KAFKA_DATE {
1241                    warn!("using deprecated debezium date format");
1242                }
1243                return Ok(SchemaPiece::Date);
1244            }
1245        }
1246        if let Some(name) = complex.get("logicalType") {
1250            if name == AVRO_DATE {
1251                return Ok(SchemaPiece::Date);
1252            }
1253        }
1254        if !complex.is_empty() {
1255            debug!("parsing complex type as regular int: {:?}", complex);
1256        }
1257        Ok(SchemaPiece::Int)
1258    }
1259
1260    fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1268        const AVRO_MILLI_TS: &str = "timestamp-millis";
1269        const AVRO_MICRO_TS: &str = "timestamp-micros";
1270
1271        const CONNECT_MILLI_TS: &[&str] = &[
1272            "io.debezium.time.Timestamp",
1273            "org.apache.kafka.connect.data.Timestamp",
1274        ];
1275        const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1276
1277        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1278            if CONNECT_MILLI_TS.contains(&&**name) {
1279                return Ok(SchemaPiece::TimestampMilli);
1280            }
1281            if name == CONNECT_MICRO_TS {
1282                return Ok(SchemaPiece::TimestampMicro);
1283            }
1284        }
1285        if let Some(name) = complex.get("logicalType") {
1286            if name == AVRO_MILLI_TS {
1287                return Ok(SchemaPiece::TimestampMilli);
1288            }
1289            if name == AVRO_MICRO_TS {
1290                return Ok(SchemaPiece::TimestampMicro);
1291            }
1292        }
1293        if !complex.is_empty() {
1294            debug!("parsing complex type as regular long: {:?}", complex);
1295        }
1296        Ok(SchemaPiece::Long)
1297    }
1298
1299    fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1300        const CONNECT_JSON: &str = "io.debezium.data.Json";
1301
1302        if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1303            if CONNECT_JSON == name.as_str() {
1304                return SchemaPiece::Json;
1305            }
1306        }
1307        if let Some(name) = complex.get("logicalType") {
1308            if name == "uuid" {
1309                return SchemaPiece::Uuid;
1310            }
1311        }
1312        debug!("parsing complex type as regular string: {:?}", complex);
1313        SchemaPiece::String
1314    }
1315
1316    fn parse_fixed(
1319        &self,
1320        _default_namespace: &str,
1321        complex: &Map<String, Value>,
1322    ) -> Result<SchemaPiece, AvroError> {
1323        let _name = Name::parse(complex)?;
1324
1325        let size = complex
1326            .get("size")
1327            .and_then(|v| v.as_i64())
1328            .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1329        if size <= 0 {
1330            return Err(ParseSchemaError::new(format!(
1331                "Fixed values require a positive size attribute, found: {}",
1332                size
1333            ))
1334            .into());
1335        }
1336
1337        let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1338
1339        if let Some("decimal") = logical_type {
1340            match Self::parse_decimal(complex) {
1341                Ok((precision, scale)) => {
1342                    let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1343                    if precision > max {
1344                        warn!(
1345                            "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1346                            precision, size
1347                        );
1348                    } else {
1349                        return Ok(SchemaPiece::Decimal {
1350                            precision,
1351                            scale,
1352                            fixed_size: Some(size as usize),
1353                        });
1354                    }
1355                }
1356                Err(e) => warn!(
1357                    "parsing decimal as fixed due to parse error: {:?}, {:?}",
1358                    complex, e
1359                ),
1360            }
1361        }
1362
1363        Ok(SchemaPiece::Fixed {
1364            size: size as usize,
1365        })
1366    }
1367}
1368
1369impl Schema {
1370    pub fn parse(value: &Value) -> Result<Self, AvroError> {
1373        let p = SchemaParser {
1374            named: vec![],
1375            indices: Default::default(),
1376        };
1377        p.parse(value)
1378    }
1379
1380    pub fn canonical_form(&self) -> String {
1385        let json = serde_json::to_value(self).unwrap();
1386        parsing_canonical_form(&json)
1387    }
1388
1389    pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1396        let mut d = D::new();
1397        d.update(self.canonical_form());
1398        SchemaFingerprint {
1399            bytes: d.finalize().to_vec(),
1400        }
1401    }
1402
1403    fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1406        match primitive {
1407            "null" => Ok(SchemaPiece::Null),
1408            "boolean" => Ok(SchemaPiece::Boolean),
1409            "int" => Ok(SchemaPiece::Int),
1410            "long" => Ok(SchemaPiece::Long),
1411            "double" => Ok(SchemaPiece::Double),
1412            "float" => Ok(SchemaPiece::Float),
1413            "bytes" => Ok(SchemaPiece::Bytes),
1414            "string" => Ok(SchemaPiece::String),
1415            other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1416        }
1417    }
1418}
1419
1420impl FromStr for Schema {
1421    type Err = AvroError;
1422
1423    fn from_str(input: &str) -> Result<Self, AvroError> {
1425        let value = serde_json::from_str(input)
1426            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1427        Self::parse(&value)
1428    }
1429}
1430
1431#[derive(Clone, Debug, PartialEq)]
1432pub struct NamedSchemaPiece {
1433    pub name: FullName,
1434    pub piece: SchemaPiece,
1435}
1436
1437#[derive(Copy, Clone, Debug)]
1438pub struct SchemaNode<'a> {
1439    pub root: &'a Schema,
1440    pub inner: &'a SchemaPiece,
1441    pub name: Option<&'a FullName>,
1442}
1443
1444#[derive(Copy, Clone, Debug)]
1445pub enum SchemaPieceRefOrNamed<'a> {
1446    Piece(&'a SchemaPiece),
1447    Named(usize),
1448}
1449
1450impl<'a> SchemaPieceRefOrNamed<'a> {
1451    pub fn get_human_name(&self, root: &Schema) -> String {
1452        match self {
1453            Self::Piece(piece) => format!("{:?}", piece),
1454            Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1455        }
1456    }
1457
1458    #[inline(always)]
1459    pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1460        match self {
1461            SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1462            SchemaPieceRefOrNamed::Named(index) => {
1463                let named_piece = root.lookup(index);
1464                (&named_piece.piece, Some(&named_piece.name))
1465            }
1466        }
1467    }
1468}
1469
1470#[derive(Copy, Clone, Debug)]
1471pub struct SchemaNodeOrNamed<'a> {
1472    pub root: &'a Schema,
1473    pub inner: SchemaPieceRefOrNamed<'a>,
1474}
1475
1476impl<'a> SchemaNodeOrNamed<'a> {
1477    #[inline(always)]
1478    pub fn lookup(self) -> SchemaNode<'a> {
1479        let (inner, name) = self.inner.get_piece_and_name(self.root);
1480        SchemaNode {
1481            root: self.root,
1482            inner,
1483            name,
1484        }
1485    }
1486    #[inline(always)]
1487    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1488        self.step_ref(next.as_ref())
1489    }
1490    #[inline(always)]
1491    pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1492        Self {
1493            root: self.root,
1494            inner: match next {
1495                SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1496                SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1497            },
1498        }
1499    }
1500
1501    pub fn to_schema(self) -> Schema {
1502        let mut cloner = SchemaSubtreeDeepCloner {
1503            old_root: self.root,
1504            old_to_new_names: Default::default(),
1505            named: Default::default(),
1506        };
1507        let piece = cloner.clone_piece_or_named(self.inner);
1508        let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1509        let indices: BTreeMap<FullName, usize> = named
1510            .iter()
1511            .enumerate()
1512            .map(|(i, nsp)| (nsp.name.clone(), i))
1513            .collect();
1514        Schema {
1515            named,
1516            indices,
1517            top: piece,
1518        }
1519    }
1520
1521    pub fn namespace(self) -> Option<&'a str> {
1522        let SchemaNode { name, .. } = self.lookup();
1523        name.map(|FullName { namespace, .. }| namespace.as_str())
1524    }
1525}
1526
1527struct SchemaSubtreeDeepCloner<'a> {
1528    old_root: &'a Schema,
1529    old_to_new_names: BTreeMap<usize, usize>,
1530    named: Vec<Option<NamedSchemaPiece>>,
1531}
1532
1533impl<'a> SchemaSubtreeDeepCloner<'a> {
1534    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1535        match piece {
1536            SchemaPiece::Null => SchemaPiece::Null,
1537            SchemaPiece::Boolean => SchemaPiece::Boolean,
1538            SchemaPiece::Int => SchemaPiece::Int,
1539            SchemaPiece::Long => SchemaPiece::Long,
1540            SchemaPiece::Float => SchemaPiece::Float,
1541            SchemaPiece::Double => SchemaPiece::Double,
1542            SchemaPiece::Date => SchemaPiece::Date,
1543            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1544            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1545            SchemaPiece::Json => SchemaPiece::Json,
1546            SchemaPiece::Decimal {
1547                scale,
1548                precision,
1549                fixed_size,
1550            } => SchemaPiece::Decimal {
1551                scale: *scale,
1552                precision: *precision,
1553                fixed_size: *fixed_size,
1554            },
1555            SchemaPiece::Bytes => SchemaPiece::Bytes,
1556            SchemaPiece::String => SchemaPiece::String,
1557            SchemaPiece::Uuid => SchemaPiece::Uuid,
1558            SchemaPiece::Array(inner) => {
1559                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1560            }
1561            SchemaPiece::Map(inner) => {
1562                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1563            }
1564            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1565                schemas: us
1566                    .schemas
1567                    .iter()
1568                    .map(|s| self.clone_piece_or_named(s.as_ref()))
1569                    .collect(),
1570                anon_variant_index: us.anon_variant_index.clone(),
1571                named_variant_index: us.named_variant_index.clone(),
1572            }),
1573            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1574            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1575            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1576            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1577            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1578            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1579            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1580            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1581            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1582            SchemaPiece::ResolveConcreteUnion {
1583                index,
1584                inner,
1585                n_reader_variants,
1586                reader_null_variant,
1587            } => SchemaPiece::ResolveConcreteUnion {
1588                index: *index,
1589                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1590                n_reader_variants: *n_reader_variants,
1591                reader_null_variant: *reader_null_variant,
1592            },
1593            SchemaPiece::ResolveUnionUnion {
1594                permutation,
1595                n_reader_variants,
1596                reader_null_variant,
1597            } => SchemaPiece::ResolveUnionUnion {
1598                permutation: permutation
1599                    .clone()
1600                    .into_iter()
1601                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1602                    .collect(),
1603                n_reader_variants: *n_reader_variants,
1604                reader_null_variant: *reader_null_variant,
1605            },
1606            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1607                SchemaPiece::ResolveUnionConcrete {
1608                    index: *index,
1609                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1610                }
1611            }
1612            SchemaPiece::Record {
1613                doc,
1614                fields,
1615                lookup,
1616            } => SchemaPiece::Record {
1617                doc: doc.clone(),
1618                fields: fields
1619                    .iter()
1620                    .map(|rf| RecordField {
1621                        name: rf.name.clone(),
1622                        doc: rf.doc.clone(),
1623                        default: rf.default.clone(),
1624                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
1625                        order: rf.order,
1626                        position: rf.position,
1627                    })
1628                    .collect(),
1629                lookup: lookup.clone(),
1630            },
1631            SchemaPiece::Enum {
1632                doc,
1633                symbols,
1634                default_idx,
1635            } => SchemaPiece::Enum {
1636                doc: doc.clone(),
1637                symbols: symbols.clone(),
1638                default_idx: *default_idx,
1639            },
1640            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1641            SchemaPiece::ResolveRecord {
1642                defaults,
1643                fields,
1644                n_reader_fields,
1645            } => SchemaPiece::ResolveRecord {
1646                defaults: defaults.clone(),
1647                fields: fields
1648                    .iter()
1649                    .map(|rf| match rf {
1650                        ResolvedRecordField::Present(rf) => {
1651                            ResolvedRecordField::Present(RecordField {
1652                                name: rf.name.clone(),
1653                                doc: rf.doc.clone(),
1654                                default: rf.default.clone(),
1655                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
1656                                order: rf.order,
1657                                position: rf.position,
1658                            })
1659                        }
1660                        ResolvedRecordField::Absent(writer_schema) => {
1661                            ResolvedRecordField::Absent(writer_schema.clone())
1662                        }
1663                    })
1664                    .collect(),
1665                n_reader_fields: *n_reader_fields,
1666            },
1667            SchemaPiece::ResolveEnum {
1668                doc,
1669                symbols,
1670                default,
1671            } => SchemaPiece::ResolveEnum {
1672                doc: doc.clone(),
1673                symbols: symbols.clone(),
1674                default: default.clone(),
1675            },
1676        }
1677    }
1678    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1679        match piece {
1680            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1681            SchemaPieceRefOrNamed::Named(index) => {
1682                let new_index = match self.old_to_new_names.entry(index) {
1683                    Entry::Vacant(ve) => {
1684                        let new_index = self.named.len();
1685                        self.named.push(None);
1686                        ve.insert(new_index);
1687                        let old_named_piece = self.old_root.lookup(index);
1688                        let new_named_piece = NamedSchemaPiece {
1689                            name: old_named_piece.name.clone(),
1690                            piece: self.clone_piece(&old_named_piece.piece),
1691                        };
1692                        self.named[new_index] = Some(new_named_piece);
1693                        new_index
1694                    }
1695                    Entry::Occupied(oe) => *oe.get(),
1696                };
1697                SchemaPieceOrNamed::Named(new_index)
1698            }
1699        }
1700    }
1701}
1702
1703impl<'a> SchemaNode<'a> {
1704    #[inline(always)]
1705    pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1706        let (inner, name) = next.get_piece_and_name(self.root);
1707        Self {
1708            root: self.root,
1709            inner,
1710            name,
1711        }
1712    }
1713
1714    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1715        use serde_json::Value::*;
1716        let val = match (json, self.inner) {
1717            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1719                Some(variant) => AvroValue::Union {
1720                    index: 0,
1721                    inner: Box::new(self.step(variant).json_to_value(json)?),
1722                    n_variants: us.schemas.len(),
1723                    null_variant: us
1724                        .schemas
1725                        .iter()
1726                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1727                },
1728                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1729            },
1730            (Null, SchemaPiece::Null) => AvroValue::Null,
1731            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1732            (Number(n), piece) => {
1733                match piece {
1734                    piece if piece.is_underlying_int() => {
1735                        let i =
1736                            n.as_i64()
1737                                .and_then(|i| i32::try_from(i).ok())
1738                                .ok_or_else(|| {
1739                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
1740                                })?;
1741                        piece.try_make_int_value(i).unwrap().map_err(|e| {
1742                            ParseSchemaError(format!("invalid default int {i}: {e}"))
1743                        })?
1744                    }
1745                    piece if piece.is_underlying_long() => {
1746                        let i = n.as_i64().ok_or_else(|| {
1747                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
1748                        })?;
1749                        piece.try_make_long_value(i).unwrap().map_err(|e| {
1750                            ParseSchemaError(format!("invalid default long {i}: {e}"))
1751                        })?
1752                    }
1753                    SchemaPiece::Float => {
1754                        let f = n.as_f64().ok_or_else(|| {
1755                            ParseSchemaError(format!("{} is not a 32-bit float", n))
1756                        })?;
1757                        AvroValue::Float(f as f32)
1758                    }
1759                    SchemaPiece::Double => {
1760                        let f = n.as_f64().ok_or_else(|| {
1761                            ParseSchemaError(format!("{} is not a 64-bit float", n))
1762                        })?;
1763                        AvroValue::Double(f)
1764                    }
1765                    _ => {
1766                        return Err(ParseSchemaError(format!(
1767                            "Unexpected number in default: {}",
1768                            n
1769                        )));
1770                    }
1771                }
1772            }
1773            (String(s), piece)
1774                if s.eq_ignore_ascii_case("nan")
1775                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1776            {
1777                match piece {
1778                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
1779                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
1780                    _ => unreachable!(),
1781                }
1782            }
1783            (String(s), piece)
1784                if s.eq_ignore_ascii_case("infinity")
1785                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1786            {
1787                match piece {
1788                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1789                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1790                    _ => unreachable!(),
1791                }
1792            }
1793            (String(s), piece)
1794                if s.eq_ignore_ascii_case("-infinity")
1795                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1796            {
1797                match piece {
1798                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1799                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1800                    _ => unreachable!(),
1801                }
1802            }
1803            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1804            (
1805                String(s),
1806                SchemaPiece::Decimal {
1807                    precision, scale, ..
1808                },
1809            ) => AvroValue::Decimal(DecimalValue {
1810                precision: *precision,
1811                scale: *scale,
1812                unscaled: s.clone().into_bytes(),
1813            }),
1814            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1815            (Object(map), SchemaPiece::Record { fields, .. }) => {
1816                let field_values = fields
1817                    .iter()
1818                    .map(|rf| {
1819                        let jval = map.get(&rf.name).ok_or_else(|| {
1820                            ParseSchemaError(format!(
1821                                "Field not found in default value: {}",
1822                                rf.name
1823                            ))
1824                        })?;
1825                        let value = self.step(&rf.schema).json_to_value(jval)?;
1826                        Ok((rf.name.clone(), value))
1827                    })
1828                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1829                AvroValue::Record(field_values)
1830            }
1831            (String(s), SchemaPiece::Enum { symbols, .. }) => {
1832                match symbols.iter().find_position(|sym| s == *sym) {
1833                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1834                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1835                }
1836            }
1837            (Array(vals), SchemaPiece::Array(inner)) => {
1838                let node = self.step(&**inner);
1839                let vals = vals
1840                    .iter()
1841                    .map(|val| node.json_to_value(val))
1842                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1843                AvroValue::Array(vals)
1844            }
1845            (Object(map), SchemaPiece::Map(inner)) => {
1846                let node = self.step(&**inner);
1847                let map = map
1848                    .iter()
1849                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1850                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1851                AvroValue::Map(map)
1852            }
1853            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1854                AvroValue::Fixed(*size, s.clone().into_bytes())
1855            }
1856            _ => {
1857                return Err(ParseSchemaError(format!(
1858                    "Json default value {} does not match schema",
1859                    json
1860                )));
1861            }
1862        };
1863        Ok(val)
1864    }
1865}
1866
1867#[derive(Clone)]
1868struct SchemaSerContext<'a> {
1869    node: SchemaNodeOrNamed<'a>,
1870    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
1875    enclosing_ns: &'a str,
1877}
1878
1879#[derive(Clone)]
1880struct RecordFieldSerContext<'a> {
1881    outer: &'a SchemaSerContext<'a>,
1882    inner: &'a RecordField,
1883}
1884
1885impl<'a> SchemaSerContext<'a> {
1886    fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1887        let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1888        Self {
1889            node: self.node.step_ref(next),
1890            seen_named: Rc::clone(&self.seen_named),
1891            enclosing_ns: ns,
1892        }
1893    }
1894}
1895
1896impl<'a> Serialize for SchemaSerContext<'a> {
1897    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1898    where
1899        S: Serializer,
1900    {
1901        match self.node.inner {
1902            SchemaPieceRefOrNamed::Piece(piece) => match piece {
1903                SchemaPiece::Null => serializer.serialize_str("null"),
1904                SchemaPiece::Boolean => serializer.serialize_str("boolean"),
1905                SchemaPiece::Int => serializer.serialize_str("int"),
1906                SchemaPiece::Long => serializer.serialize_str("long"),
1907                SchemaPiece::Float => serializer.serialize_str("float"),
1908                SchemaPiece::Double => serializer.serialize_str("double"),
1909                SchemaPiece::Date => {
1910                    let mut map = serializer.serialize_map(Some(2))?;
1911                    map.serialize_entry("type", "int")?;
1912                    map.serialize_entry("logicalType", "date")?;
1913                    map.end()
1914                }
1915                SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
1916                    let mut map = serializer.serialize_map(Some(2))?;
1917                    map.serialize_entry("type", "long")?;
1918                    if piece == &SchemaPiece::TimestampMilli {
1919                        map.serialize_entry("logicalType", "timestamp-millis")?;
1920                    } else {
1921                        map.serialize_entry("logicalType", "timestamp-micros")?;
1922                    }
1923                    map.end()
1924                }
1925                SchemaPiece::Decimal {
1926                    precision,
1927                    scale,
1928                    fixed_size: None,
1929                } => {
1930                    let mut map = serializer.serialize_map(Some(4))?;
1931                    map.serialize_entry("type", "bytes")?;
1932                    map.serialize_entry("precision", precision)?;
1933                    map.serialize_entry("scale", scale)?;
1934                    map.serialize_entry("logicalType", "decimal")?;
1935                    map.end()
1936                }
1937                SchemaPiece::Bytes => serializer.serialize_str("bytes"),
1938                SchemaPiece::String => serializer.serialize_str("string"),
1939                SchemaPiece::Array(inner) => {
1940                    let mut map = serializer.serialize_map(Some(2))?;
1941                    map.serialize_entry("type", "array")?;
1942                    map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
1943                    map.end()
1944                }
1945                SchemaPiece::Map(inner) => {
1946                    let mut map = serializer.serialize_map(Some(2))?;
1947                    map.serialize_entry("type", "map")?;
1948                    map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
1949                    map.end()
1950                }
1951                SchemaPiece::Union(inner) => {
1952                    let variants = inner.variants();
1953                    let mut seq = serializer.serialize_seq(Some(variants.len()))?;
1954                    for v in variants {
1955                        seq.serialize_element(&self.step(v.as_ref()))?;
1956                    }
1957                    seq.end()
1958                }
1959                SchemaPiece::Json => {
1960                    let mut map = serializer.serialize_map(Some(2))?;
1961                    map.serialize_entry("type", "string")?;
1962                    map.serialize_entry("connect.name", "io.debezium.data.Json")?;
1963                    map.end()
1964                }
1965                SchemaPiece::Uuid => {
1966                    let mut map = serializer.serialize_map(Some(4))?;
1967                    map.serialize_entry("type", "string")?;
1968                    map.serialize_entry("logicalType", "uuid")?;
1969                    map.end()
1970                }
1971                SchemaPiece::Record { .. }
1972                | SchemaPiece::Decimal {
1973                    fixed_size: Some(_),
1974                    ..
1975                }
1976                | SchemaPiece::Enum { .. }
1977                | SchemaPiece::Fixed { .. } => {
1978                    unreachable!("Unexpected named schema piece in anonymous schema position")
1979                }
1980                SchemaPiece::ResolveIntLong
1981                | SchemaPiece::ResolveDateTimestamp
1982                | SchemaPiece::ResolveIntFloat
1983                | SchemaPiece::ResolveIntDouble
1984                | SchemaPiece::ResolveLongFloat
1985                | SchemaPiece::ResolveLongDouble
1986                | SchemaPiece::ResolveFloatDouble
1987                | SchemaPiece::ResolveConcreteUnion { .. }
1988                | SchemaPiece::ResolveUnionUnion { .. }
1989                | SchemaPiece::ResolveUnionConcrete { .. }
1990                | SchemaPiece::ResolveRecord { .. }
1991                | SchemaPiece::ResolveIntTsMicro
1992                | SchemaPiece::ResolveIntTsMilli
1993                | SchemaPiece::ResolveEnum { .. } => {
1994                    panic!("Attempted to serialize resolved schema")
1995                }
1996            },
1997            SchemaPieceRefOrNamed::Named(index) => {
1998                let mut map = self.seen_named.borrow_mut();
1999                let named_piece = match map.get(&index) {
2000                    Some(name) => {
2001                        return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2002                    }
2003                    None => self.node.root.lookup(index),
2004                };
2005                let name = &named_piece.name;
2006                map.insert(index, name.clone());
2007                std::mem::drop(map);
2008                match &named_piece.piece {
2009                    SchemaPiece::Record { doc, fields, .. } => {
2010                        let mut map = serializer.serialize_map(None)?;
2011                        map.serialize_entry("type", "record")?;
2012                        map.serialize_entry("name", &name.name)?;
2013                        if self.enclosing_ns != &name.namespace {
2014                            map.serialize_entry("namespace", &name.namespace)?;
2015                        }
2016                        if let Some(docstr) = doc {
2017                            map.serialize_entry("doc", docstr)?;
2018                        }
2019                        map.serialize_entry(
2021                            "fields",
2022                            &fields
2023                                .iter()
2024                                .map(|f| RecordFieldSerContext {
2025                                    outer: self,
2026                                    inner: f,
2027                                })
2028                                .collect::<Vec<_>>(),
2029                        )?;
2030                        map.end()
2031                    }
2032                    SchemaPiece::Enum {
2033                        symbols,
2034                        default_idx,
2035                        ..
2036                    } => {
2037                        let mut map = serializer.serialize_map(None)?;
2038                        map.serialize_entry("type", "enum")?;
2039                        map.serialize_entry("name", &name.name)?;
2040                        if self.enclosing_ns != &name.namespace {
2041                            map.serialize_entry("namespace", &name.namespace)?;
2042                        }
2043                        map.serialize_entry("symbols", symbols)?;
2044                        if let Some(default_idx) = *default_idx {
2045                            assert!(default_idx < symbols.len());
2046                            map.serialize_entry("default", &symbols[default_idx])?;
2047                        }
2048                        map.end()
2049                    }
2050                    SchemaPiece::Fixed { size } => {
2051                        let mut map = serializer.serialize_map(None)?;
2052                        map.serialize_entry("type", "fixed")?;
2053                        map.serialize_entry("name", &name.name)?;
2054                        if self.enclosing_ns != &name.namespace {
2055                            map.serialize_entry("namespace", &name.namespace)?;
2056                        }
2057                        map.serialize_entry("size", size)?;
2058                        map.end()
2059                    }
2060                    SchemaPiece::Decimal {
2061                        scale,
2062                        precision,
2063                        fixed_size: Some(size),
2064                    } => {
2065                        let mut map = serializer.serialize_map(Some(6))?;
2066                        map.serialize_entry("type", "fixed")?;
2067                        map.serialize_entry("logicalType", "decimal")?;
2068                        map.serialize_entry("name", &name.name)?;
2069                        if self.enclosing_ns != &name.namespace {
2070                            map.serialize_entry("namespace", &name.namespace)?;
2071                        }
2072                        map.serialize_entry("size", size)?;
2073                        map.serialize_entry("precision", precision)?;
2074                        map.serialize_entry("scale", scale)?;
2075                        map.end()
2076                    }
2077                    SchemaPiece::Null
2078                    | SchemaPiece::Boolean
2079                    | SchemaPiece::Int
2080                    | SchemaPiece::Long
2081                    | SchemaPiece::Float
2082                    | SchemaPiece::Double
2083                    | SchemaPiece::Date
2084                    | SchemaPiece::TimestampMilli
2085                    | SchemaPiece::TimestampMicro
2086                    | SchemaPiece::Decimal {
2087                        fixed_size: None, ..
2088                    }
2089                    | SchemaPiece::Bytes
2090                    | SchemaPiece::String
2091                    | SchemaPiece::Array(_)
2092                    | SchemaPiece::Map(_)
2093                    | SchemaPiece::Union(_)
2094                    | SchemaPiece::Uuid
2095                    | SchemaPiece::Json => {
2096                        unreachable!("Unexpected anonymous schema piece in named schema position")
2097                    }
2098                    SchemaPiece::ResolveIntLong
2099                    | SchemaPiece::ResolveDateTimestamp
2100                    | SchemaPiece::ResolveIntFloat
2101                    | SchemaPiece::ResolveIntDouble
2102                    | SchemaPiece::ResolveLongFloat
2103                    | SchemaPiece::ResolveLongDouble
2104                    | SchemaPiece::ResolveFloatDouble
2105                    | SchemaPiece::ResolveConcreteUnion { .. }
2106                    | SchemaPiece::ResolveUnionUnion { .. }
2107                    | SchemaPiece::ResolveUnionConcrete { .. }
2108                    | SchemaPiece::ResolveRecord { .. }
2109                    | SchemaPiece::ResolveIntTsMilli
2110                    | SchemaPiece::ResolveIntTsMicro
2111                    | SchemaPiece::ResolveEnum { .. } => {
2112                        panic!("Attempted to serialize resolved schema")
2113                    }
2114                }
2115            }
2116        }
2117    }
2118}
2119
2120impl Serialize for Schema {
2121    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2122    where
2123        S: Serializer,
2124    {
2125        let ctx = SchemaSerContext {
2126            node: SchemaNodeOrNamed {
2127                root: self,
2128                inner: self.top.as_ref(),
2129            },
2130            seen_named: Rc::new(RefCell::new(Default::default())),
2131            enclosing_ns: "",
2132        };
2133        ctx.serialize(serializer)
2134    }
2135}
2136
2137impl<'a> Serialize for RecordFieldSerContext<'a> {
2138    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2139    where
2140        S: Serializer,
2141    {
2142        let mut map = serializer.serialize_map(None)?;
2143        map.serialize_entry("name", &self.inner.name)?;
2144        map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2145        if let Some(default) = &self.inner.default {
2146            map.serialize_entry("default", default)?;
2147        }
2148        if let Some(doc) = &self.inner.doc {
2149            map.serialize_entry("doc", doc)?;
2150        }
2151        map.end()
2152    }
2153}
2154
2155fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2158    pcf(schema, "", false)
2159}
2160
2161fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2162    match schema {
2163        serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2164        serde_json::Value::String(s) => pcf_string(s),
2165        serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2166        serde_json::Value::Number(n) => n.to_string(),
2167        _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2168    }
2169}
2170
2171fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2172    let default_ns = schema
2174        .get("namespace")
2175        .and_then(|v| v.as_str())
2176        .unwrap_or(enclosing_ns);
2177    let mut fields = Vec::new();
2178    let mut found_next_ns = None;
2179    let mut deferred_values = vec![];
2180    for (k, v) in schema {
2181        if schema.len() == 1 && k == "type" {
2183            if let serde_json::Value::String(s) = v {
2185                return pcf_string(s);
2186            }
2187        }
2188
2189        if field_ordering_position(k).is_none() {
2191            continue;
2192        }
2193
2194        if k == "name" {
2196            if in_fields {
2199                fields.push((
2200                    k,
2201                    format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2202                ));
2203                continue;
2204            }
2205            let name = v.as_str().unwrap();
2207            assert!(
2208                found_next_ns.is_none(),
2209                "`name` must not be specified multiple times"
2210            );
2211            let next_ns = match name.rsplit_once('.') {
2212                None => default_ns,
2213                Some((ns, _name)) => ns,
2214            };
2215            found_next_ns = Some(next_ns);
2216            let n = if next_ns.is_empty() {
2217                Cow::Borrowed(name)
2218            } else {
2219                Cow::Owned(format!("{next_ns}.{name}"))
2220            };
2221            fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2222            continue;
2223        }
2224
2225        if k == "size" {
2227            let i = match v.as_str() {
2228                Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2229                None => v.as_i64().unwrap(),
2230            };
2231            fields.push((k, format!("{}:{}", pcf_string(k), i)));
2232            continue;
2233        }
2234
2235        deferred_values.push((k, v));
2238    }
2239
2240    let next_ns = found_next_ns.unwrap_or(default_ns);
2241    for (k, v) in deferred_values {
2242        fields.push((
2243            k,
2244            format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2245        ));
2246    }
2247
2248    fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2250    let inter = fields
2251        .into_iter()
2252        .map(|(_, v)| v)
2253        .collect::<Vec<_>>()
2254        .join(",");
2255    format!("{{{}}}", inter)
2256}
2257
2258fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2259    let inter = arr
2260        .iter()
2261        .map(|s| pcf(s, enclosing_ns, in_fields))
2262        .collect::<Vec<String>>()
2263        .join(",");
2264    format!("[{}]", inter)
2265}
2266
/// Wraps `s` in double quotes. The contents are copied verbatim; no escaping
/// is applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2270
/// Returns the 1-based position of `field` in the canonical attribute order,
/// or `None` when `field` is not one of the retained attributes.
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
2286
2287#[cfg(test)]
2288mod tests {
2289    use mz_ore::{assert_err, assert_ok};
2290
2291    use crate::types::{Record, ToAvro};
2292
2293    use super::*;
2294
2295    fn check_schema(schema: &str, expected: SchemaPiece) {
2296        let schema = Schema::from_str(schema).unwrap();
2297        assert_eq!(&expected, schema.top_node().inner);
2298
2299        let schema = serde_json::to_string(&schema).unwrap();
2301        let schema = Schema::from_str(&schema).unwrap();
2302        assert_eq!(&expected, schema.top_node().inner);
2303    }
2304
2305    #[mz_ore::test]
2306    fn test_primitive_schema() {
2307        check_schema("\"null\"", SchemaPiece::Null);
2308        check_schema("\"int\"", SchemaPiece::Int);
2309        check_schema("\"double\"", SchemaPiece::Double);
2310    }
2311
2312    #[mz_ore::test]
2313    fn test_array_schema() {
2314        check_schema(
2315            r#"{"type": "array", "items": "string"}"#,
2316            SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2317        );
2318    }
2319
2320    #[mz_ore::test]
2321    fn test_map_schema() {
2322        check_schema(
2323            r#"{"type": "map", "values": "double"}"#,
2324            SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2325        );
2326    }
2327
2328    #[mz_ore::test]
2329    fn test_union_schema() {
2330        check_schema(
2331            r#"["null", "int"]"#,
2332            SchemaPiece::Union(
2333                UnionSchema::new(vec![
2334                    SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2335                    SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2336                ])
2337                .unwrap(),
2338            ),
2339        );
2340    }
2341
2342    #[mz_ore::test]
2343    fn test_multi_union_schema() {
2344        let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2345        assert_ok!(schema);
2346        let schema = schema.unwrap();
2347        let node = schema.top_node();
2348        assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2349        let union_schema = match node.inner {
2350            SchemaPiece::Union(u) => u,
2351            _ => unreachable!(),
2352        };
2353        assert_eq!(union_schema.variants().len(), 5);
2354        let mut variants = union_schema.variants().iter();
2355        assert_eq!(
2356            SchemaKind::from(node.step(variants.next().unwrap())),
2357            SchemaKind::Null
2358        );
2359        assert_eq!(
2360            SchemaKind::from(node.step(variants.next().unwrap())),
2361            SchemaKind::Int
2362        );
2363        assert_eq!(
2364            SchemaKind::from(node.step(variants.next().unwrap())),
2365            SchemaKind::Float
2366        );
2367        assert_eq!(
2368            SchemaKind::from(node.step(variants.next().unwrap())),
2369            SchemaKind::String
2370        );
2371        assert_eq!(
2372            SchemaKind::from(node.step(variants.next().unwrap())),
2373            SchemaKind::Bytes
2374        );
2375        assert_eq!(variants.next(), None);
2376    }
2377
2378    #[mz_ore::test]
2379    fn test_record_schema() {
2380        let schema = r#"
2381                {
2382                    "type": "record",
2383                    "name": "test",
2384                    "doc": "record doc",
2385                    "fields": [
2386                        {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2387                        {"name": "b", "doc": "b doc", "type": "string"}
2388                    ]
2389                }
2390            "#;
2391
2392        let mut lookup = BTreeMap::new();
2393        lookup.insert("a".to_owned(), 0);
2394        lookup.insert("b".to_owned(), 1);
2395
2396        let expected = SchemaPiece::Record {
2397            doc: Some("record doc".to_string()),
2398            fields: vec![
2399                RecordField {
2400                    name: "a".to_string(),
2401                    doc: Some("a doc".to_string()),
2402                    default: Some(Value::Number(42i64.into())),
2403                    schema: SchemaPiece::Long.into(),
2404                    order: RecordFieldOrder::Ascending,
2405                    position: 0,
2406                },
2407                RecordField {
2408                    name: "b".to_string(),
2409                    doc: Some("b doc".to_string()),
2410                    default: None,
2411                    schema: SchemaPiece::String.into(),
2412                    order: RecordFieldOrder::Ascending,
2413                    position: 1,
2414                },
2415            ],
2416            lookup,
2417        };
2418
2419        check_schema(schema, expected);
2420    }
2421
2422    #[mz_ore::test]
2423    fn test_enum_schema() {
2424        let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2425
2426        let expected = SchemaPiece::Enum {
2427            doc: None,
2428            symbols: vec![
2429                "diamonds".to_owned(),
2430                "spades".to_owned(),
2431                "jokers".to_owned(),
2432                "clubs".to_owned(),
2433                "hearts".to_owned(),
2434            ],
2435            default_idx: Some(2),
2436        };
2437
2438        check_schema(schema, expected);
2439
2440        let bad_schema = Schema::from_str(
2441            r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2442        );
2443
2444        assert_err!(bad_schema);
2445    }
2446
2447    #[mz_ore::test]
2448    fn test_fixed_schema() {
2449        let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2450
2451        let expected = SchemaPiece::Fixed { size: 16usize };
2452
2453        check_schema(schema, expected);
2454    }
2455
2456    #[mz_ore::test]
2457    fn test_date_schema() {
2458        let kinds = &[
2459            r#"{
2460                    "type": "int",
2461                    "name": "datish",
2462                    "logicalType": "date"
2463                }"#,
2464            r#"{
2465                    "type": "int",
2466                    "name": "datish",
2467                    "connect.name": "io.debezium.time.Date"
2468                }"#,
2469            r#"{
2470                    "type": "int",
2471                    "name": "datish",
2472                    "connect.name": "org.apache.kafka.connect.data.Date"
2473                }"#,
2474        ];
2475        for kind in kinds {
2476            check_schema(*kind, SchemaPiece::Date);
2477
2478            let schema = Schema::from_str(*kind).unwrap();
2479            assert_eq!(
2480                serde_json::to_string(&schema).unwrap(),
2481                r#"{"type":"int","logicalType":"date"}"#
2482            );
2483        }
2484    }
2485
2486    #[mz_ore::test]
2487    fn new_field_in_middle() {
2488        let reader = r#"{
2489            "type": "record",
2490            "name": "MyRecord",
2491            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2492        }"#;
2493        let writer = r#"{
2494            "type": "record",
2495            "name": "MyRecord",
2496            "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2497        }"#;
2498        let reader = Schema::from_str(reader).unwrap();
2499        let writer = Schema::from_str(writer).unwrap();
2500
2501        let mut record = Record::new(writer.top_node()).unwrap();
2502        record.put("f1", 1);
2503        record.put("f2", 2);
2504        record.put("f_interposed", 42);
2505
2506        let value = record.avro();
2507
2508        let mut buf = vec![];
2509        crate::encode::encode(&value, &writer, &mut buf);
2510
2511        let resolved = resolve_schemas(&writer, &reader).unwrap();
2512
2513        let reader = &mut &buf[..];
2514        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2515        let expected = crate::types::Value::Record(vec![
2516            ("f1".to_string(), crate::types::Value::Int(1)),
2517            ("f2".to_string(), crate::types::Value::Int(2)),
2518        ]);
2519        assert_eq!(reader_value, expected);
2520        assert!(reader.is_empty()); }
2522
2523    #[mz_ore::test]
2524    fn new_field_at_end() {
2525        let reader = r#"{
2526            "type": "record",
2527            "name": "MyRecord",
2528            "fields": [{"name": "f1", "type": "int"}]
2529        }"#;
2530        let writer = r#"{
2531            "type": "record",
2532            "name": "MyRecord",
2533            "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2534        }"#;
2535        let reader = Schema::from_str(reader).unwrap();
2536        let writer = Schema::from_str(writer).unwrap();
2537
2538        let mut record = Record::new(writer.top_node()).unwrap();
2539        record.put("f1", 1);
2540        record.put("f2", 2);
2541
2542        let value = record.avro();
2543
2544        let mut buf = vec![];
2545        crate::encode::encode(&value, &writer, &mut buf);
2546
2547        let resolved = resolve_schemas(&writer, &reader).unwrap();
2548
2549        let reader = &mut &buf[..];
2550        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2551        let expected =
2552            crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2553        assert_eq!(reader_value, expected);
2554        assert!(reader.is_empty()); }
2556
2557    #[mz_ore::test]
2558    fn default_non_nums() {
2559        let reader = r#"{
2560            "type": "record",
2561            "name": "MyRecord",
2562            "fields": [
2563                {"name": "f1", "type": "double", "default": "NaN"},
2564                {"name": "f2", "type": "double", "default": "Infinity"},
2565                {"name": "f3", "type": "double", "default": "-Infinity"}
2566            ]
2567        }
2568        "#;
2569        let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2570
2571        let writer_schema = Schema::from_str(writer).unwrap();
2572        let reader_schema = Schema::from_str(reader).unwrap();
2573        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2574
2575        let record = Record::new(writer_schema.top_node()).unwrap();
2576
2577        let value = record.avro();
2578        let mut buf = vec![];
2579        crate::encode::encode(&value, &writer_schema, &mut buf);
2580
2581        let reader = &mut &buf[..];
2582        let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2583        let expected = crate::types::Value::Record(vec![
2584            ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2585            ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2586            (
2587                "f3".to_string(),
2588                crate::types::Value::Double(f64::NEG_INFINITY),
2589            ),
2590        ]);
2591
2592        #[derive(Debug)]
2593        struct NanEq(crate::types::Value);
2594        impl std::cmp::PartialEq for NanEq {
2595            fn eq(&self, other: &Self) -> bool {
2596                match (self, other) {
2597                    (
2598                        NanEq(crate::types::Value::Double(x)),
2599                        NanEq(crate::types::Value::Double(y)),
2600                    ) if x.is_nan() && y.is_nan() => true,
2601                    (
2602                        NanEq(crate::types::Value::Float(x)),
2603                        NanEq(crate::types::Value::Float(y)),
2604                    ) if x.is_nan() && y.is_nan() => true,
2605                    (
2606                        NanEq(crate::types::Value::Record(xs)),
2607                        NanEq(crate::types::Value::Record(ys)),
2608                    ) => {
2609                        let xs = xs
2610                            .iter()
2611                            .cloned()
2612                            .map(|(k, v)| (k, NanEq(v)))
2613                            .collect::<Vec<_>>();
2614                        let ys = ys
2615                            .iter()
2616                            .cloned()
2617                            .map(|(k, v)| (k, NanEq(v)))
2618                            .collect::<Vec<_>>();
2619
2620                        xs == ys
2621                    }
2622                    (NanEq(x), NanEq(y)) => x == y,
2623                }
2624            }
2625        }
2626
2627        assert_eq!(NanEq(reader_value), NanEq(expected));
2628        assert!(reader.is_empty());
2629    }
2630
2631    #[mz_ore::test]
2632    fn test_decimal_schemas() {
2633        let schema = r#"{
2634                "type": "fixed",
2635                "name": "dec",
2636                "size": 8,
2637                "logicalType": "decimal",
2638                "precision": 12,
2639                "scale": 5
2640            }"#;
2641        let expected = SchemaPiece::Decimal {
2642            precision: 12,
2643            scale: 5,
2644            fixed_size: Some(8),
2645        };
2646        check_schema(schema, expected);
2647
2648        let schema = r#"{
2649                "type": "bytes",
2650                "logicalType": "decimal",
2651                "precision": 12,
2652                "scale": 5
2653            }"#;
2654        let expected = SchemaPiece::Decimal {
2655            precision: 12,
2656            scale: 5,
2657            fixed_size: None,
2658        };
2659        check_schema(schema, expected);
2660
2661        let res = Schema::from_str(
2662            r#"["bytes", {
2663                "type": "bytes",
2664                "logicalType": "decimal",
2665                "precision": 12,
2666                "scale": 5
2667            }]"#,
2668        );
2669        assert_eq!(
2670            res.unwrap_err().to_string(),
2671            "Schema parse error: Unions cannot contain duplicate types"
2672        );
2673
2674        let writer_schema = Schema::from_str(
2675            r#"["null", {
2676                "type": "bytes"
2677            }]"#,
2678        )
2679        .unwrap();
2680        let reader_schema = Schema::from_str(
2681            r#"["null", {
2682                "type": "bytes",
2683                "logicalType": "decimal",
2684                "precision": 12,
2685                "scale": 5
2686            }]"#,
2687        )
2688        .unwrap();
2689        let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2690
2691        let expected = SchemaPiece::ResolveUnionUnion {
2692            permutation: vec![
2693                Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2694                Ok((
2695                    1,
2696                    SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2697                        precision: 12,
2698                        scale: 5,
2699                        fixed_size: None,
2700                    }),
2701                )),
2702            ],
2703            n_reader_variants: 2,
2704            reader_null_variant: Some(0),
2705        };
2706        assert_eq!(resolved.top_node().inner, &expected);
2707    }
2708
2709    #[mz_ore::test]
2710    fn test_no_documentation() {
2711        let schema =
2712            Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2713                .unwrap();
2714
2715        let doc = match schema.top_node().inner {
2716            SchemaPiece::Enum { doc, .. } => doc.clone(),
2717            _ => panic!(),
2718        };
2719
2720        assert_none!(doc);
2721    }
2722
2723    #[mz_ore::test]
2724    fn test_documentation() {
2725        let schema = Schema::from_str(
2726                r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2727            ).unwrap();
2728
2729        let doc = match schema.top_node().inner {
2730            SchemaPiece::Enum { doc, .. } => doc.clone(),
2731            _ => None,
2732        };
2733
2734        assert_eq!("Some documentation".to_owned(), doc.unwrap());
2735    }
2736
2737    #[mz_ore::test]
2738    fn test_namespaces_and_names() {
2739        let schema = Schema::from_str(
2741            r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2742        )
2743        .unwrap();
2744        assert_eq!(schema.named.len(), 1);
2745        assert_eq!(
2746            schema.named[0].name,
2747            FullName {
2748                name: "name".into(),
2749                namespace: "namespace".into()
2750            }
2751        );
2752
2753        let schema =
2755            Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2756                .unwrap();
2757        assert_eq!(schema.named.len(), 1);
2758        assert_eq!(
2759            schema.named[0].name,
2760            FullName {
2761                name: "dots".into(),
2762                namespace: "name.has".into()
2763            }
2764        );
2765
2766        let schema = Schema::from_str(
2768            r#"{"type": "enum", "namespace": "namespace",
2769            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2770        )
2771        .unwrap();
2772        assert_eq!(schema.named.len(), 1);
2773        assert_eq!(
2774            schema.named[0].name,
2775            FullName {
2776                name: "dots".into(),
2777                namespace: "name.has".into()
2778            }
2779        );
2780
2781        let schema = Schema::from_str(
2784            r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2785            "fields": [{"name": "name", "type": "string"}]}"#,
2786        )
2787        .unwrap();
2788        assert_eq!(schema.named.len(), 1);
2789        assert_eq!(
2790            schema.named[0].name,
2791            FullName {
2792                name: "TestDoc".into(),
2793                namespace: "".into()
2794            }
2795        );
2796
2797        let schema = Schema::from_str(
2799            r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2800            "fields": [{"name": "name", "type": "string"}]}"#,
2801        )
2802        .unwrap();
2803        assert_eq!(schema.named.len(), 1);
2804        assert_eq!(
2805            schema.named[0].name,
2806            FullName {
2807                name: "TestDoc".into(),
2808                namespace: "".into()
2809            }
2810        );
2811
2812        let first = Schema::from_str(
2814            r#"{"type": "fixed", "namespace": "namespace",
2815            "name": "name", "size": 1}"#,
2816        )
2817        .unwrap();
2818        let second = Schema::from_str(
2819            r#"{"type": "fixed", "name": "namespace.name",
2820            "size": 1}"#,
2821        )
2822        .unwrap();
2823        assert_eq!(first.named[0].name, second.named[0].name);
2824
2825        let first = Schema::from_str(
2826            r#"{"type": "fixed", "namespace": "namespace",
2827            "name": "name", "size": 1}"#,
2828        )
2829        .unwrap();
2830        let second = Schema::from_str(
2831            r#"{"type": "fixed", "name": "namespace.Name",
2832            "size": 1}"#,
2833        )
2834        .unwrap();
2835        assert_ne!(first.named[0].name, second.named[0].name);
2836
2837        let first = Schema::from_str(
2838            r#"{"type": "fixed", "namespace": "Namespace",
2839            "name": "name", "size": 1}"#,
2840        )
2841        .unwrap();
2842        let second = Schema::from_str(
2843            r#"{"type": "fixed", "namespace": "namespace",
2844            "name": "name", "size": 1}"#,
2845        )
2846        .unwrap();
2847        assert_ne!(first.named[0].name, second.named[0].name);
2848
2849        assert!(
2852            Schema::from_str(
2853                r#"{"type": "record", "name": "99 problems but a name aint one",
2854            "fields": [{"name": "name", "type": "string"}]}"#
2855            )
2856            .is_err()
2857        );
2858
2859        assert!(
2860            Schema::from_str(
2861                r#"{"type": "record", "name": "!!!",
2862            "fields": [{"name": "name", "type": "string"}]}"#
2863            )
2864            .is_err()
2865        );
2866
2867        assert!(
2868            Schema::from_str(
2869                r#"{"type": "record", "name": "_valid_until_©",
2870            "fields": [{"name": "name", "type": "string"}]}"#
2871            )
2872            .is_err()
2873        );
2874
2875        let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2877              {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2878              {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2879              {"name": "f3", "type": "MyEnum"},
2880              {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2881              {"name": "f5", "type": "other.namespace.OtherEnum"},
2882              {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2883              {"name": "f7", "type": "some.other.ThirdEnum"}
2884             ]}"#).unwrap();
2885        assert_eq!(schema.named.len(), 4);
2886
2887        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2888            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); } else {
2896            panic!("Expected SchemaPiece::Record, found something else");
2897        }
2898
2899        let schema = Schema::from_str(
2900            r#"{"type": "record", "name": "x.Y", "fields": [
2901              {"name": "e", "type":
2902                {"type": "record", "name": "Z", "fields": [
2903                  {"name": "f", "type": "x.Y"},
2904                  {"name": "g", "type": "x.Z"}
2905                ]}
2906              }
2907            ]}"#,
2908        )
2909        .unwrap();
2910        assert_eq!(schema.named.len(), 2);
2911
2912        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2913            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); } else {
2915            panic!("Expected SchemaPiece::Record, found something else");
2916        }
2917
2918        if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
2919            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); } else {
2922            panic!("Expected SchemaPiece::Record, found something else");
2923        }
2924
2925        let schema = Schema::from_str(
2926            r#"{"type": "record", "name": "R", "fields": [
2927              {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
2928                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
2929                 "symbols": ["Foo", "Bar"]}
2930                }
2931              ]}},
2932              {"name": "t", "type": "Z"}
2933            ]}"#,
2934        )
2935        .unwrap();
2936        assert_eq!(schema.named.len(), 3);
2937
2938        if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2939            assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); } else {
2942            panic!("Expected SchemaPiece::Record, found something else");
2943        }
2944    }
2945
2946    #[mz_ore::test]
2949    fn test_schema_is_send() {
2950        fn send<S: Send>(_s: S) {}
2951
2952        let schema = Schema {
2953            named: vec![],
2954            indices: Default::default(),
2955            top: SchemaPiece::Null.into(),
2956        };
2957        send(schema);
2958    }
2959
2960    #[mz_ore::test]
2961    fn test_schema_is_sync() {
2962        fn sync<S: Sync>(_s: S) {}
2963
2964        let schema = Schema {
2965            named: vec![],
2966            indices: Default::default(),
2967            top: SchemaPiece::Null.into(),
2968        };
2969        sync(&schema);
2970        sync(schema);
2971    }
2972
2973    #[mz_ore::test]
2974    #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
2976        use sha2::Sha256;
2977
2978        let raw_schema = r#"
2979        {
2980            "type": "record",
2981            "name": "test",
2982            "fields": [
2983                {"name": "a", "type": "long", "default": 42},
2984                {"name": "b", "type": "string"}
2985            ]
2986        }
2987    "#;
2988        let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
2989        let schema = Schema::from_str(raw_schema).unwrap();
2990        assert_eq!(&schema.canonical_form(), expected_canonical);
2991        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
2992        assert_eq!(
2993            format!("{}", schema.fingerprint::<Sha256>()),
2994            expected_fingerprint
2995        );
2996
2997        let raw_schema = r#"
2998{
2999  "type": "record",
3000  "name": "ns.r1",
3001  "namespace": "ignored",
3002  "fields": [
3003    {
3004      "name": "f1",
3005      "type": {
3006        "type": "fixed",
3007        "name": "r2",
3008        "size": 1
3009      }
3010    }
3011  ]
3012}
3013"#;
3014        let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3015        let schema = Schema::from_str(raw_schema).unwrap();
3016        assert_eq!(&schema.canonical_form(), expected_canonical);
3017        let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3018        assert_eq!(
3019            format!("{}", schema.fingerprint::<Sha256>()),
3020            expected_fingerprint
3021        );
3022    }
3023
3024    #[mz_ore::test]
3025    fn test_make_valid() {
3026        for (input, expected) in [
3027            ("foo", "foo"),
3028            ("az99", "az99"),
3029            ("99az", "_99az"),
3030            ("is,bad", "is_bad"),
3031            ("@#$%", "____"),
3032            ("i-amMisBehaved!", "i_amMisBehaved_"),
3033            ("", "_"),
3034        ] {
3035            let actual = Name::make_valid(input);
3036            assert_eq!(expected, actual, "Name::make_valid({input})")
3037        }
3038    }
3039}