1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51 writer_schema: &Schema,
52 reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54 let r_indices = reader_schema.indices.clone();
55 let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56 writer_schema
57 .indices
58 .iter()
59 .flat_map(|(name, widx)| {
60 r_indices
61 .get(name)
62 .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63 })
64 .unzip();
65 let reader_fullnames = reader_schema
66 .indices
67 .iter()
68 .map(|(f, i)| (*i, f))
69 .collect::<BTreeMap<_, _>>();
70 let mut resolver = SchemaResolver {
71 named: Default::default(),
72 indices: Default::default(),
73 human_readable_field_path: Vec::new(),
74 current_human_readable_path_start: 0,
75 writer_to_reader_names,
76 reader_to_writer_names,
77 reader_to_resolved_names: Default::default(),
78 reader_fullnames,
79 reader_schema,
80 };
81 let writer_node = writer_schema.top_node_or_named();
82 let reader_node = reader_schema.top_node_or_named();
83 let inner = resolver.resolve(writer_node, reader_node)?;
84 let sch = Schema {
85 named: resolver.named.into_iter().map(Option::unwrap).collect(),
86 indices: resolver.indices,
87 top: inner,
88 };
89 Ok(sch)
90}
91
/// Error raised when a schema cannot be parsed.
///
/// Carries a single human-readable message, which is also what `Display`
/// prints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for ParseSchemaError {
    /// Formats the wrapped message exactly as the `String` itself would be
    /// formatted (formatter flags such as width are honoured).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// The raw bytes of a schema fingerprint (the output of a digest; see
/// `Schema::fingerprint`).
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// Digest output bytes.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Writes the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators. An empty fingerprint prints as the empty string.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter instead of allocating
        // one `String` per byte plus an intermediate `Vec` and a joined copy.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
/// Either a schema piece stored inline, or a reference to a named schema
/// held in the owning `Schema`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPieceOrNamed {
    /// An inline schema piece.
    Piece(SchemaPiece),
    /// Index of a named schema piece; resolved with `Schema::lookup`.
    Named(usize),
}
140impl SchemaPieceOrNamed {
141 pub fn get_human_name(&self, root: &Schema) -> String {
142 match self {
143 Self::Piece(piece) => format!("{:?}", piece),
144 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145 }
146 }
147 #[inline(always)]
148 pub fn get_piece_and_name<'a>(
149 &'a self,
150 root: &'a Schema,
151 ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152 self.as_ref().get_piece_and_name(root)
153 }
154
155 #[inline(always)]
156 pub fn as_ref(&self) -> SchemaPieceRefOrNamed {
157 match self {
158 SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159 SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160 }
161 }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165 #[inline(always)]
166 fn from(piece: SchemaPiece) -> Self {
167 Self::Piece(piece)
168 }
169}
170
/// One node of a schema.
///
/// The plain variants are produced by the parser in this file. The
/// `Resolve*` variants are not constructed in the code visible here;
/// presumably they are produced by writer/reader schema resolution
/// (NOTE(review): confirm against `SchemaResolver`).
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// The `null` primitive.
    Null,
    /// The `boolean` primitive.
    Boolean,
    /// The `int` primitive.
    Int,
    /// The `long` primitive.
    Long,
    /// The `float` primitive.
    Float,
    /// The `double` primitive.
    Double,
    /// An `int` annotated as a date (logical type `date`, or one of the
    /// recognised `connect.name` values).
    Date,
    /// A `long` annotated as a millisecond timestamp.
    TimestampMilli,
    /// A `long` annotated as a microsecond timestamp.
    TimestampMicro,
    /// A `bytes` or `fixed` schema annotated with logical type `decimal`.
    Decimal {
        /// Declared precision.
        precision: usize,
        /// Declared scale (defaults to 0 when absent).
        scale: usize,
        /// `Some(size)` when backed by `fixed`, `None` when backed by `bytes`.
        fixed_size: Option<usize>,
    },
    /// The `bytes` primitive.
    Bytes,
    /// The `string` primitive.
    String,
    /// A `string` whose `connect.name` is `io.debezium.data.Json`.
    Json,
    /// A `string` with logical type `uuid`.
    Uuid,
    /// An array; the payload is the schema given under `items`.
    Array(Box<SchemaPieceOrNamed>),
    /// A map; the payload is the schema given under `values`.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of variants.
    Union(UnionSchema),
    /// Resolution variant; reported as kind `Long` by `SchemaKind::from`.
    ResolveIntTsMilli,
    /// Resolution variant; reported as kind `Long` by `SchemaKind::from`.
    ResolveIntTsMicro,
    /// Resolution variant; reported as kind `Long` by `SchemaKind::from`.
    ResolveDateTimestamp,
    /// Resolution variant; reported as kind `Long` by `SchemaKind::from`.
    ResolveIntLong,
    /// Resolution variant; reported as kind `Float` by `SchemaKind::from`.
    ResolveIntFloat,
    /// Resolution variant; reported as kind `Double` by `SchemaKind::from`.
    ResolveIntDouble,
    /// Resolution variant; reported as kind `Float` by `SchemaKind::from`.
    ResolveLongFloat,
    /// Resolution variant; reported as kind `Double` by `SchemaKind::from`.
    ResolveLongDouble,
    /// Resolution variant; reported as kind `Double` by `SchemaKind::from`.
    ResolveFloatDouble,
    /// Resolution variant; reported as kind `Union` by `SchemaKind::from`.
    /// Presumably a non-union writer resolved against a reader union —
    /// TODO confirm.
    ResolveConcreteUnion {
        /// Presumably the index of the matched reader variant — TODO confirm.
        index: usize,
        /// The schema associated with that variant.
        inner: Box<SchemaPieceOrNamed>,
        /// Presumably the number of variants in the reader union — TODO confirm.
        n_reader_variants: usize,
        /// Presumably the position of the reader's `null` variant, if any —
        /// TODO confirm.
        reader_null_variant: Option<usize>,
    },
    /// Resolution variant; reported as kind `Union` by `SchemaKind::from`.
    ResolveUnionUnion {
        /// One entry per variant: either an index paired with a schema, or
        /// the error recorded for that variant.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        /// Presumably the number of variants in the reader union — TODO confirm.
        n_reader_variants: usize,
        /// Presumably the position of the reader's `null` variant, if any —
        /// TODO confirm.
        reader_null_variant: Option<usize>,
    },
    /// Resolution variant; reported as kind `Unknown` by `SchemaKind::from`.
    ResolveUnionConcrete {
        /// Presumably the index of the matched writer variant — TODO confirm.
        index: usize,
        /// The schema associated with that variant.
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A record.
    Record {
        /// Optional documentation.
        doc: Documentation,
        /// The record's fields, in declaration order.
        fields: Vec<RecordField>,
        /// Maps each field name to the field's position.
        lookup: BTreeMap<String, usize>,
    },
    /// An enum.
    Enum {
        /// Optional documentation.
        doc: Documentation,
        /// The enum's symbols, in declaration order (unique).
        symbols: Vec<String>,
        /// Index into `symbols` of the declared default, if any.
        default_idx: Option<usize>,
    },
    /// A fixed-size byte sequence of `size` bytes.
    Fixed { size: usize },
    /// Resolution variant; reported as kind `Record` by `SchemaKind::from`.
    ResolveRecord {
        /// Fields supplied from default values.
        defaults: Vec<ResolvedDefaultValueField>,
        /// Resolved fields, each either present or absent.
        fields: Vec<ResolvedRecordField>,
        /// Presumably the number of fields in the reader record — TODO confirm.
        n_reader_fields: usize,
    },
    /// Resolution variant; reported as kind `Enum` by `SchemaKind::from`.
    ResolveEnum {
        /// Optional documentation.
        doc: Documentation,
        /// One entry per symbol: `Ok((index, name))` or `Err(name)`.
        /// NOTE(review): looks like `Err` marks symbols with no reader
        /// counterpart — confirm.
        symbols: Vec<Result<(usize, String), String>>,
        /// Default `(index, name)` pair, if any.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320 pub fn is_underlying_int(&self) -> bool {
322 self.try_make_int_value(0).is_some()
323 }
324 pub fn is_underlying_long(&self) -> bool {
326 self.try_make_long_value(0).is_some()
327 }
328 pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331 match self {
332 SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333 SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336 _ => None,
337 }
338 }
339 pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342 match self {
343 SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344 SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345 SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346 _ => None,
347 }
348 }
349}
350
/// A schema: a top-level node plus the table of named types it refers to.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Named schema pieces, addressed by the index carried in
    /// `SchemaPieceOrNamed::Named`.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its position in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The root node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362 fn to_string(&self) -> String {
363 let json = serde_json::to_value(self).unwrap();
364 json.to_string()
365 }
366}
367
368impl std::fmt::Debug for Schema {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 if f.alternate() {
371 f.write_str(
372 &serde_json::to_string_pretty(self)
373 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374 )
375 } else {
376 f.write_str(
377 &serde_json::to_string(self)
378 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379 )
380 }
381 }
382}
383
384impl Schema {
385 pub fn top_node(&self) -> SchemaNode {
386 let (inner, name) = self.top.get_piece_and_name(self);
387 SchemaNode {
388 root: self,
389 inner,
390 name,
391 }
392 }
393 pub fn top_node_or_named(&self) -> SchemaNodeOrNamed {
394 SchemaNodeOrNamed {
395 root: self,
396 inner: self.top.as_ref(),
397 }
398 }
399 pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400 &self.named[idx]
401 }
402 pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403 self.indices.get(name).map(|&idx| &self.named[idx])
404 }
405}
406
/// Coarse classification of a schema piece, without any payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    /// Kind used when none of the other kinds is reported.
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase textual name of this kind.
    pub fn name(self) -> &'static str {
        let text: &'static str = match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        };
        text
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460 #[inline(always)]
461 fn from(piece: &'a SchemaPiece) -> SchemaKind {
462 match piece {
463 SchemaPiece::Null => SchemaKind::Null,
464 SchemaPiece::Boolean => SchemaKind::Boolean,
465 SchemaPiece::Int => SchemaKind::Int,
466 SchemaPiece::Long => SchemaKind::Long,
467 SchemaPiece::Float => SchemaKind::Float,
468 SchemaPiece::Double => SchemaKind::Double,
469 SchemaPiece::Date => SchemaKind::Int,
470 SchemaPiece::TimestampMilli
471 | SchemaPiece::TimestampMicro
472 | SchemaPiece::ResolveIntTsMilli
473 | SchemaPiece::ResolveDateTimestamp
474 | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475 SchemaPiece::Decimal {
476 fixed_size: None, ..
477 } => SchemaKind::Bytes,
478 SchemaPiece::Decimal {
479 fixed_size: Some(_),
480 ..
481 } => SchemaKind::Fixed,
482 SchemaPiece::Bytes => SchemaKind::Bytes,
483 SchemaPiece::String => SchemaKind::String,
484 SchemaPiece::Array(_) => SchemaKind::Array,
485 SchemaPiece::Map(_) => SchemaKind::Map,
486 SchemaPiece::Union(_) => SchemaKind::Union,
487 SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488 SchemaPiece::ResolveIntLong => SchemaKind::Long,
489 SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490 SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491 SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492 SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493 SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494 SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495 SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496 SchemaPiece::Record { .. } => SchemaKind::Record,
497 SchemaPiece::Enum { .. } => SchemaKind::Enum,
498 SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499 SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500 SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501 SchemaPiece::Json => SchemaKind::String,
502 SchemaPiece::Uuid => SchemaKind::String,
503 }
504 }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508 #[inline(always)]
509 fn from(schema: SchemaNode<'a>) -> SchemaKind {
510 SchemaKind::from(schema.inner)
511 }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515 #[inline(always)]
516 fn from(schema: &'a Schema) -> SchemaKind {
517 Self::from(schema.top_node())
518 }
519}
520
/// The name of a named schema as written in its JSON definition.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The base name. When parsed with `Name::parse`, any dotted prefix has
    /// already been split off into `namespace`.
    pub name: String,
    /// The namespace, if one was given or derived from a dotted name.
    pub namespace: Option<String>,
    /// The `aliases` array, if present and made up entirely of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A name together with the namespace it lives in.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name.
    ///
    /// * With an explicit `namespace`, `name` is taken verbatim.
    /// * Otherwise, if `name` contains a dot, everything before the last dot
    ///   becomes the namespace and the remainder the name.
    /// * Otherwise `default_namespace` is used.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (base, ns) = match namespace {
            Some(explicit) => (name, explicit),
            None => match name.rsplit_once('.') {
                Some((prefix, last)) => (last, prefix),
                None => (name, default_namespace),
            },
        };
        FullName {
            name: base.to_owned(),
            namespace: ns.to_owned(),
        }
    }

    /// The name without its namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }

    /// `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        match self.namespace.as_str() {
            "" => self.name.clone(),
            ns => format!("{}.{}", ns, self.name),
        }
    }

    /// The bare name when `enclosing_ns` equals this name's namespace,
    /// otherwise `namespace.name` (always with the dot).
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace.as_str() != enclosing_ns {
            return Cow::Owned(format!("{}.{}", self.namespace, self.name));
        }
        Cow::Borrowed(self.name.as_str())
    }

    /// The namespace component.
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Prints `namespace.name`; the dot is emitted even for an empty namespace.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.namespace)?;
        f.write_str(".")?;
        f.write_str(&self.name)
    }
}
594
/// Optional documentation text attached to a schema element.
pub type Documentation = Option<String>;
597
598impl Name {
599 pub fn is_valid(name: &str) -> bool {
603 static MATCHER: LazyLock<Regex> =
604 LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605 MATCHER.is_match(name)
606 }
607
608 pub fn validate(name: &str) -> Result<(), AvroError> {
610 match Self::is_valid(name) {
611 true => Ok(()),
612 false => {
613 Err(ParseSchemaError::new(format!(
614 "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615 name
616 )).into())
617 }
618 }
619 }
620
621 pub fn make_valid(name: &str) -> String {
629 let mut out = String::new();
630 let mut chars = name.chars();
631 match chars.next() {
632 Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633 Some(ch @ '0'..='9') => {
634 out.push('_');
635 out.push(ch);
636 }
637 _ => out.push('_'),
638 }
639 for ch in chars {
640 match ch {
641 '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642 _ => out.push('_'),
643 }
644 }
645 debug_assert!(
646 Name::is_valid(&out),
647 "make_valid({name}) produced invalid name: {out}"
648 );
649 out
650 }
651
652 pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654 let name = complex
655 .name()
656 .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657 if name.is_empty() {
658 return Err(ParseSchemaError::new(format!(
659 "Name cannot be the empty string: {:?}",
660 complex
661 ))
662 .into());
663 }
664
665 let (namespace, name) = if let Some(index) = name.rfind('.') {
666 let computed_namespace = name[..index].to_owned();
667 let computed_name = name[index + 1..].to_owned();
668 if let Some(provided_namespace) = complex.string("namespace") {
669 if provided_namespace != computed_namespace {
670 warn!(
671 "Found dots in name {}, updating to namespace {} and name {}",
672 name, computed_namespace, computed_name
673 );
674 }
675 }
676 (Some(computed_namespace), computed_name)
677 } else {
678 (complex.string("namespace"), name)
679 };
680
681 Self::validate(&name)?;
682
683 let aliases: Option<Vec<String>> = complex
684 .get("aliases")
685 .and_then(|aliases| aliases.as_array())
686 .and_then(|aliases| {
687 aliases
688 .iter()
689 .map(|alias| alias.as_str())
690 .map(|alias| alias.map(|a| a.to_string()))
691 .collect::<Option<_>>()
692 });
693
694 Ok(Name {
695 name,
696 namespace,
697 aliases,
698 })
699 }
700
701 pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703 let mut map = serde_json::map::Map::new();
704 map.insert("name".into(), serde_json::Value::String(name.into()));
705 Self::parse(&map)
706 }
707
708 pub fn fullname(&self, default_namespace: &str) -> FullName {
713 FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714 }
715}
716
/// A record field whose value comes from a default.
///
/// NOTE(review): presumably a reader field that is absent from the writer
/// schema — confirm against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Field name.
    pub name: String,
    /// Optional documentation.
    pub doc: Documentation,
    /// The default value to use for the field.
    pub default: types::Value,
    /// Sort order of the field.
    pub order: RecordFieldOrder,
    /// Position of the field (presumably within the reader record — TODO
    /// confirm).
    pub position: usize,
}
725
/// A field of a resolved record.
///
/// NOTE(review): looks like `Absent` covers fields with no counterpart on the
/// other side of the resolution and `Present` those that have one — confirm
/// against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// The field is absent; carries a schema for it.
    Absent(Schema),
    /// The field is present; carries its definition.
    Present(RecordField),
}
731
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Field name (validated with `Name::validate` when parsed).
    pub name: String,
    /// Optional documentation.
    pub doc: Documentation,
    /// The raw JSON `default` attribute, if present.
    pub default: Option<Value>,
    /// Schema of the field's value.
    pub schema: SchemaPieceOrNamed,
    /// Sort order; `Ascending` unless declared otherwise.
    pub order: RecordFieldOrder,
    /// Zero-based position of the field among the record's fields.
    pub position: usize,
}
752
/// Sort order declared for a record field through its `order` attribute.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    /// `"ascending"`; also used when the attribute is missing or unrecognised.
    Ascending,
    /// `"descending"`.
    Descending,
    /// `"ignore"`.
    Ignore,
}
760
// `RecordField` currently has no inherent methods.
impl RecordField {}
762
/// A union schema together with lookup tables for finding a variant.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    /// For inline variants: kind of the variant -> its position in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    /// For named variants: index of the named type -> its position in
    /// `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776 pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777 let mut avindex = BTreeMap::new();
778 let mut nvindex = BTreeMap::new();
779 for (i, schema) in schemas.iter().enumerate() {
780 match schema {
781 SchemaPieceOrNamed::Piece(sp) => {
782 if let SchemaPiece::Union(_) = sp {
783 return Err(ParseSchemaError::new(
784 "Unions may not directly contain a union",
785 )
786 .into());
787 }
788 let kind = SchemaKind::from(sp);
789 if avindex.insert(kind, i).is_some() {
790 return Err(
791 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792 );
793 }
794 }
795 SchemaPieceOrNamed::Named(idx) => {
796 if nvindex.insert(*idx, i).is_some() {
797 return Err(
798 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799 );
800 }
801 }
802 }
803 }
804 Ok(UnionSchema {
805 schemas,
806 anon_variant_index: avindex,
807 named_variant_index: nvindex,
808 })
809 }
810
811 pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813 &self.schemas
814 }
815
816 pub fn is_nullable(&self) -> bool {
818 !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
819 }
820
821 pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
822 self.anon_variant_index
823 .get(&SchemaKind::from(sp))
824 .map(|idx| (*idx, &self.schemas[*idx]))
825 }
826
827 pub fn match_ref(
828 &self,
829 other: SchemaPieceRefOrNamed,
830 names_map: &BTreeMap<usize, usize>,
831 ) -> Option<(usize, &SchemaPieceOrNamed)> {
832 match other {
833 SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
834 SchemaPieceRefOrNamed::Named(idx) => names_map
835 .get(&idx)
836 .and_then(|idx| self.named_variant_index.get(idx))
837 .map(|idx| (*idx, &self.schemas[*idx])),
838 }
839 }
840
841 #[inline(always)]
842 pub fn match_(
843 &self,
844 other: &SchemaPieceOrNamed,
845 names_map: &BTreeMap<usize, usize>,
846 ) -> Option<(usize, &SchemaPieceOrNamed)> {
847 self.match_ref(other.as_ref(), names_map)
848 }
849}
850
851impl PartialEq for UnionSchema {
853 fn eq(&self, other: &UnionSchema) -> bool {
854 self.schemas.eq(&other.schemas)
855 }
856}
857
/// State accumulated while parsing a schema from JSON.
#[derive(Default)]
struct SchemaParser {
    /// One slot per named type. A slot is `None` from the moment its name is
    /// allocated until the type's definition has been parsed and inserted.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each full name seen so far to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
}
863
864impl SchemaParser {
865 fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
866 let top = self.parse_inner("", value)?;
867 let SchemaParser { named, indices } = self;
868 Ok(Schema {
869 named: named.into_iter().map(|o| o.unwrap()).collect(),
870 indices,
871 top,
872 })
873 }
874
875 fn parse_inner(
876 &mut self,
877 default_namespace: &str,
878 value: &Value,
879 ) -> Result<SchemaPieceOrNamed, AvroError> {
880 match *value {
881 Value::String(ref t) => {
882 let name = FullName::from_parts(t.as_str(), None, default_namespace);
883 if let Some(idx) = self.indices.get(&name) {
884 Ok(SchemaPieceOrNamed::Named(*idx))
885 } else {
886 Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
887 t.as_str(),
888 )?))
889 }
890 }
891 Value::Object(ref data) => self.parse_complex(default_namespace, data),
892 Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
893 self.parse_union(default_namespace, data)?,
894 )),
895 _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
896 }
897 }
898
899 fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
900 let idx = match self.indices.entry(fullname) {
901 Entry::Vacant(ve) => *ve.insert(self.named.len()),
902 Entry::Occupied(oe) => {
903 return Err(ParseSchemaError::new(format!(
904 "Sub-schema with name {:?} encountered multiple times",
905 oe.key()
906 ))
907 .into());
908 }
909 };
910 self.named.push(None);
911 Ok(idx)
912 }
913
    /// Stores `schema` in the slot `index`.
    ///
    /// Asserts (through `assert_none!`) that the slot is still empty, then
    /// fills it. Indexing panics if `index` is out of bounds.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
918
919 fn parse_named_type(
920 &mut self,
921 type_name: &str,
922 default_namespace: &str,
923 complex: &Map<String, Value>,
924 ) -> Result<usize, AvroError> {
925 let name = Name::parse(complex)?;
926 match name.name.as_str() {
927 "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
928 return Err(ParseSchemaError::new(format!(
929 "{} may not be used as a custom type name",
930 name.name
931 ))
932 .into());
933 }
934 _ => {}
935 };
936 let fullname = name.fullname(default_namespace);
937 let default_namespace = fullname.namespace.clone();
938 let idx = self.alloc_name(fullname.clone())?;
939 let piece = match type_name {
940 "record" => self.parse_record(&default_namespace, complex),
941 "enum" => self.parse_enum(complex),
942 "fixed" => self.parse_fixed(&default_namespace, complex),
943 _ => unreachable!("Unknown named type kind: {}", type_name),
944 }?;
945
946 self.insert(
947 idx,
948 NamedSchemaPiece {
949 name: fullname,
950 piece,
951 },
952 );
953
954 Ok(idx)
955 }
956
957 fn parse_complex(
963 &mut self,
964 default_namespace: &str,
965 complex: &Map<String, Value>,
966 ) -> Result<SchemaPieceOrNamed, AvroError> {
967 match complex.get("type") {
968 Some(&Value::String(ref t)) => Ok(match t.as_str() {
969 "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
970 t,
971 default_namespace,
972 complex,
973 )?),
974 "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
975 "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
976 "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
977 "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
978 "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
979 "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
980 other => {
981 let name = FullName {
982 name: other.into(),
983 namespace: default_namespace.into(),
984 };
985 if let Some(idx) = self.indices.get(&name) {
986 SchemaPieceOrNamed::Named(*idx)
987 } else {
988 SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
989 }
990 }
991 }),
992 Some(&Value::Object(ref data)) => match data.get("type") {
993 Some(value) => self.parse_inner(default_namespace, value),
994 None => Err(
995 ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
996 ),
997 },
998 _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
999 }
1000 }
1001
1002 fn parse_record(
1005 &mut self,
1006 default_namespace: &str,
1007 complex: &Map<String, Value>,
1008 ) -> Result<SchemaPiece, AvroError> {
1009 let mut lookup = BTreeMap::new();
1010
1011 let fields: Vec<RecordField> = complex
1012 .get("fields")
1013 .and_then(|fields| fields.as_array())
1014 .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1015 .and_then(|fields| {
1016 fields
1017 .iter()
1018 .filter_map(|field| field.as_object())
1019 .enumerate()
1020 .map(|(position, field)| {
1021 self.parse_record_field(default_namespace, field, position)
1022 })
1023 .collect::<Result<_, _>>()
1024 })?;
1025
1026 for field in &fields {
1027 lookup.insert(field.name.clone(), field.position);
1028 }
1029
1030 Ok(SchemaPiece::Record {
1031 doc: complex.doc(),
1032 fields,
1033 lookup,
1034 })
1035 }
1036
1037 fn parse_record_field(
1039 &mut self,
1040 default_namespace: &str,
1041 field: &Map<String, Value>,
1042 position: usize,
1043 ) -> Result<RecordField, AvroError> {
1044 let name = field
1045 .name()
1046 .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1047
1048 Name::validate(&name)?;
1049
1050 let schema = field
1051 .get("type")
1052 .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1053 .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1054
1055 let default = field.get("default").cloned();
1056
1057 let order = field
1058 .get("order")
1059 .and_then(|order| order.as_str())
1060 .and_then(|order| match order {
1061 "ascending" => Some(RecordFieldOrder::Ascending),
1062 "descending" => Some(RecordFieldOrder::Descending),
1063 "ignore" => Some(RecordFieldOrder::Ignore),
1064 _ => None,
1065 })
1066 .unwrap_or(RecordFieldOrder::Ascending);
1067
1068 Ok(RecordField {
1069 name,
1070 doc: field.doc(),
1071 default,
1072 schema,
1073 order,
1074 position,
1075 })
1076 }
1077
1078 fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1081 let symbols: Vec<String> = complex
1082 .get("symbols")
1083 .and_then(|v| v.as_array())
1084 .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1085 .and_then(|symbols| {
1086 symbols
1087 .iter()
1088 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1089 .collect::<Option<_>>()
1090 .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1091 })?;
1092
1093 let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1094 for symbol in symbols.iter() {
1095 if unique_symbols.contains(symbol) {
1096 return Err(ParseSchemaError::new(format!(
1097 "Enum symbols must be unique, found multiple: {}",
1098 symbol
1099 ))
1100 .into());
1101 } else {
1102 unique_symbols.insert(symbol);
1103 }
1104 }
1105
1106 let default_idx = if let Some(default) = complex.get("default") {
1107 let default_str = default.as_str().ok_or_else(|| {
1108 ParseSchemaError::new(format!(
1109 "Enum default should be a string, got: {:?}",
1110 default
1111 ))
1112 })?;
1113 let default_idx = symbols
1114 .iter()
1115 .position(|x| x == default_str)
1116 .ok_or_else(|| {
1117 ParseSchemaError::new(format!(
1118 "Enum default not found in list of symbols: {}",
1119 default_str
1120 ))
1121 })?;
1122 Some(default_idx)
1123 } else {
1124 None
1125 };
1126
1127 Ok(SchemaPiece::Enum {
1128 doc: complex.doc(),
1129 symbols,
1130 default_idx,
1131 })
1132 }
1133
1134 fn parse_array(
1137 &mut self,
1138 default_namespace: &str,
1139 complex: &Map<String, Value>,
1140 ) -> Result<SchemaPiece, AvroError> {
1141 complex
1142 .get("items")
1143 .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1144 .and_then(|items| self.parse_inner(default_namespace, items))
1145 .map(|schema| SchemaPiece::Array(Box::new(schema)))
1146 }
1147
1148 fn parse_map(
1151 &mut self,
1152 default_namespace: &str,
1153 complex: &Map<String, Value>,
1154 ) -> Result<SchemaPiece, AvroError> {
1155 complex
1156 .get("values")
1157 .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1158 .and_then(|items| self.parse_inner(default_namespace, items))
1159 .map(|schema| SchemaPiece::Map(Box::new(schema)))
1160 }
1161
1162 fn parse_union(
1165 &mut self,
1166 default_namespace: &str,
1167 items: &[Value],
1168 ) -> Result<SchemaPiece, AvroError> {
1169 items
1170 .iter()
1171 .map(|value| self.parse_inner(default_namespace, value))
1172 .collect::<Result<Vec<_>, _>>()
1173 .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1174 }
1175
1176 fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1179 let precision = complex
1180 .get("precision")
1181 .and_then(|v| v.as_i64())
1182 .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1183
1184 let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1185
1186 if scale < 0 {
1187 return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1188 }
1189
1190 if precision < 0 {
1191 return Err(
1192 ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1193 );
1194 }
1195
1196 if scale > precision {
1197 return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1198 }
1199
1200 Ok((precision as usize, scale as usize))
1201 }
1202
1203 fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1206 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1207
1208 if let Some("decimal") = logical_type {
1209 match Self::parse_decimal(complex) {
1210 Ok((precision, scale)) => {
1211 return Ok(SchemaPiece::Decimal {
1212 precision,
1213 scale,
1214 fixed_size: None,
1215 });
1216 }
1217 Err(e) => warn!(
1218 "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1219 complex, e
1220 ),
1221 }
1222 }
1223
1224 Ok(SchemaPiece::Bytes)
1225 }
1226
1227 fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1235 const AVRO_DATE: &str = "date";
1236 const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1237 const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1238 if let Some(name) = complex.get("connect.name") {
1239 if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1240 if name == KAFKA_DATE {
1241 warn!("using deprecated debezium date format");
1242 }
1243 return Ok(SchemaPiece::Date);
1244 }
1245 }
1246 if let Some(name) = complex.get("logicalType") {
1250 if name == AVRO_DATE {
1251 return Ok(SchemaPiece::Date);
1252 }
1253 }
1254 if !complex.is_empty() {
1255 debug!("parsing complex type as regular int: {:?}", complex);
1256 }
1257 Ok(SchemaPiece::Int)
1258 }
1259
1260 fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1268 const AVRO_MILLI_TS: &str = "timestamp-millis";
1269 const AVRO_MICRO_TS: &str = "timestamp-micros";
1270
1271 const CONNECT_MILLI_TS: &[&str] = &[
1272 "io.debezium.time.Timestamp",
1273 "org.apache.kafka.connect.data.Timestamp",
1274 ];
1275 const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1276
1277 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1278 if CONNECT_MILLI_TS.contains(&&**name) {
1279 return Ok(SchemaPiece::TimestampMilli);
1280 }
1281 if name == CONNECT_MICRO_TS {
1282 return Ok(SchemaPiece::TimestampMicro);
1283 }
1284 }
1285 if let Some(name) = complex.get("logicalType") {
1286 if name == AVRO_MILLI_TS {
1287 return Ok(SchemaPiece::TimestampMilli);
1288 }
1289 if name == AVRO_MICRO_TS {
1290 return Ok(SchemaPiece::TimestampMicro);
1291 }
1292 }
1293 if !complex.is_empty() {
1294 debug!("parsing complex type as regular long: {:?}", complex);
1295 }
1296 Ok(SchemaPiece::Long)
1297 }
1298
1299 fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1300 const CONNECT_JSON: &str = "io.debezium.data.Json";
1301
1302 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1303 if CONNECT_JSON == name.as_str() {
1304 return SchemaPiece::Json;
1305 }
1306 }
1307 if let Some(name) = complex.get("logicalType") {
1308 if name == "uuid" {
1309 return SchemaPiece::Uuid;
1310 }
1311 }
1312 debug!("parsing complex type as regular string: {:?}", complex);
1313 SchemaPiece::String
1314 }
1315
1316 fn parse_fixed(
1319 &self,
1320 _default_namespace: &str,
1321 complex: &Map<String, Value>,
1322 ) -> Result<SchemaPiece, AvroError> {
1323 let _name = Name::parse(complex)?;
1324
1325 let size = complex
1326 .get("size")
1327 .and_then(|v| v.as_i64())
1328 .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1329 if size <= 0 {
1330 return Err(ParseSchemaError::new(format!(
1331 "Fixed values require a positive size attribute, found: {}",
1332 size
1333 ))
1334 .into());
1335 }
1336
1337 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1338
1339 if let Some("decimal") = logical_type {
1340 match Self::parse_decimal(complex) {
1341 Ok((precision, scale)) => {
1342 let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1343 if precision > max {
1344 warn!(
1345 "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1346 precision, size
1347 );
1348 } else {
1349 return Ok(SchemaPiece::Decimal {
1350 precision,
1351 scale,
1352 fixed_size: Some(size as usize),
1353 });
1354 }
1355 }
1356 Err(e) => warn!(
1357 "parsing decimal as fixed due to parse error: {:?}, {:?}",
1358 complex, e
1359 ),
1360 }
1361 }
1362
1363 Ok(SchemaPiece::Fixed {
1364 size: size as usize,
1365 })
1366 }
1367}
1368
1369impl Schema {
1370 pub fn parse(value: &Value) -> Result<Self, AvroError> {
1373 let p = SchemaParser {
1374 named: vec![],
1375 indices: Default::default(),
1376 };
1377 p.parse(value)
1378 }
1379
1380 pub fn canonical_form(&self) -> String {
1385 let json = serde_json::to_value(self).unwrap();
1386 parsing_canonical_form(&json)
1387 }
1388
1389 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1396 let mut d = D::new();
1397 d.update(self.canonical_form());
1398 SchemaFingerprint {
1399 bytes: d.finalize().to_vec(),
1400 }
1401 }
1402
1403 fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1406 match primitive {
1407 "null" => Ok(SchemaPiece::Null),
1408 "boolean" => Ok(SchemaPiece::Boolean),
1409 "int" => Ok(SchemaPiece::Int),
1410 "long" => Ok(SchemaPiece::Long),
1411 "double" => Ok(SchemaPiece::Double),
1412 "float" => Ok(SchemaPiece::Float),
1413 "bytes" => Ok(SchemaPiece::Bytes),
1414 "string" => Ok(SchemaPiece::String),
1415 other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1416 }
1417 }
1418}
1419
1420impl FromStr for Schema {
1421 type Err = AvroError;
1422
1423 fn from_str(input: &str) -> Result<Self, AvroError> {
1425 let value = serde_json::from_str(input)
1426 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1427 Self::parse(&value)
1428 }
1429}
1430
/// A [`SchemaPiece`] paired with the [`FullName`] it carries.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// The full name of the piece.
    pub name: FullName,
    /// The schema definition that goes with `name`.
    pub piece: SchemaPiece,
}
1436
/// A borrowed view of one schema piece together with the schema it belongs to.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The schema used to look up named references reachable from `inner`.
    pub root: &'a Schema,
    /// The schema piece this node points at.
    pub inner: &'a SchemaPiece,
    /// The full name of `inner`; `None` when the piece was not reached through a
    /// named reference.
    pub name: Option<&'a FullName>,
}
1443
/// Either a direct reference to a schema piece or the index of a named piece
/// that has to be looked up in a [`Schema`].
#[derive(Copy, Clone, Debug)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// A borrowed, inline schema piece.
    Piece(&'a SchemaPiece),
    /// Index of a named piece, resolved through `Schema::lookup`.
    Named(usize),
}
1449
1450impl<'a> SchemaPieceRefOrNamed<'a> {
1451 pub fn get_human_name(&self, root: &Schema) -> String {
1452 match self {
1453 Self::Piece(piece) => format!("{:?}", piece),
1454 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1455 }
1456 }
1457
1458 #[inline(always)]
1459 pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1460 match self {
1461 SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1462 SchemaPieceRefOrNamed::Named(index) => {
1463 let named_piece = root.lookup(index);
1464 (&named_piece.piece, Some(&named_piece.name))
1465 }
1466 }
1467 }
1468}
1469
/// A position in a schema that may still be an unresolved named reference.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema used to resolve named references.
    pub root: &'a Schema,
    /// The piece (inline or by name index) this position points at.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
1475
1476impl<'a> SchemaNodeOrNamed<'a> {
1477 #[inline(always)]
1478 pub fn lookup(self) -> SchemaNode<'a> {
1479 let (inner, name) = self.inner.get_piece_and_name(self.root);
1480 SchemaNode {
1481 root: self.root,
1482 inner,
1483 name,
1484 }
1485 }
1486 #[inline(always)]
1487 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1488 self.step_ref(next.as_ref())
1489 }
1490 #[inline(always)]
1491 pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1492 Self {
1493 root: self.root,
1494 inner: match next {
1495 SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1496 SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1497 },
1498 }
1499 }
1500
1501 pub fn to_schema(self) -> Schema {
1502 let mut cloner = SchemaSubtreeDeepCloner {
1503 old_root: self.root,
1504 old_to_new_names: Default::default(),
1505 named: Default::default(),
1506 };
1507 let piece = cloner.clone_piece_or_named(self.inner);
1508 let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1509 let indices: BTreeMap<FullName, usize> = named
1510 .iter()
1511 .enumerate()
1512 .map(|(i, nsp)| (nsp.name.clone(), i))
1513 .collect();
1514 Schema {
1515 named,
1516 indices,
1517 top: piece,
1518 }
1519 }
1520
1521 pub fn namespace(self) -> Option<&'a str> {
1522 let SchemaNode { name, .. } = self.lookup();
1523 name.map(|FullName { namespace, .. }| namespace.as_str())
1524 }
1525}
1526
/// Working state for copying a schema subtree into a fresh set of named pieces.
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema the subtree is being copied out of.
    old_root: &'a Schema,
    /// Maps a named-piece index in `old_root` to its index in `named`.
    old_to_new_names: BTreeMap<usize, usize>,
    /// Named pieces of the copy; an entry is `None` while that piece is still
    /// being cloned.
    named: Vec<Option<NamedSchemaPiece>>,
}
1532
impl<'a> SchemaSubtreeDeepCloner<'a> {
    /// Produces a copy of `piece`, cloning every nested piece through
    /// `clone_piece_or_named` so that named references are re-indexed into
    /// `self.named`.
    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
        match piece {
            // Leaf variants carry no nested schema and are copied directly.
            SchemaPiece::Null => SchemaPiece::Null,
            SchemaPiece::Boolean => SchemaPiece::Boolean,
            SchemaPiece::Int => SchemaPiece::Int,
            SchemaPiece::Long => SchemaPiece::Long,
            SchemaPiece::Float => SchemaPiece::Float,
            SchemaPiece::Double => SchemaPiece::Double,
            SchemaPiece::Date => SchemaPiece::Date,
            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
            SchemaPiece::Json => SchemaPiece::Json,
            SchemaPiece::Decimal {
                scale,
                precision,
                fixed_size,
            } => SchemaPiece::Decimal {
                scale: *scale,
                precision: *precision,
                fixed_size: *fixed_size,
            },
            SchemaPiece::Bytes => SchemaPiece::Bytes,
            SchemaPiece::String => SchemaPiece::String,
            SchemaPiece::Uuid => SchemaPiece::Uuid,
            // Container variants recurse into their element schema.
            SchemaPiece::Array(inner) => {
                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            SchemaPiece::Map(inner) => {
                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            // The variant lookup indexes are copied as-is; only the variant
            // schemas themselves are re-cloned.
            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
                schemas: us
                    .schemas
                    .iter()
                    .map(|s| self.clone_piece_or_named(s.as_ref()))
                    .collect(),
                anon_variant_index: us.anon_variant_index.clone(),
                named_variant_index: us.named_variant_index.clone(),
            }),
            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
            SchemaPiece::ResolveConcreteUnion {
                index,
                inner,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveConcreteUnion {
                index: *index,
                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionUnion {
                permutation,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveUnionUnion {
                // Each entry that holds an `(index, piece)` pair has its piece
                // re-cloned; other entries are carried over from the clone.
                permutation: permutation
                    .clone()
                    .into_iter()
                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
                    .collect(),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionConcrete { index, inner } => {
                SchemaPiece::ResolveUnionConcrete {
                    index: *index,
                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                }
            }
            SchemaPiece::Record {
                doc,
                fields,
                lookup,
            } => SchemaPiece::Record {
                doc: doc.clone(),
                fields: fields
                    .iter()
                    .map(|rf| RecordField {
                        name: rf.name.clone(),
                        doc: rf.doc.clone(),
                        default: rf.default.clone(),
                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
                        order: rf.order,
                        position: rf.position,
                    })
                    .collect(),
                lookup: lookup.clone(),
            },
            SchemaPiece::Enum {
                doc,
                symbols,
                default_idx,
            } => SchemaPiece::Enum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default_idx: *default_idx,
            },
            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
            SchemaPiece::ResolveRecord {
                defaults,
                fields,
                n_reader_fields,
            } => SchemaPiece::ResolveRecord {
                defaults: defaults.clone(),
                fields: fields
                    .iter()
                    .map(|rf| match rf {
                        // Present fields have their schema re-cloned through
                        // the cloner.
                        ResolvedRecordField::Present(rf) => {
                            ResolvedRecordField::Present(RecordField {
                                name: rf.name.clone(),
                                doc: rf.doc.clone(),
                                default: rf.default.clone(),
                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
                                order: rf.order,
                                position: rf.position,
                            })
                        }
                        // Absent fields are copied with a plain `clone`.
                        ResolvedRecordField::Absent(writer_schema) => {
                            ResolvedRecordField::Absent(writer_schema.clone())
                        }
                    })
                    .collect(),
                n_reader_fields: *n_reader_fields,
            },
            SchemaPiece::ResolveEnum {
                doc,
                symbols,
                default,
            } => SchemaPiece::ResolveEnum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default: default.clone(),
            },
        }
    }

    /// Clones either an inline piece or a named reference.
    ///
    /// For a named reference, the first visit allocates a new slot in
    /// `self.named` and clones the referenced piece into it; later visits reuse
    /// the index recorded in `old_to_new_names`.
    fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
        match piece {
            SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
            SchemaPieceRefOrNamed::Named(index) => {
                let new_index = match self.old_to_new_names.entry(index) {
                    Entry::Vacant(ve) => {
                        let new_index = self.named.len();
                        // Reserve the slot and record the mapping before
                        // recursing, so a reference back to `index` from inside
                        // the piece takes the `Occupied` branch instead of
                        // cloning it again.
                        self.named.push(None);
                        ve.insert(new_index);
                        let old_named_piece = self.old_root.lookup(index);
                        let new_named_piece = NamedSchemaPiece {
                            name: old_named_piece.name.clone(),
                            piece: self.clone_piece(&old_named_piece.piece),
                        };
                        self.named[new_index] = Some(new_named_piece);
                        new_index
                    }
                    Entry::Occupied(oe) => *oe.get(),
                };
                SchemaPieceOrNamed::Named(new_index)
            }
        }
    }
}
1702
1703impl<'a> SchemaNode<'a> {
1704 #[inline(always)]
1705 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1706 let (inner, name) = next.get_piece_and_name(self.root);
1707 Self {
1708 root: self.root,
1709 inner,
1710 name,
1711 }
1712 }
1713
1714 pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1715 use serde_json::Value::*;
1716 let val = match (json, self.inner) {
1717 (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1719 Some(variant) => AvroValue::Union {
1720 index: 0,
1721 inner: Box::new(self.step(variant).json_to_value(json)?),
1722 n_variants: us.schemas.len(),
1723 null_variant: us
1724 .schemas
1725 .iter()
1726 .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1727 },
1728 None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1729 },
1730 (Null, SchemaPiece::Null) => AvroValue::Null,
1731 (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1732 (Number(n), piece) => {
1733 match piece {
1734 piece if piece.is_underlying_int() => {
1735 let i =
1736 n.as_i64()
1737 .and_then(|i| i32::try_from(i).ok())
1738 .ok_or_else(|| {
1739 ParseSchemaError(format!("{} is not a 32-bit integer", n))
1740 })?;
1741 piece.try_make_int_value(i).unwrap().map_err(|e| {
1742 ParseSchemaError(format!("invalid default int {i}: {e}"))
1743 })?
1744 }
1745 piece if piece.is_underlying_long() => {
1746 let i = n.as_i64().ok_or_else(|| {
1747 ParseSchemaError(format!("{} is not a 64-bit integer", n))
1748 })?;
1749 piece.try_make_long_value(i).unwrap().map_err(|e| {
1750 ParseSchemaError(format!("invalid default long {i}: {e}"))
1751 })?
1752 }
1753 SchemaPiece::Float => {
1754 let f = n.as_f64().ok_or_else(|| {
1755 ParseSchemaError(format!("{} is not a 32-bit float", n))
1756 })?;
1757 AvroValue::Float(f as f32)
1758 }
1759 SchemaPiece::Double => {
1760 let f = n.as_f64().ok_or_else(|| {
1761 ParseSchemaError(format!("{} is not a 64-bit float", n))
1762 })?;
1763 AvroValue::Double(f)
1764 }
1765 _ => {
1766 return Err(ParseSchemaError(format!(
1767 "Unexpected number in default: {}",
1768 n
1769 )));
1770 }
1771 }
1772 }
1773 (String(s), piece)
1774 if s.eq_ignore_ascii_case("nan")
1775 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1776 {
1777 match piece {
1778 SchemaPiece::Float => AvroValue::Float(f32::NAN),
1779 SchemaPiece::Double => AvroValue::Double(f64::NAN),
1780 _ => unreachable!(),
1781 }
1782 }
1783 (String(s), piece)
1784 if s.eq_ignore_ascii_case("infinity")
1785 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1786 {
1787 match piece {
1788 SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1789 SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1790 _ => unreachable!(),
1791 }
1792 }
1793 (String(s), piece)
1794 if s.eq_ignore_ascii_case("-infinity")
1795 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1796 {
1797 match piece {
1798 SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1799 SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1800 _ => unreachable!(),
1801 }
1802 }
1803 (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1804 (
1805 String(s),
1806 SchemaPiece::Decimal {
1807 precision, scale, ..
1808 },
1809 ) => AvroValue::Decimal(DecimalValue {
1810 precision: *precision,
1811 scale: *scale,
1812 unscaled: s.clone().into_bytes(),
1813 }),
1814 (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1815 (Object(map), SchemaPiece::Record { fields, .. }) => {
1816 let field_values = fields
1817 .iter()
1818 .map(|rf| {
1819 let jval = map.get(&rf.name).ok_or_else(|| {
1820 ParseSchemaError(format!(
1821 "Field not found in default value: {}",
1822 rf.name
1823 ))
1824 })?;
1825 let value = self.step(&rf.schema).json_to_value(jval)?;
1826 Ok((rf.name.clone(), value))
1827 })
1828 .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1829 AvroValue::Record(field_values)
1830 }
1831 (String(s), SchemaPiece::Enum { symbols, .. }) => {
1832 match symbols.iter().find_position(|sym| s == *sym) {
1833 Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1834 None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1835 }
1836 }
1837 (Array(vals), SchemaPiece::Array(inner)) => {
1838 let node = self.step(&**inner);
1839 let vals = vals
1840 .iter()
1841 .map(|val| node.json_to_value(val))
1842 .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1843 AvroValue::Array(vals)
1844 }
1845 (Object(map), SchemaPiece::Map(inner)) => {
1846 let node = self.step(&**inner);
1847 let map = map
1848 .iter()
1849 .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1850 .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1851 AvroValue::Map(map)
1852 }
1853 (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1854 AvroValue::Fixed(*size, s.clone().into_bytes())
1855 }
1856 _ => {
1857 return Err(ParseSchemaError(format!(
1858 "Json default value {} does not match schema",
1859 json
1860 )));
1861 }
1862 };
1863 Ok(val)
1864 }
1865}
1866
/// State threaded through serialization of a schema to its JSON form.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The schema position currently being serialized.
    node: SchemaNodeOrNamed<'a>,
    /// Named pieces that have already been written out in full, keyed by their
    /// index; later occurrences are serialized as a name string only. Shared
    /// between all contexts derived from the same root context.
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// The namespace in effect at this position; a named piece's `namespace`
    /// entry is omitted when it equals this value.
    enclosing_ns: &'a str,
}
1878
/// Serialization wrapper for a single record field.
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
    /// Context of the record that contains the field.
    outer: &'a SchemaSerContext<'a>,
    /// The field being serialized.
    inner: &'a RecordField,
}
1884
1885impl<'a> SchemaSerContext<'a> {
1886 fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1887 let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1888 Self {
1889 node: self.node.step_ref(next),
1890 seen_named: Rc::clone(&self.seen_named),
1891 enclosing_ns: ns,
1892 }
1893 }
1894}
1895
1896impl<'a> Serialize for SchemaSerContext<'a> {
1897 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1898 where
1899 S: Serializer,
1900 {
1901 match self.node.inner {
1902 SchemaPieceRefOrNamed::Piece(piece) => match piece {
1903 SchemaPiece::Null => serializer.serialize_str("null"),
1904 SchemaPiece::Boolean => serializer.serialize_str("boolean"),
1905 SchemaPiece::Int => serializer.serialize_str("int"),
1906 SchemaPiece::Long => serializer.serialize_str("long"),
1907 SchemaPiece::Float => serializer.serialize_str("float"),
1908 SchemaPiece::Double => serializer.serialize_str("double"),
1909 SchemaPiece::Date => {
1910 let mut map = serializer.serialize_map(Some(2))?;
1911 map.serialize_entry("type", "int")?;
1912 map.serialize_entry("logicalType", "date")?;
1913 map.end()
1914 }
1915 SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
1916 let mut map = serializer.serialize_map(Some(2))?;
1917 map.serialize_entry("type", "long")?;
1918 if piece == &SchemaPiece::TimestampMilli {
1919 map.serialize_entry("logicalType", "timestamp-millis")?;
1920 } else {
1921 map.serialize_entry("logicalType", "timestamp-micros")?;
1922 }
1923 map.end()
1924 }
1925 SchemaPiece::Decimal {
1926 precision,
1927 scale,
1928 fixed_size: None,
1929 } => {
1930 let mut map = serializer.serialize_map(Some(4))?;
1931 map.serialize_entry("type", "bytes")?;
1932 map.serialize_entry("precision", precision)?;
1933 map.serialize_entry("scale", scale)?;
1934 map.serialize_entry("logicalType", "decimal")?;
1935 map.end()
1936 }
1937 SchemaPiece::Bytes => serializer.serialize_str("bytes"),
1938 SchemaPiece::String => serializer.serialize_str("string"),
1939 SchemaPiece::Array(inner) => {
1940 let mut map = serializer.serialize_map(Some(2))?;
1941 map.serialize_entry("type", "array")?;
1942 map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
1943 map.end()
1944 }
1945 SchemaPiece::Map(inner) => {
1946 let mut map = serializer.serialize_map(Some(2))?;
1947 map.serialize_entry("type", "map")?;
1948 map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
1949 map.end()
1950 }
1951 SchemaPiece::Union(inner) => {
1952 let variants = inner.variants();
1953 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
1954 for v in variants {
1955 seq.serialize_element(&self.step(v.as_ref()))?;
1956 }
1957 seq.end()
1958 }
1959 SchemaPiece::Json => {
1960 let mut map = serializer.serialize_map(Some(2))?;
1961 map.serialize_entry("type", "string")?;
1962 map.serialize_entry("connect.name", "io.debezium.data.Json")?;
1963 map.end()
1964 }
1965 SchemaPiece::Uuid => {
1966 let mut map = serializer.serialize_map(Some(4))?;
1967 map.serialize_entry("type", "string")?;
1968 map.serialize_entry("logicalType", "uuid")?;
1969 map.end()
1970 }
1971 SchemaPiece::Record { .. }
1972 | SchemaPiece::Decimal {
1973 fixed_size: Some(_),
1974 ..
1975 }
1976 | SchemaPiece::Enum { .. }
1977 | SchemaPiece::Fixed { .. } => {
1978 unreachable!("Unexpected named schema piece in anonymous schema position")
1979 }
1980 SchemaPiece::ResolveIntLong
1981 | SchemaPiece::ResolveDateTimestamp
1982 | SchemaPiece::ResolveIntFloat
1983 | SchemaPiece::ResolveIntDouble
1984 | SchemaPiece::ResolveLongFloat
1985 | SchemaPiece::ResolveLongDouble
1986 | SchemaPiece::ResolveFloatDouble
1987 | SchemaPiece::ResolveConcreteUnion { .. }
1988 | SchemaPiece::ResolveUnionUnion { .. }
1989 | SchemaPiece::ResolveUnionConcrete { .. }
1990 | SchemaPiece::ResolveRecord { .. }
1991 | SchemaPiece::ResolveIntTsMicro
1992 | SchemaPiece::ResolveIntTsMilli
1993 | SchemaPiece::ResolveEnum { .. } => {
1994 panic!("Attempted to serialize resolved schema")
1995 }
1996 },
1997 SchemaPieceRefOrNamed::Named(index) => {
1998 let mut map = self.seen_named.borrow_mut();
1999 let named_piece = match map.get(&index) {
2000 Some(name) => {
2001 return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2002 }
2003 None => self.node.root.lookup(index),
2004 };
2005 let name = &named_piece.name;
2006 map.insert(index, name.clone());
2007 std::mem::drop(map);
2008 match &named_piece.piece {
2009 SchemaPiece::Record { doc, fields, .. } => {
2010 let mut map = serializer.serialize_map(None)?;
2011 map.serialize_entry("type", "record")?;
2012 map.serialize_entry("name", &name.name)?;
2013 if self.enclosing_ns != &name.namespace {
2014 map.serialize_entry("namespace", &name.namespace)?;
2015 }
2016 if let Some(docstr) = doc {
2017 map.serialize_entry("doc", docstr)?;
2018 }
2019 map.serialize_entry(
2021 "fields",
2022 &fields
2023 .iter()
2024 .map(|f| RecordFieldSerContext {
2025 outer: self,
2026 inner: f,
2027 })
2028 .collect::<Vec<_>>(),
2029 )?;
2030 map.end()
2031 }
2032 SchemaPiece::Enum {
2033 symbols,
2034 default_idx,
2035 ..
2036 } => {
2037 let mut map = serializer.serialize_map(None)?;
2038 map.serialize_entry("type", "enum")?;
2039 map.serialize_entry("name", &name.name)?;
2040 if self.enclosing_ns != &name.namespace {
2041 map.serialize_entry("namespace", &name.namespace)?;
2042 }
2043 map.serialize_entry("symbols", symbols)?;
2044 if let Some(default_idx) = *default_idx {
2045 assert!(default_idx < symbols.len());
2046 map.serialize_entry("default", &symbols[default_idx])?;
2047 }
2048 map.end()
2049 }
2050 SchemaPiece::Fixed { size } => {
2051 let mut map = serializer.serialize_map(None)?;
2052 map.serialize_entry("type", "fixed")?;
2053 map.serialize_entry("name", &name.name)?;
2054 if self.enclosing_ns != &name.namespace {
2055 map.serialize_entry("namespace", &name.namespace)?;
2056 }
2057 map.serialize_entry("size", size)?;
2058 map.end()
2059 }
2060 SchemaPiece::Decimal {
2061 scale,
2062 precision,
2063 fixed_size: Some(size),
2064 } => {
2065 let mut map = serializer.serialize_map(Some(6))?;
2066 map.serialize_entry("type", "fixed")?;
2067 map.serialize_entry("logicalType", "decimal")?;
2068 map.serialize_entry("name", &name.name)?;
2069 if self.enclosing_ns != &name.namespace {
2070 map.serialize_entry("namespace", &name.namespace)?;
2071 }
2072 map.serialize_entry("size", size)?;
2073 map.serialize_entry("precision", precision)?;
2074 map.serialize_entry("scale", scale)?;
2075 map.end()
2076 }
2077 SchemaPiece::Null
2078 | SchemaPiece::Boolean
2079 | SchemaPiece::Int
2080 | SchemaPiece::Long
2081 | SchemaPiece::Float
2082 | SchemaPiece::Double
2083 | SchemaPiece::Date
2084 | SchemaPiece::TimestampMilli
2085 | SchemaPiece::TimestampMicro
2086 | SchemaPiece::Decimal {
2087 fixed_size: None, ..
2088 }
2089 | SchemaPiece::Bytes
2090 | SchemaPiece::String
2091 | SchemaPiece::Array(_)
2092 | SchemaPiece::Map(_)
2093 | SchemaPiece::Union(_)
2094 | SchemaPiece::Uuid
2095 | SchemaPiece::Json => {
2096 unreachable!("Unexpected anonymous schema piece in named schema position")
2097 }
2098 SchemaPiece::ResolveIntLong
2099 | SchemaPiece::ResolveDateTimestamp
2100 | SchemaPiece::ResolveIntFloat
2101 | SchemaPiece::ResolveIntDouble
2102 | SchemaPiece::ResolveLongFloat
2103 | SchemaPiece::ResolveLongDouble
2104 | SchemaPiece::ResolveFloatDouble
2105 | SchemaPiece::ResolveConcreteUnion { .. }
2106 | SchemaPiece::ResolveUnionUnion { .. }
2107 | SchemaPiece::ResolveUnionConcrete { .. }
2108 | SchemaPiece::ResolveRecord { .. }
2109 | SchemaPiece::ResolveIntTsMilli
2110 | SchemaPiece::ResolveIntTsMicro
2111 | SchemaPiece::ResolveEnum { .. } => {
2112 panic!("Attempted to serialize resolved schema")
2113 }
2114 }
2115 }
2116 }
2117 }
2118}
2119
2120impl Serialize for Schema {
2121 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2122 where
2123 S: Serializer,
2124 {
2125 let ctx = SchemaSerContext {
2126 node: SchemaNodeOrNamed {
2127 root: self,
2128 inner: self.top.as_ref(),
2129 },
2130 seen_named: Rc::new(RefCell::new(Default::default())),
2131 enclosing_ns: "",
2132 };
2133 ctx.serialize(serializer)
2134 }
2135}
2136
2137impl<'a> Serialize for RecordFieldSerContext<'a> {
2138 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2139 where
2140 S: Serializer,
2141 {
2142 let mut map = serializer.serialize_map(None)?;
2143 map.serialize_entry("name", &self.inner.name)?;
2144 map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2145 if let Some(default) = &self.inner.default {
2146 map.serialize_entry("default", default)?;
2147 }
2148 if let Some(doc) = &self.inner.doc {
2149 map.serialize_entry("doc", doc)?;
2150 }
2151 map.end()
2152 }
2153}
2154
/// Renders a JSON-encoded schema in Parsing Canonical Form.
///
/// Entry point for the recursive `pcf` helper: starts with an empty enclosing
/// namespace and outside of any record `fields` list.
fn parsing_canonical_form(schema: &serde_json::Value) -> String {
    pcf(schema, "", false)
}
2160
2161fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2162 match schema {
2163 serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2164 serde_json::Value::String(s) => pcf_string(s),
2165 serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2166 serde_json::Value::Number(n) => n.to_string(),
2167 _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2168 }
2169}
2170
2171fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2172 let default_ns = schema
2174 .get("namespace")
2175 .and_then(|v| v.as_str())
2176 .unwrap_or(enclosing_ns);
2177 let mut fields = Vec::new();
2178 let mut found_next_ns = None;
2179 let mut deferred_values = vec![];
2180 for (k, v) in schema {
2181 if schema.len() == 1 && k == "type" {
2183 if let serde_json::Value::String(s) = v {
2185 return pcf_string(s);
2186 }
2187 }
2188
2189 if field_ordering_position(k).is_none() {
2191 continue;
2192 }
2193
2194 if k == "name" {
2196 if in_fields {
2199 fields.push((
2200 k,
2201 format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2202 ));
2203 continue;
2204 }
2205 let name = v.as_str().unwrap();
2207 assert!(
2208 found_next_ns.is_none(),
2209 "`name` must not be specified multiple times"
2210 );
2211 let next_ns = match name.rsplit_once('.') {
2212 None => default_ns,
2213 Some((ns, _name)) => ns,
2214 };
2215 found_next_ns = Some(next_ns);
2216 let n = if next_ns.is_empty() {
2217 Cow::Borrowed(name)
2218 } else {
2219 Cow::Owned(format!("{next_ns}.{name}"))
2220 };
2221 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2222 continue;
2223 }
2224
2225 if k == "size" {
2227 let i = match v.as_str() {
2228 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2229 None => v.as_i64().unwrap(),
2230 };
2231 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2232 continue;
2233 }
2234
2235 deferred_values.push((k, v));
2238 }
2239
2240 let next_ns = found_next_ns.unwrap_or(default_ns);
2241 for (k, v) in deferred_values {
2242 fields.push((
2243 k,
2244 format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2245 ));
2246 }
2247
2248 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2250 let inter = fields
2251 .into_iter()
2252 .map(|(_, v)| v)
2253 .collect::<Vec<_>>()
2254 .join(",");
2255 format!("{{{}}}", inter)
2256}
2257
2258fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2259 let inter = arr
2260 .iter()
2261 .map(|s| pcf(s, enclosing_ns, in_fields))
2262 .collect::<Vec<String>>()
2263 .join(",");
2264 format!("[{}]", inter)
2265}
2266
/// Wraps `s` in double quotes. The content is copied verbatim; no escaping is
/// applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2270
/// Returns the 1-based position of `field` in the canonical attribute order, or
/// `None` for attributes that are not part of the canonical form.
fn field_ordering_position(field: &str) -> Option<usize> {
    // Attributes in the order in which they appear in canonical form.
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
2286
2287#[cfg(test)]
2288mod tests {
2289 use mz_ore::{assert_err, assert_ok};
2290
2291 use crate::types::{Record, ToAvro};
2292
2293 use super::*;
2294
2295 fn check_schema(schema: &str, expected: SchemaPiece) {
2296 let schema = Schema::from_str(schema).unwrap();
2297 assert_eq!(&expected, schema.top_node().inner);
2298
2299 let schema = serde_json::to_string(&schema).unwrap();
2301 let schema = Schema::from_str(&schema).unwrap();
2302 assert_eq!(&expected, schema.top_node().inner);
2303 }
2304
2305 #[mz_ore::test]
2306 fn test_primitive_schema() {
2307 check_schema("\"null\"", SchemaPiece::Null);
2308 check_schema("\"int\"", SchemaPiece::Int);
2309 check_schema("\"double\"", SchemaPiece::Double);
2310 }
2311
2312 #[mz_ore::test]
2313 fn test_array_schema() {
2314 check_schema(
2315 r#"{"type": "array", "items": "string"}"#,
2316 SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2317 );
2318 }
2319
2320 #[mz_ore::test]
2321 fn test_map_schema() {
2322 check_schema(
2323 r#"{"type": "map", "values": "double"}"#,
2324 SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2325 );
2326 }
2327
2328 #[mz_ore::test]
2329 fn test_union_schema() {
2330 check_schema(
2331 r#"["null", "int"]"#,
2332 SchemaPiece::Union(
2333 UnionSchema::new(vec![
2334 SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2335 SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2336 ])
2337 .unwrap(),
2338 ),
2339 );
2340 }
2341
2342 #[mz_ore::test]
2343 fn test_multi_union_schema() {
2344 let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2345 assert_ok!(schema);
2346 let schema = schema.unwrap();
2347 let node = schema.top_node();
2348 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2349 let union_schema = match node.inner {
2350 SchemaPiece::Union(u) => u,
2351 _ => unreachable!(),
2352 };
2353 assert_eq!(union_schema.variants().len(), 5);
2354 let mut variants = union_schema.variants().iter();
2355 assert_eq!(
2356 SchemaKind::from(node.step(variants.next().unwrap())),
2357 SchemaKind::Null
2358 );
2359 assert_eq!(
2360 SchemaKind::from(node.step(variants.next().unwrap())),
2361 SchemaKind::Int
2362 );
2363 assert_eq!(
2364 SchemaKind::from(node.step(variants.next().unwrap())),
2365 SchemaKind::Float
2366 );
2367 assert_eq!(
2368 SchemaKind::from(node.step(variants.next().unwrap())),
2369 SchemaKind::String
2370 );
2371 assert_eq!(
2372 SchemaKind::from(node.step(variants.next().unwrap())),
2373 SchemaKind::Bytes
2374 );
2375 assert_eq!(variants.next(), None);
2376 }
2377
2378 #[mz_ore::test]
2379 fn test_record_schema() {
2380 let schema = r#"
2381 {
2382 "type": "record",
2383 "name": "test",
2384 "doc": "record doc",
2385 "fields": [
2386 {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2387 {"name": "b", "doc": "b doc", "type": "string"}
2388 ]
2389 }
2390 "#;
2391
2392 let mut lookup = BTreeMap::new();
2393 lookup.insert("a".to_owned(), 0);
2394 lookup.insert("b".to_owned(), 1);
2395
2396 let expected = SchemaPiece::Record {
2397 doc: Some("record doc".to_string()),
2398 fields: vec![
2399 RecordField {
2400 name: "a".to_string(),
2401 doc: Some("a doc".to_string()),
2402 default: Some(Value::Number(42i64.into())),
2403 schema: SchemaPiece::Long.into(),
2404 order: RecordFieldOrder::Ascending,
2405 position: 0,
2406 },
2407 RecordField {
2408 name: "b".to_string(),
2409 doc: Some("b doc".to_string()),
2410 default: None,
2411 schema: SchemaPiece::String.into(),
2412 order: RecordFieldOrder::Ascending,
2413 position: 1,
2414 },
2415 ],
2416 lookup,
2417 };
2418
2419 check_schema(schema, expected);
2420 }
2421
2422 #[mz_ore::test]
2423 fn test_enum_schema() {
2424 let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2425
2426 let expected = SchemaPiece::Enum {
2427 doc: None,
2428 symbols: vec![
2429 "diamonds".to_owned(),
2430 "spades".to_owned(),
2431 "jokers".to_owned(),
2432 "clubs".to_owned(),
2433 "hearts".to_owned(),
2434 ],
2435 default_idx: Some(2),
2436 };
2437
2438 check_schema(schema, expected);
2439
2440 let bad_schema = Schema::from_str(
2441 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2442 );
2443
2444 assert_err!(bad_schema);
2445 }
2446
2447 #[mz_ore::test]
2448 fn test_fixed_schema() {
2449 let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2450
2451 let expected = SchemaPiece::Fixed { size: 16usize };
2452
2453 check_schema(schema, expected);
2454 }
2455
2456 #[mz_ore::test]
2457 fn test_date_schema() {
2458 let kinds = &[
2459 r#"{
2460 "type": "int",
2461 "name": "datish",
2462 "logicalType": "date"
2463 }"#,
2464 r#"{
2465 "type": "int",
2466 "name": "datish",
2467 "connect.name": "io.debezium.time.Date"
2468 }"#,
2469 r#"{
2470 "type": "int",
2471 "name": "datish",
2472 "connect.name": "org.apache.kafka.connect.data.Date"
2473 }"#,
2474 ];
2475 for kind in kinds {
2476 check_schema(*kind, SchemaPiece::Date);
2477
2478 let schema = Schema::from_str(*kind).unwrap();
2479 assert_eq!(
2480 serde_json::to_string(&schema).unwrap(),
2481 r#"{"type":"int","logicalType":"date"}"#
2482 );
2483 }
2484 }
2485
2486 #[mz_ore::test]
2487 fn new_field_in_middle() {
2488 let reader = r#"{
2489 "type": "record",
2490 "name": "MyRecord",
2491 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2492 }"#;
2493 let writer = r#"{
2494 "type": "record",
2495 "name": "MyRecord",
2496 "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2497 }"#;
2498 let reader = Schema::from_str(reader).unwrap();
2499 let writer = Schema::from_str(writer).unwrap();
2500
2501 let mut record = Record::new(writer.top_node()).unwrap();
2502 record.put("f1", 1);
2503 record.put("f2", 2);
2504 record.put("f_interposed", 42);
2505
2506 let value = record.avro();
2507
2508 let mut buf = vec![];
2509 crate::encode::encode(&value, &writer, &mut buf);
2510
2511 let resolved = resolve_schemas(&writer, &reader).unwrap();
2512
2513 let reader = &mut &buf[..];
2514 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2515 let expected = crate::types::Value::Record(vec![
2516 ("f1".to_string(), crate::types::Value::Int(1)),
2517 ("f2".to_string(), crate::types::Value::Int(2)),
2518 ]);
2519 assert_eq!(reader_value, expected);
2520 assert!(reader.is_empty()); }
2522
2523 #[mz_ore::test]
2524 fn new_field_at_end() {
2525 let reader = r#"{
2526 "type": "record",
2527 "name": "MyRecord",
2528 "fields": [{"name": "f1", "type": "int"}]
2529 }"#;
2530 let writer = r#"{
2531 "type": "record",
2532 "name": "MyRecord",
2533 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2534 }"#;
2535 let reader = Schema::from_str(reader).unwrap();
2536 let writer = Schema::from_str(writer).unwrap();
2537
2538 let mut record = Record::new(writer.top_node()).unwrap();
2539 record.put("f1", 1);
2540 record.put("f2", 2);
2541
2542 let value = record.avro();
2543
2544 let mut buf = vec![];
2545 crate::encode::encode(&value, &writer, &mut buf);
2546
2547 let resolved = resolve_schemas(&writer, &reader).unwrap();
2548
2549 let reader = &mut &buf[..];
2550 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2551 let expected =
2552 crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2553 assert_eq!(reader_value, expected);
2554 assert!(reader.is_empty()); }
2556
2557 #[mz_ore::test]
2558 fn default_non_nums() {
2559 let reader = r#"{
2560 "type": "record",
2561 "name": "MyRecord",
2562 "fields": [
2563 {"name": "f1", "type": "double", "default": "NaN"},
2564 {"name": "f2", "type": "double", "default": "Infinity"},
2565 {"name": "f3", "type": "double", "default": "-Infinity"}
2566 ]
2567 }
2568 "#;
2569 let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2570
2571 let writer_schema = Schema::from_str(writer).unwrap();
2572 let reader_schema = Schema::from_str(reader).unwrap();
2573 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2574
2575 let record = Record::new(writer_schema.top_node()).unwrap();
2576
2577 let value = record.avro();
2578 let mut buf = vec![];
2579 crate::encode::encode(&value, &writer_schema, &mut buf);
2580
2581 let reader = &mut &buf[..];
2582 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2583 let expected = crate::types::Value::Record(vec![
2584 ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2585 ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2586 (
2587 "f3".to_string(),
2588 crate::types::Value::Double(f64::NEG_INFINITY),
2589 ),
2590 ]);
2591
2592 #[derive(Debug)]
2593 struct NanEq(crate::types::Value);
2594 impl std::cmp::PartialEq for NanEq {
2595 fn eq(&self, other: &Self) -> bool {
2596 match (self, other) {
2597 (
2598 NanEq(crate::types::Value::Double(x)),
2599 NanEq(crate::types::Value::Double(y)),
2600 ) if x.is_nan() && y.is_nan() => true,
2601 (
2602 NanEq(crate::types::Value::Float(x)),
2603 NanEq(crate::types::Value::Float(y)),
2604 ) if x.is_nan() && y.is_nan() => true,
2605 (
2606 NanEq(crate::types::Value::Record(xs)),
2607 NanEq(crate::types::Value::Record(ys)),
2608 ) => {
2609 let xs = xs
2610 .iter()
2611 .cloned()
2612 .map(|(k, v)| (k, NanEq(v)))
2613 .collect::<Vec<_>>();
2614 let ys = ys
2615 .iter()
2616 .cloned()
2617 .map(|(k, v)| (k, NanEq(v)))
2618 .collect::<Vec<_>>();
2619
2620 xs == ys
2621 }
2622 (NanEq(x), NanEq(y)) => x == y,
2623 }
2624 }
2625 }
2626
2627 assert_eq!(NanEq(reader_value), NanEq(expected));
2628 assert!(reader.is_empty());
2629 }
2630
2631 #[mz_ore::test]
2632 fn test_decimal_schemas() {
2633 let schema = r#"{
2634 "type": "fixed",
2635 "name": "dec",
2636 "size": 8,
2637 "logicalType": "decimal",
2638 "precision": 12,
2639 "scale": 5
2640 }"#;
2641 let expected = SchemaPiece::Decimal {
2642 precision: 12,
2643 scale: 5,
2644 fixed_size: Some(8),
2645 };
2646 check_schema(schema, expected);
2647
2648 let schema = r#"{
2649 "type": "bytes",
2650 "logicalType": "decimal",
2651 "precision": 12,
2652 "scale": 5
2653 }"#;
2654 let expected = SchemaPiece::Decimal {
2655 precision: 12,
2656 scale: 5,
2657 fixed_size: None,
2658 };
2659 check_schema(schema, expected);
2660
2661 let res = Schema::from_str(
2662 r#"["bytes", {
2663 "type": "bytes",
2664 "logicalType": "decimal",
2665 "precision": 12,
2666 "scale": 5
2667 }]"#,
2668 );
2669 assert_eq!(
2670 res.unwrap_err().to_string(),
2671 "Schema parse error: Unions cannot contain duplicate types"
2672 );
2673
2674 let writer_schema = Schema::from_str(
2675 r#"["null", {
2676 "type": "bytes"
2677 }]"#,
2678 )
2679 .unwrap();
2680 let reader_schema = Schema::from_str(
2681 r#"["null", {
2682 "type": "bytes",
2683 "logicalType": "decimal",
2684 "precision": 12,
2685 "scale": 5
2686 }]"#,
2687 )
2688 .unwrap();
2689 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2690
2691 let expected = SchemaPiece::ResolveUnionUnion {
2692 permutation: vec![
2693 Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2694 Ok((
2695 1,
2696 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2697 precision: 12,
2698 scale: 5,
2699 fixed_size: None,
2700 }),
2701 )),
2702 ],
2703 n_reader_variants: 2,
2704 reader_null_variant: Some(0),
2705 };
2706 assert_eq!(resolved.top_node().inner, &expected);
2707 }
2708
2709 #[mz_ore::test]
2710 fn test_no_documentation() {
2711 let schema =
2712 Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2713 .unwrap();
2714
2715 let doc = match schema.top_node().inner {
2716 SchemaPiece::Enum { doc, .. } => doc.clone(),
2717 _ => panic!(),
2718 };
2719
2720 assert_none!(doc);
2721 }
2722
2723 #[mz_ore::test]
2724 fn test_documentation() {
2725 let schema = Schema::from_str(
2726 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2727 ).unwrap();
2728
2729 let doc = match schema.top_node().inner {
2730 SchemaPiece::Enum { doc, .. } => doc.clone(),
2731 _ => None,
2732 };
2733
2734 assert_eq!("Some documentation".to_owned(), doc.unwrap());
2735 }
2736
2737 #[mz_ore::test]
2738 fn test_namespaces_and_names() {
2739 let schema = Schema::from_str(
2741 r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2742 )
2743 .unwrap();
2744 assert_eq!(schema.named.len(), 1);
2745 assert_eq!(
2746 schema.named[0].name,
2747 FullName {
2748 name: "name".into(),
2749 namespace: "namespace".into()
2750 }
2751 );
2752
2753 let schema =
2755 Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2756 .unwrap();
2757 assert_eq!(schema.named.len(), 1);
2758 assert_eq!(
2759 schema.named[0].name,
2760 FullName {
2761 name: "dots".into(),
2762 namespace: "name.has".into()
2763 }
2764 );
2765
2766 let schema = Schema::from_str(
2768 r#"{"type": "enum", "namespace": "namespace",
2769 "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2770 )
2771 .unwrap();
2772 assert_eq!(schema.named.len(), 1);
2773 assert_eq!(
2774 schema.named[0].name,
2775 FullName {
2776 name: "dots".into(),
2777 namespace: "name.has".into()
2778 }
2779 );
2780
2781 let schema = Schema::from_str(
2784 r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2785 "fields": [{"name": "name", "type": "string"}]}"#,
2786 )
2787 .unwrap();
2788 assert_eq!(schema.named.len(), 1);
2789 assert_eq!(
2790 schema.named[0].name,
2791 FullName {
2792 name: "TestDoc".into(),
2793 namespace: "".into()
2794 }
2795 );
2796
2797 let schema = Schema::from_str(
2799 r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2800 "fields": [{"name": "name", "type": "string"}]}"#,
2801 )
2802 .unwrap();
2803 assert_eq!(schema.named.len(), 1);
2804 assert_eq!(
2805 schema.named[0].name,
2806 FullName {
2807 name: "TestDoc".into(),
2808 namespace: "".into()
2809 }
2810 );
2811
2812 let first = Schema::from_str(
2814 r#"{"type": "fixed", "namespace": "namespace",
2815 "name": "name", "size": 1}"#,
2816 )
2817 .unwrap();
2818 let second = Schema::from_str(
2819 r#"{"type": "fixed", "name": "namespace.name",
2820 "size": 1}"#,
2821 )
2822 .unwrap();
2823 assert_eq!(first.named[0].name, second.named[0].name);
2824
2825 let first = Schema::from_str(
2826 r#"{"type": "fixed", "namespace": "namespace",
2827 "name": "name", "size": 1}"#,
2828 )
2829 .unwrap();
2830 let second = Schema::from_str(
2831 r#"{"type": "fixed", "name": "namespace.Name",
2832 "size": 1}"#,
2833 )
2834 .unwrap();
2835 assert_ne!(first.named[0].name, second.named[0].name);
2836
2837 let first = Schema::from_str(
2838 r#"{"type": "fixed", "namespace": "Namespace",
2839 "name": "name", "size": 1}"#,
2840 )
2841 .unwrap();
2842 let second = Schema::from_str(
2843 r#"{"type": "fixed", "namespace": "namespace",
2844 "name": "name", "size": 1}"#,
2845 )
2846 .unwrap();
2847 assert_ne!(first.named[0].name, second.named[0].name);
2848
2849 assert!(
2852 Schema::from_str(
2853 r#"{"type": "record", "name": "99 problems but a name aint one",
2854 "fields": [{"name": "name", "type": "string"}]}"#
2855 )
2856 .is_err()
2857 );
2858
2859 assert!(
2860 Schema::from_str(
2861 r#"{"type": "record", "name": "!!!",
2862 "fields": [{"name": "name", "type": "string"}]}"#
2863 )
2864 .is_err()
2865 );
2866
2867 assert!(
2868 Schema::from_str(
2869 r#"{"type": "record", "name": "_valid_until_©",
2870 "fields": [{"name": "name", "type": "string"}]}"#
2871 )
2872 .is_err()
2873 );
2874
2875 let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2877 {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2878 {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2879 {"name": "f3", "type": "MyEnum"},
2880 {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2881 {"name": "f5", "type": "other.namespace.OtherEnum"},
2882 {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2883 {"name": "f7", "type": "some.other.ThirdEnum"}
2884 ]}"#).unwrap();
2885 assert_eq!(schema.named.len(), 4);
2886
2887 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2888 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); } else {
2896 panic!("Expected SchemaPiece::Record, found something else");
2897 }
2898
2899 let schema = Schema::from_str(
2900 r#"{"type": "record", "name": "x.Y", "fields": [
2901 {"name": "e", "type":
2902 {"type": "record", "name": "Z", "fields": [
2903 {"name": "f", "type": "x.Y"},
2904 {"name": "g", "type": "x.Z"}
2905 ]}
2906 }
2907 ]}"#,
2908 )
2909 .unwrap();
2910 assert_eq!(schema.named.len(), 2);
2911
2912 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2913 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); } else {
2915 panic!("Expected SchemaPiece::Record, found something else");
2916 }
2917
2918 if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
2919 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); } else {
2922 panic!("Expected SchemaPiece::Record, found something else");
2923 }
2924
2925 let schema = Schema::from_str(
2926 r#"{"type": "record", "name": "R", "fields": [
2927 {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
2928 {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
2929 "symbols": ["Foo", "Bar"]}
2930 }
2931 ]}},
2932 {"name": "t", "type": "Z"}
2933 ]}"#,
2934 )
2935 .unwrap();
2936 assert_eq!(schema.named.len(), 3);
2937
2938 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2939 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); } else {
2942 panic!("Expected SchemaPiece::Record, found something else");
2943 }
2944 }
2945
2946 #[mz_ore::test]
2949 fn test_schema_is_send() {
2950 fn send<S: Send>(_s: S) {}
2951
2952 let schema = Schema {
2953 named: vec![],
2954 indices: Default::default(),
2955 top: SchemaPiece::Null.into(),
2956 };
2957 send(schema);
2958 }
2959
2960 #[mz_ore::test]
2961 fn test_schema_is_sync() {
2962 fn sync<S: Sync>(_s: S) {}
2963
2964 let schema = Schema {
2965 named: vec![],
2966 indices: Default::default(),
2967 top: SchemaPiece::Null.into(),
2968 };
2969 sync(&schema);
2970 sync(schema);
2971 }
2972
2973 #[mz_ore::test]
2974 #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
2976 use sha2::Sha256;
2977
2978 let raw_schema = r#"
2979 {
2980 "type": "record",
2981 "name": "test",
2982 "fields": [
2983 {"name": "a", "type": "long", "default": 42},
2984 {"name": "b", "type": "string"}
2985 ]
2986 }
2987 "#;
2988 let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
2989 let schema = Schema::from_str(raw_schema).unwrap();
2990 assert_eq!(&schema.canonical_form(), expected_canonical);
2991 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
2992 assert_eq!(
2993 format!("{}", schema.fingerprint::<Sha256>()),
2994 expected_fingerprint
2995 );
2996
2997 let raw_schema = r#"
2998{
2999 "type": "record",
3000 "name": "ns.r1",
3001 "namespace": "ignored",
3002 "fields": [
3003 {
3004 "name": "f1",
3005 "type": {
3006 "type": "fixed",
3007 "name": "r2",
3008 "size": 1
3009 }
3010 }
3011 ]
3012}
3013"#;
3014 let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3015 let schema = Schema::from_str(raw_schema).unwrap();
3016 assert_eq!(&schema.canonical_form(), expected_canonical);
3017 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3018 assert_eq!(
3019 format!("{}", schema.fingerprint::<Sha256>()),
3020 expected_fingerprint
3021 );
3022 }
3023
3024 #[mz_ore::test]
3025 fn test_make_valid() {
3026 for (input, expected) in [
3027 ("foo", "foo"),
3028 ("az99", "az99"),
3029 ("99az", "_99az"),
3030 ("is,bad", "is_bad"),
3031 ("@#$%", "____"),
3032 ("i-amMisBehaved!", "i_amMisBehaved_"),
3033 ("", "_"),
3034 ] {
3035 let actual = Name::make_valid(input);
3036 assert_eq!(expected, actual, "Name::make_valid({input})")
3037 }
3038 }
3039}