1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51 writer_schema: &Schema,
52 reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54 let r_indices = reader_schema.indices.clone();
55 let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56 writer_schema
57 .indices
58 .iter()
59 .flat_map(|(name, widx)| {
60 r_indices
61 .get(name)
62 .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63 })
64 .unzip();
65 let reader_fullnames = reader_schema
66 .indices
67 .iter()
68 .map(|(f, i)| (*i, f))
69 .collect::<BTreeMap<_, _>>();
70 let mut resolver = SchemaResolver {
71 named: Default::default(),
72 indices: Default::default(),
73 human_readable_field_path: Vec::new(),
74 current_human_readable_path_start: 0,
75 writer_to_reader_names,
76 reader_to_writer_names,
77 reader_to_resolved_names: Default::default(),
78 reader_fullnames,
79 reader_schema,
80 };
81 let writer_node = writer_schema.top_node_or_named();
82 let reader_node = reader_schema.top_node_or_named();
83 let inner = resolver.resolve(writer_node, reader_node)?;
84 let sch = Schema {
85 named: resolver.named.into_iter().map(Option::unwrap).collect(),
86 indices: resolver.indices,
87 top: inner,
88 };
89 Ok(sch)
90}
91
/// Error raised when a schema cannot be parsed; wraps a human-readable
/// message describing the problem.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);
95
96impl ParseSchemaError {
97 pub fn new<S>(msg: S) -> ParseSchemaError
98 where
99 S: Into<String>,
100 {
101 ParseSchemaError(msg.into())
102 }
103}
104
105impl fmt::Display for ParseSchemaError {
106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107 self.0.fmt(f)
108 }
109}
110
// Marker impl: relies entirely on the `Debug` and `Display` impls of
// `ParseSchemaError`; no source error is exposed.
impl std::error::Error for ParseSchemaError {}
112
/// A digest of a schema, as produced by `Schema::fingerprint`.
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// Raw digest bytes.
    pub bytes: Vec<u8>,
}
120
121impl fmt::Display for SchemaFingerprint {
122 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
123 write!(
124 f,
125 "{}",
126 self.bytes
127 .iter()
128 .map(|byte| format!("{:02x}", byte))
129 .collect::<Vec<String>>()
130 .join("")
131 )
132 }
133}
134
/// Either an inline schema piece or a reference to a named piece stored in
/// the owning `Schema`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPieceOrNamed {
    /// An inline (anonymous) piece.
    Piece(SchemaPiece),
    /// Index into the owning schema's table of named pieces (see `Schema::lookup`).
    Named(usize),
}
140impl SchemaPieceOrNamed {
141 pub fn get_human_name(&self, root: &Schema) -> String {
142 match self {
143 Self::Piece(piece) => format!("{:?}", piece),
144 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145 }
146 }
147 #[inline(always)]
148 pub fn get_piece_and_name<'a>(
149 &'a self,
150 root: &'a Schema,
151 ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152 self.as_ref().get_piece_and_name(root)
153 }
154
155 #[inline(always)]
156 pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157 match self {
158 SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159 SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160 }
161 }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165 #[inline(always)]
166 fn from(piece: SchemaPiece) -> Self {
167 Self::Piece(piece)
168 }
169}
170
/// One node of a schema.
///
/// The plain variants are produced by the parser in this file. The
/// `Resolve*` variants are not produced by the parser; presumably they are
/// created while resolving a writer schema against a reader schema
/// (TODO confirm against `SchemaResolver`).
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// The `null` primitive.
    Null,
    /// The `boolean` primitive.
    Boolean,
    /// The `int` primitive.
    Int,
    /// The `long` primitive.
    Long,
    /// The `float` primitive.
    Float,
    /// The `double` primitive.
    Double,
    /// An `int` annotated as a date (`logicalType: "date"` or one of the
    /// recognised `connect.name` values).
    Date,
    /// A `long` annotated as a millisecond timestamp.
    TimestampMilli,
    /// A `long` annotated as a microsecond timestamp.
    TimestampMicro,
    /// A `bytes` or `fixed` schema annotated with `logicalType: "decimal"`.
    Decimal {
        /// Value of the `precision` attribute.
        precision: usize,
        /// Value of the `scale` attribute (0 when absent).
        scale: usize,
        /// `Some(size)` when backed by a `fixed` of that size, `None` when
        /// backed by `bytes`.
        fixed_size: Option<usize>,
    },
    /// The `bytes` primitive.
    Bytes,
    /// The `string` primitive.
    String,
    /// A `string` whose `connect.name` is `io.debezium.data.Json`.
    Json,
    /// A `string` annotated with `logicalType: "uuid"`.
    Uuid,
    /// An array; the payload is the schema of its `items`.
    Array(Box<SchemaPieceOrNamed>),
    /// A map; the payload is the schema of its `values`.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of several variants.
    Union(UnionSchema),
    /// Resolution marker; reported as kind `Long`. Presumably int -> millisecond
    /// timestamp (TODO confirm).
    ResolveIntTsMilli,
    /// Resolution marker; reported as kind `Long`. Presumably int -> microsecond
    /// timestamp (TODO confirm).
    ResolveIntTsMicro,
    /// Resolution marker; reported as kind `Long`. Presumably date -> timestamp
    /// (TODO confirm).
    ResolveDateTimestamp,
    /// Resolution marker; reported as kind `Long`.
    ResolveIntLong,
    /// Resolution marker; reported as kind `Float`.
    ResolveIntFloat,
    /// Resolution marker; reported as kind `Double`.
    ResolveIntDouble,
    /// Resolution marker; reported as kind `Float`.
    ResolveLongFloat,
    /// Resolution marker; reported as kind `Double`.
    ResolveLongDouble,
    /// Resolution marker; reported as kind `Double`.
    ResolveFloatDouble,
    /// Resolution marker; reported as kind `Union`. Presumably a concrete
    /// writer type read into a reader union (TODO confirm).
    ResolveConcreteUnion {
        /// Presumably the index of the matching reader variant (TODO confirm).
        index: usize,
        /// The resolved schema of that variant.
        inner: Box<SchemaPieceOrNamed>,
        /// Presumably the number of variants in the reader union (TODO confirm).
        n_reader_variants: usize,
        /// Presumably the position of the reader's `null` variant, if any
        /// (TODO confirm).
        reader_null_variant: Option<usize>,
    },
    /// Resolution marker; reported as kind `Union`. Presumably a writer union
    /// read into a reader union (TODO confirm).
    ResolveUnionUnion {
        /// One entry per variant: either an index plus resolved schema, or the
        /// error explaining why that variant could not be resolved.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        /// Presumably the number of variants in the reader union (TODO confirm).
        n_reader_variants: usize,
        /// Presumably the position of the reader's `null` variant, if any
        /// (TODO confirm).
        reader_null_variant: Option<usize>,
    },
    /// Resolution marker; reported as kind `Unknown`. Presumably a writer union
    /// read into a concrete reader type (TODO confirm).
    ResolveUnionConcrete {
        /// Presumably the index of the matching writer variant (TODO confirm).
        index: usize,
        /// The resolved schema of that variant.
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A record.
    Record {
        /// Optional documentation string.
        doc: Documentation,
        /// The record's fields, in declaration order.
        fields: Vec<RecordField>,
        /// Maps each field name to that field's `position`.
        lookup: BTreeMap<String, usize>,
    },
    /// An enum.
    Enum {
        /// Optional documentation string.
        doc: Documentation,
        /// The enum's symbols (unique).
        symbols: Vec<String>,
        /// Index into `symbols` of the declared `default`, if one was given.
        default_idx: Option<usize>,
    },
    /// A fixed-size byte sequence of `size` bytes.
    Fixed { size: usize },
    /// Resolution marker; reported as kind `Record`.
    ResolveRecord {
        /// Presumably reader fields filled from their defaults (TODO confirm).
        defaults: Vec<ResolvedDefaultValueField>,
        /// Presumably one entry per writer field (TODO confirm).
        fields: Vec<ResolvedRecordField>,
        /// Presumably the number of fields in the reader record (TODO confirm).
        n_reader_fields: usize,
    },
    /// Resolution marker; reported as kind `Enum`.
    ResolveEnum {
        /// Optional documentation string.
        doc: Documentation,
        /// One entry per symbol: `Ok((index, name))`, or `Err(name)`;
        /// presumably `Err` marks a symbol with no reader counterpart
        /// (TODO confirm).
        symbols: Vec<Result<(usize, String), String>>,
        /// Optional fallback `(index, name)`.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320 pub fn is_underlying_int(&self) -> bool {
322 self.try_make_int_value(0).is_some()
323 }
324 pub fn is_underlying_long(&self) -> bool {
326 self.try_make_long_value(0).is_some()
327 }
328 pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331 match self {
332 SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333 SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336 _ => None,
337 }
338 }
339 pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342 match self {
343 SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344 SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345 SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346 _ => None,
347 }
348 }
349}
350
/// A complete schema: a top-level node plus the table of named types it
/// refers to.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Named pieces; `SchemaPieceOrNamed::Named(idx)` indexes into this table.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its index in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362 fn to_string(&self) -> String {
363 let json = serde_json::to_value(self).unwrap();
364 json.to_string()
365 }
366}
367
368impl std::fmt::Debug for Schema {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 if f.alternate() {
371 f.write_str(
372 &serde_json::to_string_pretty(self)
373 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374 )
375 } else {
376 f.write_str(
377 &serde_json::to_string(self)
378 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379 )
380 }
381 }
382}
383
384impl Schema {
385 pub fn top_node(&self) -> SchemaNode<'_> {
386 let (inner, name) = self.top.get_piece_and_name(self);
387 SchemaNode {
388 root: self,
389 inner,
390 name,
391 }
392 }
393 pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394 SchemaNodeOrNamed {
395 root: self,
396 inner: self.top.as_ref(),
397 }
398 }
399 pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400 &self.named[idx]
401 }
402 pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403 self.indices.get(name).map(|&idx| &self.named[idx])
404 }
405}
406
/// The coarse kind of a schema piece, without any payload.
///
/// Several `SchemaPiece` variants map onto the same kind (see the
/// `From<&SchemaPiece>` impl); `UnionSchema` uses the kind as the key under
/// which anonymous variants are indexed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    /// `null`
    Null,
    /// `boolean`
    Boolean,
    /// `int`
    Int,
    /// `long`
    Long,
    /// `float`
    Float,
    /// `double`
    Double,
    /// `bytes`
    Bytes,
    /// `string`
    String,
    /// `array`
    Array,
    /// `map`
    Map,
    /// `union`
    Union,
    /// `record`
    Record,
    /// `enum`
    Enum,
    /// `fixed`
    Fixed,
    /// Used for pieces that have no fixed kind of their own
    /// (`SchemaPiece::ResolveUnionConcrete`).
    Unknown,
}
436
437impl SchemaKind {
438 pub fn name(self) -> &'static str {
439 match self {
440 SchemaKind::Null => "null",
441 SchemaKind::Boolean => "boolean",
442 SchemaKind::Int => "int",
443 SchemaKind::Long => "long",
444 SchemaKind::Float => "float",
445 SchemaKind::Double => "double",
446 SchemaKind::Bytes => "bytes",
447 SchemaKind::String => "string",
448 SchemaKind::Array => "array",
449 SchemaKind::Map => "map",
450 SchemaKind::Union => "union",
451 SchemaKind::Record => "record",
452 SchemaKind::Enum => "enum",
453 SchemaKind::Fixed => "fixed",
454 SchemaKind::Unknown => "unknown",
455 }
456 }
457}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460 #[inline(always)]
461 fn from(piece: &'a SchemaPiece) -> SchemaKind {
462 match piece {
463 SchemaPiece::Null => SchemaKind::Null,
464 SchemaPiece::Boolean => SchemaKind::Boolean,
465 SchemaPiece::Int => SchemaKind::Int,
466 SchemaPiece::Long => SchemaKind::Long,
467 SchemaPiece::Float => SchemaKind::Float,
468 SchemaPiece::Double => SchemaKind::Double,
469 SchemaPiece::Date => SchemaKind::Int,
470 SchemaPiece::TimestampMilli
471 | SchemaPiece::TimestampMicro
472 | SchemaPiece::ResolveIntTsMilli
473 | SchemaPiece::ResolveDateTimestamp
474 | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475 SchemaPiece::Decimal {
476 fixed_size: None, ..
477 } => SchemaKind::Bytes,
478 SchemaPiece::Decimal {
479 fixed_size: Some(_),
480 ..
481 } => SchemaKind::Fixed,
482 SchemaPiece::Bytes => SchemaKind::Bytes,
483 SchemaPiece::String => SchemaKind::String,
484 SchemaPiece::Array(_) => SchemaKind::Array,
485 SchemaPiece::Map(_) => SchemaKind::Map,
486 SchemaPiece::Union(_) => SchemaKind::Union,
487 SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488 SchemaPiece::ResolveIntLong => SchemaKind::Long,
489 SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490 SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491 SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492 SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493 SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494 SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495 SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496 SchemaPiece::Record { .. } => SchemaKind::Record,
497 SchemaPiece::Enum { .. } => SchemaKind::Enum,
498 SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499 SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500 SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501 SchemaPiece::Json => SchemaKind::String,
502 SchemaPiece::Uuid => SchemaKind::String,
503 }
504 }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508 #[inline(always)]
509 fn from(schema: SchemaNode<'a>) -> SchemaKind {
510 SchemaKind::from(schema.inner)
511 }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515 #[inline(always)]
516 fn from(schema: &'a Schema) -> SchemaKind {
517 Self::from(schema.top_node())
518 }
519}
520
/// The naming attributes of a named schema, as read from its JSON definition
/// by `Name::parse`.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The base name; `Name::parse` strips any dotted prefix into `namespace`.
    pub name: String,
    /// The namespace, when one was given (or derived from a dotted name).
    pub namespace: Option<String>,
    /// The `aliases` attribute, when present and made up solely of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A name together with its namespace; the key under which named schema
/// pieces are registered.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    /// The base name, without namespace.
    name: String,
    /// The namespace; may be empty.
    namespace: String,
}
543
544impl FullName {
545 pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
547 if let Some(ns) = namespace {
548 FullName {
549 name: name.to_owned(),
550 namespace: ns.to_owned(),
551 }
552 } else {
553 let mut split = name.rsplitn(2, '.');
554 let name = split.next().unwrap();
555 let namespace = split.next().unwrap_or(default_namespace);
556
557 FullName {
558 name: name.into(),
559 namespace: namespace.into(),
560 }
561 }
562 }
563 pub fn base_name(&self) -> &str {
564 &self.name
565 }
566 pub fn human_name(&self) -> String {
567 if self.namespace.is_empty() {
568 return self.name.clone();
569 }
570 format!("{}.{}", self.namespace, self.name)
571 }
572 pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
577 if enclosing_ns == &self.namespace {
578 Cow::Borrowed(&self.name)
579 } else {
580 Cow::Owned(format!("{}.{}", self.namespace, self.name))
581 }
582 }
583 pub fn namespace(&self) -> &str {
585 &self.namespace
586 }
587}
588
589impl fmt::Debug for FullName {
590 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591 write!(f, "{}.{}", self.namespace, self.name)
592 }
593}
594
/// Optional documentation string attached to a schema or a record field.
pub type Documentation = Option<String>;
597
598impl Name {
599 pub fn is_valid(name: &str) -> bool {
603 static MATCHER: LazyLock<Regex> =
604 LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605 MATCHER.is_match(name)
606 }
607
608 pub fn validate(name: &str) -> Result<(), AvroError> {
610 match Self::is_valid(name) {
611 true => Ok(()),
612 false => {
613 Err(ParseSchemaError::new(format!(
614 "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615 name
616 )).into())
617 }
618 }
619 }
620
621 pub fn make_valid(name: &str) -> String {
629 let mut out = String::new();
630 let mut chars = name.chars();
631 match chars.next() {
632 Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633 Some(ch @ '0'..='9') => {
634 out.push('_');
635 out.push(ch);
636 }
637 _ => out.push('_'),
638 }
639 for ch in chars {
640 match ch {
641 '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642 _ => out.push('_'),
643 }
644 }
645 debug_assert!(
646 Name::is_valid(&out),
647 "make_valid({name}) produced invalid name: {out}"
648 );
649 out
650 }
651
652 pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654 let name = complex
655 .name()
656 .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657 if name.is_empty() {
658 return Err(ParseSchemaError::new(format!(
659 "Name cannot be the empty string: {:?}",
660 complex
661 ))
662 .into());
663 }
664
665 let (namespace, name) = if let Some(index) = name.rfind('.') {
666 let computed_namespace = name[..index].to_owned();
667 let computed_name = name[index + 1..].to_owned();
668 if let Some(provided_namespace) = complex.string("namespace") {
669 if provided_namespace != computed_namespace {
670 warn!(
671 "Found dots in name {}, updating to namespace {} and name {}",
672 name, computed_namespace, computed_name
673 );
674 }
675 }
676 (Some(computed_namespace), computed_name)
677 } else {
678 (complex.string("namespace"), name)
679 };
680
681 Self::validate(&name)?;
682
683 let aliases: Option<Vec<String>> = complex
684 .get("aliases")
685 .and_then(|aliases| aliases.as_array())
686 .and_then(|aliases| {
687 aliases
688 .iter()
689 .map(|alias| alias.as_str())
690 .map(|alias| alias.map(|a| a.to_string()))
691 .collect::<Option<_>>()
692 });
693
694 Ok(Name {
695 name,
696 namespace,
697 aliases,
698 })
699 }
700
701 pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703 let mut map = serde_json::map::Map::new();
704 map.insert("name".into(), serde_json::Value::String(name.into()));
705 Self::parse(&map)
706 }
707
708 pub fn fullname(&self, default_namespace: &str) -> FullName {
713 FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714 }
715}
716
/// A record field whose value comes from a default rather than from encoded
/// data. NOTE(review): presumably produced during schema resolution for
/// reader fields missing from the writer — confirm against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Field name.
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The default value, already converted to an Avro value.
    pub default: types::Value,
    /// Sort order declared for the field.
    pub order: RecordFieldOrder,
    /// Position of the field; presumably within the reader record (TODO confirm).
    pub position: usize,
}
725
/// One field of a resolved record.
/// NOTE(review): presumably `Absent` marks a writer field the reader does not
/// have (carrying the schema needed to skip it) and `Present` a field both
/// sides share — confirm against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// The field is absent; carries a schema for it.
    Absent(Schema),
    /// The field is present; carries its definition.
    Present(RecordField),
}
731
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Field name (validated with `Name::validate` when parsed).
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The raw JSON `default` attribute, if one was given.
    pub default: Option<Value>,
    /// Schema of the field's value.
    pub schema: SchemaPieceOrNamed,
    /// Declared sort order; the parser falls back to `Ascending`.
    pub order: RecordFieldOrder,
    /// Zero-based position of the field within its record.
    pub position: usize,
}
752
/// Sort order of a record field, parsed from its `order` attribute.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    /// `"ascending"`; also used when the attribute is absent or unrecognised.
    Ascending,
    /// `"descending"`
    Descending,
    /// `"ignore"`
    Ignore,
}
760
// Intentionally empty: `RecordField` currently has no inherent methods.
impl RecordField {}
762
/// A union schema: an ordered list of variants plus lookup indexes into it.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    /// For inline variants: kind of the variant -> its position in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    /// For named variants: index of the named type -> its position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776 pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777 let mut avindex = BTreeMap::new();
778 let mut nvindex = BTreeMap::new();
779 for (i, schema) in schemas.iter().enumerate() {
780 match schema {
781 SchemaPieceOrNamed::Piece(sp) => {
782 if let SchemaPiece::Union(_) = sp {
783 return Err(ParseSchemaError::new(
784 "Unions may not directly contain a union",
785 )
786 .into());
787 }
788 let kind = SchemaKind::from(sp);
789 if avindex.insert(kind, i).is_some() {
790 return Err(
791 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792 );
793 }
794 }
795 SchemaPieceOrNamed::Named(idx) => {
796 if nvindex.insert(*idx, i).is_some() {
797 return Err(
798 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799 );
800 }
801 }
802 }
803 }
804 Ok(UnionSchema {
805 schemas,
806 anon_variant_index: avindex,
807 named_variant_index: nvindex,
808 })
809 }
810
811 pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813 &self.schemas
814 }
815
816 pub fn is_nullable(&self) -> bool {
818 !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
819 }
820
821 pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
822 self.anon_variant_index
823 .get(&SchemaKind::from(sp))
824 .map(|idx| (*idx, &self.schemas[*idx]))
825 }
826
827 pub fn match_ref(
828 &self,
829 other: SchemaPieceRefOrNamed,
830 names_map: &BTreeMap<usize, usize>,
831 ) -> Option<(usize, &SchemaPieceOrNamed)> {
832 match other {
833 SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
834 SchemaPieceRefOrNamed::Named(idx) => names_map
835 .get(&idx)
836 .and_then(|idx| self.named_variant_index.get(idx))
837 .map(|idx| (*idx, &self.schemas[*idx])),
838 }
839 }
840
841 #[inline(always)]
842 pub fn match_(
843 &self,
844 other: &SchemaPieceOrNamed,
845 names_map: &BTreeMap<usize, usize>,
846 ) -> Option<(usize, &SchemaPieceOrNamed)> {
847 self.match_ref(other.as_ref(), names_map)
848 }
849}
850
851impl PartialEq for UnionSchema {
853 fn eq(&self, other: &UnionSchema) -> bool {
854 self.schemas.eq(&other.schemas)
855 }
856}
857
/// State accumulated while parsing a schema from JSON.
#[derive(Default)]
struct SchemaParser {
    /// One slot per named type; a slot is `None` from the moment its name is
    /// allocated until its definition has been parsed.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each full name seen so far to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
}
863
864impl SchemaParser {
865 fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
866 let top = self.parse_inner("", value)?;
867 let SchemaParser { named, indices } = self;
868 Ok(Schema {
869 named: named.into_iter().map(|o| o.unwrap()).collect(),
870 indices,
871 top,
872 })
873 }
874
875 fn parse_inner(
876 &mut self,
877 default_namespace: &str,
878 value: &Value,
879 ) -> Result<SchemaPieceOrNamed, AvroError> {
880 match *value {
881 Value::String(ref t) => {
882 let name = FullName::from_parts(t.as_str(), None, default_namespace);
883 if let Some(idx) = self.indices.get(&name) {
884 Ok(SchemaPieceOrNamed::Named(*idx))
885 } else {
886 Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
887 t.as_str(),
888 )?))
889 }
890 }
891 Value::Object(ref data) => self.parse_complex(default_namespace, data),
892 Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
893 self.parse_union(default_namespace, data)?,
894 )),
895 _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
896 }
897 }
898
899 fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
900 let idx = match self.indices.entry(fullname) {
901 Entry::Vacant(ve) => *ve.insert(self.named.len()),
902 Entry::Occupied(oe) => {
903 return Err(ParseSchemaError::new(format!(
904 "Sub-schema with name {:?} encountered multiple times",
905 oe.key()
906 ))
907 .into());
908 }
909 };
910 self.named.push(None);
911 Ok(idx)
912 }
913
    /// Stores `schema` in the slot at `index`.
    ///
    /// The slot is asserted to be empty first via `assert_none!`
    /// (presumably panics when it is already filled — confirm against the
    /// macro's definition). Indexing panics if `index` is out of bounds.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
918
919 fn parse_named_type(
920 &mut self,
921 type_name: &str,
922 default_namespace: &str,
923 complex: &Map<String, Value>,
924 ) -> Result<usize, AvroError> {
925 let name = Name::parse(complex)?;
926 match name.name.as_str() {
927 "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
928 return Err(ParseSchemaError::new(format!(
929 "{} may not be used as a custom type name",
930 name.name
931 ))
932 .into());
933 }
934 _ => {}
935 };
936 let fullname = name.fullname(default_namespace);
937 let default_namespace = fullname.namespace.clone();
938 let idx = self.alloc_name(fullname.clone())?;
939 let piece = match type_name {
940 "record" => self.parse_record(&default_namespace, complex),
941 "enum" => self.parse_enum(complex),
942 "fixed" => self.parse_fixed(&default_namespace, complex),
943 _ => unreachable!("Unknown named type kind: {}", type_name),
944 }?;
945
946 self.insert(
947 idx,
948 NamedSchemaPiece {
949 name: fullname,
950 piece,
951 },
952 );
953
954 Ok(idx)
955 }
956
957 fn parse_complex(
963 &mut self,
964 default_namespace: &str,
965 complex: &Map<String, Value>,
966 ) -> Result<SchemaPieceOrNamed, AvroError> {
967 match complex.get("type") {
968 Some(&Value::String(ref t)) => Ok(match t.as_str() {
969 "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
970 t,
971 default_namespace,
972 complex,
973 )?),
974 "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
975 "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
976 "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
977 "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
978 "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
979 "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
980 other => {
981 let name = FullName {
982 name: other.into(),
983 namespace: default_namespace.into(),
984 };
985 if let Some(idx) = self.indices.get(&name) {
986 SchemaPieceOrNamed::Named(*idx)
987 } else {
988 SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
989 }
990 }
991 }),
992 Some(&Value::Object(ref data)) => match data.get("type") {
993 Some(value) => self.parse_inner(default_namespace, value),
994 None => Err(
995 ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
996 ),
997 },
998 _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
999 }
1000 }
1001
1002 fn parse_record(
1005 &mut self,
1006 default_namespace: &str,
1007 complex: &Map<String, Value>,
1008 ) -> Result<SchemaPiece, AvroError> {
1009 let mut lookup = BTreeMap::new();
1010
1011 let fields: Vec<RecordField> = complex
1012 .get("fields")
1013 .and_then(|fields| fields.as_array())
1014 .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1015 .and_then(|fields| {
1016 fields
1017 .iter()
1018 .filter_map(|field| field.as_object())
1019 .enumerate()
1020 .map(|(position, field)| {
1021 self.parse_record_field(default_namespace, field, position)
1022 })
1023 .collect::<Result<_, _>>()
1024 })?;
1025
1026 for field in &fields {
1027 lookup.insert(field.name.clone(), field.position);
1028 }
1029
1030 Ok(SchemaPiece::Record {
1031 doc: complex.doc(),
1032 fields,
1033 lookup,
1034 })
1035 }
1036
1037 fn parse_record_field(
1039 &mut self,
1040 default_namespace: &str,
1041 field: &Map<String, Value>,
1042 position: usize,
1043 ) -> Result<RecordField, AvroError> {
1044 let name = field
1045 .name()
1046 .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1047
1048 Name::validate(&name)?;
1049
1050 let schema = field
1051 .get("type")
1052 .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1053 .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1054
1055 let default = field.get("default").cloned();
1056
1057 let order = field
1058 .get("order")
1059 .and_then(|order| order.as_str())
1060 .and_then(|order| match order {
1061 "ascending" => Some(RecordFieldOrder::Ascending),
1062 "descending" => Some(RecordFieldOrder::Descending),
1063 "ignore" => Some(RecordFieldOrder::Ignore),
1064 _ => None,
1065 })
1066 .unwrap_or(RecordFieldOrder::Ascending);
1067
1068 Ok(RecordField {
1069 name,
1070 doc: field.doc(),
1071 default,
1072 schema,
1073 order,
1074 position,
1075 })
1076 }
1077
1078 fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1081 let symbols: Vec<String> = complex
1082 .get("symbols")
1083 .and_then(|v| v.as_array())
1084 .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1085 .and_then(|symbols| {
1086 symbols
1087 .iter()
1088 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1089 .collect::<Option<_>>()
1090 .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1091 })?;
1092
1093 let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1094 for symbol in symbols.iter() {
1095 if unique_symbols.contains(symbol) {
1096 return Err(ParseSchemaError::new(format!(
1097 "Enum symbols must be unique, found multiple: {}",
1098 symbol
1099 ))
1100 .into());
1101 } else {
1102 unique_symbols.insert(symbol);
1103 }
1104 }
1105
1106 let default_idx = if let Some(default) = complex.get("default") {
1107 let default_str = default.as_str().ok_or_else(|| {
1108 ParseSchemaError::new(format!(
1109 "Enum default should be a string, got: {:?}",
1110 default
1111 ))
1112 })?;
1113 let default_idx = symbols
1114 .iter()
1115 .position(|x| x == default_str)
1116 .ok_or_else(|| {
1117 ParseSchemaError::new(format!(
1118 "Enum default not found in list of symbols: {}",
1119 default_str
1120 ))
1121 })?;
1122 Some(default_idx)
1123 } else {
1124 None
1125 };
1126
1127 Ok(SchemaPiece::Enum {
1128 doc: complex.doc(),
1129 symbols,
1130 default_idx,
1131 })
1132 }
1133
1134 fn parse_array(
1137 &mut self,
1138 default_namespace: &str,
1139 complex: &Map<String, Value>,
1140 ) -> Result<SchemaPiece, AvroError> {
1141 complex
1142 .get("items")
1143 .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1144 .and_then(|items| self.parse_inner(default_namespace, items))
1145 .map(|schema| SchemaPiece::Array(Box::new(schema)))
1146 }
1147
1148 fn parse_map(
1151 &mut self,
1152 default_namespace: &str,
1153 complex: &Map<String, Value>,
1154 ) -> Result<SchemaPiece, AvroError> {
1155 complex
1156 .get("values")
1157 .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1158 .and_then(|items| self.parse_inner(default_namespace, items))
1159 .map(|schema| SchemaPiece::Map(Box::new(schema)))
1160 }
1161
1162 fn parse_union(
1165 &mut self,
1166 default_namespace: &str,
1167 items: &[Value],
1168 ) -> Result<SchemaPiece, AvroError> {
1169 items
1170 .iter()
1171 .map(|value| self.parse_inner(default_namespace, value))
1172 .collect::<Result<Vec<_>, _>>()
1173 .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1174 }
1175
1176 fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1179 let precision = complex
1180 .get("precision")
1181 .and_then(|v| v.as_i64())
1182 .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1183
1184 let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1185
1186 if scale < 0 {
1187 return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1188 }
1189
1190 if precision < 0 {
1191 return Err(
1192 ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1193 );
1194 }
1195
1196 if scale > precision {
1197 return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1198 }
1199
1200 Ok((precision as usize, scale as usize))
1201 }
1202
1203 fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1206 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1207
1208 if let Some("decimal") = logical_type {
1209 match Self::parse_decimal(complex) {
1210 Ok((precision, scale)) => {
1211 return Ok(SchemaPiece::Decimal {
1212 precision,
1213 scale,
1214 fixed_size: None,
1215 });
1216 }
1217 Err(e) => warn!(
1218 "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1219 complex, e
1220 ),
1221 }
1222 }
1223
1224 Ok(SchemaPiece::Bytes)
1225 }
1226
1227 fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1235 const AVRO_DATE: &str = "date";
1236 const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1237 const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1238 if let Some(name) = complex.get("connect.name") {
1239 if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1240 if name == KAFKA_DATE {
1241 warn!("using deprecated debezium date format");
1242 }
1243 return Ok(SchemaPiece::Date);
1244 }
1245 }
1246 if let Some(name) = complex.get("logicalType") {
1250 if name == AVRO_DATE {
1251 return Ok(SchemaPiece::Date);
1252 }
1253 }
1254 if !complex.is_empty() {
1255 debug!("parsing complex type as regular int: {:?}", complex);
1256 }
1257 Ok(SchemaPiece::Int)
1258 }
1259
1260 fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1268 const AVRO_MILLI_TS: &str = "timestamp-millis";
1269 const AVRO_MICRO_TS: &str = "timestamp-micros";
1270
1271 const CONNECT_MILLI_TS: &[&str] = &[
1272 "io.debezium.time.Timestamp",
1273 "org.apache.kafka.connect.data.Timestamp",
1274 ];
1275 const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1276
1277 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1278 if CONNECT_MILLI_TS.contains(&&**name) {
1279 return Ok(SchemaPiece::TimestampMilli);
1280 }
1281 if name == CONNECT_MICRO_TS {
1282 return Ok(SchemaPiece::TimestampMicro);
1283 }
1284 }
1285 if let Some(name) = complex.get("logicalType") {
1286 if name == AVRO_MILLI_TS {
1287 return Ok(SchemaPiece::TimestampMilli);
1288 }
1289 if name == AVRO_MICRO_TS {
1290 return Ok(SchemaPiece::TimestampMicro);
1291 }
1292 }
1293 if !complex.is_empty() {
1294 debug!("parsing complex type as regular long: {:?}", complex);
1295 }
1296 Ok(SchemaPiece::Long)
1297 }
1298
1299 fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1300 const CONNECT_JSON: &str = "io.debezium.data.Json";
1301
1302 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1303 if CONNECT_JSON == name.as_str() {
1304 return SchemaPiece::Json;
1305 }
1306 }
1307 if let Some(name) = complex.get("logicalType") {
1308 if name == "uuid" {
1309 return SchemaPiece::Uuid;
1310 }
1311 }
1312 debug!("parsing complex type as regular string: {:?}", complex);
1313 SchemaPiece::String
1314 }
1315
1316 fn parse_fixed(
1319 &self,
1320 _default_namespace: &str,
1321 complex: &Map<String, Value>,
1322 ) -> Result<SchemaPiece, AvroError> {
1323 let _name = Name::parse(complex)?;
1324
1325 let size = complex
1326 .get("size")
1327 .and_then(|v| v.as_i64())
1328 .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1329 if size <= 0 {
1330 return Err(ParseSchemaError::new(format!(
1331 "Fixed values require a positive size attribute, found: {}",
1332 size
1333 ))
1334 .into());
1335 }
1336
1337 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1338
1339 if let Some("decimal") = logical_type {
1340 match Self::parse_decimal(complex) {
1341 Ok((precision, scale)) => {
1342 let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1343 if precision > max {
1344 warn!(
1345 "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1346 precision, size
1347 );
1348 } else {
1349 return Ok(SchemaPiece::Decimal {
1350 precision,
1351 scale,
1352 fixed_size: Some(size as usize),
1353 });
1354 }
1355 }
1356 Err(e) => warn!(
1357 "parsing decimal as fixed due to parse error: {:?}, {:?}",
1358 complex, e
1359 ),
1360 }
1361 }
1362
1363 Ok(SchemaPiece::Fixed {
1364 size: size as usize,
1365 })
1366 }
1367}
1368
1369impl Schema {
1370 pub fn parse(value: &Value) -> Result<Self, AvroError> {
1373 let p = SchemaParser {
1374 named: vec![],
1375 indices: Default::default(),
1376 };
1377 p.parse(value)
1378 }
1379
1380 pub fn canonical_form(&self) -> String {
1385 let json = serde_json::to_value(self).unwrap();
1386 parsing_canonical_form(&json)
1387 }
1388
1389 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1394 let mut d = D::new();
1395 d.update(self.canonical_form());
1396 SchemaFingerprint {
1397 bytes: d.finalize().to_vec(),
1398 }
1399 }
1400
1401 fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1404 match primitive {
1405 "null" => Ok(SchemaPiece::Null),
1406 "boolean" => Ok(SchemaPiece::Boolean),
1407 "int" => Ok(SchemaPiece::Int),
1408 "long" => Ok(SchemaPiece::Long),
1409 "double" => Ok(SchemaPiece::Double),
1410 "float" => Ok(SchemaPiece::Float),
1411 "bytes" => Ok(SchemaPiece::Bytes),
1412 "string" => Ok(SchemaPiece::String),
1413 other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1414 }
1415 }
1416}
1417
1418impl FromStr for Schema {
1419 type Err = AvroError;
1420
1421 fn from_str(input: &str) -> Result<Self, AvroError> {
1423 let value = serde_json::from_str(input)
1424 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1425 Self::parse(&value)
1426 }
1427}
1428
1429#[derive(Clone, Debug, PartialEq)]
1430pub struct NamedSchemaPiece {
1431 pub name: FullName,
1432 pub piece: SchemaPiece,
1433}
1434
1435#[derive(Copy, Clone, Debug)]
1436pub struct SchemaNode<'a> {
1437 pub root: &'a Schema,
1438 pub inner: &'a SchemaPiece,
1439 pub name: Option<&'a FullName>,
1440}
1441
1442#[derive(Copy, Clone, Debug)]
1443pub enum SchemaPieceRefOrNamed<'a> {
1444 Piece(&'a SchemaPiece),
1445 Named(usize),
1446}
1447
1448impl<'a> SchemaPieceRefOrNamed<'a> {
1449 pub fn get_human_name(&self, root: &Schema) -> String {
1450 match self {
1451 Self::Piece(piece) => format!("{:?}", piece),
1452 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1453 }
1454 }
1455
1456 #[inline(always)]
1457 pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1458 match self {
1459 SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1460 SchemaPieceRefOrNamed::Named(index) => {
1461 let named_piece = root.lookup(index);
1462 (&named_piece.piece, Some(&named_piece.name))
1463 }
1464 }
1465 }
1466}
1467
1468#[derive(Copy, Clone, Debug)]
1469pub struct SchemaNodeOrNamed<'a> {
1470 pub root: &'a Schema,
1471 pub inner: SchemaPieceRefOrNamed<'a>,
1472}
1473
1474impl<'a> SchemaNodeOrNamed<'a> {
1475 #[inline(always)]
1476 pub fn lookup(self) -> SchemaNode<'a> {
1477 let (inner, name) = self.inner.get_piece_and_name(self.root);
1478 SchemaNode {
1479 root: self.root,
1480 inner,
1481 name,
1482 }
1483 }
1484 #[inline(always)]
1485 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1486 self.step_ref(next.as_ref())
1487 }
1488 #[inline(always)]
1489 pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1490 Self {
1491 root: self.root,
1492 inner: match next {
1493 SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1494 SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1495 },
1496 }
1497 }
1498
1499 pub fn to_schema(self) -> Schema {
1500 let mut cloner = SchemaSubtreeDeepCloner {
1501 old_root: self.root,
1502 old_to_new_names: Default::default(),
1503 named: Default::default(),
1504 };
1505 let piece = cloner.clone_piece_or_named(self.inner);
1506 let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1507 let indices: BTreeMap<FullName, usize> = named
1508 .iter()
1509 .enumerate()
1510 .map(|(i, nsp)| (nsp.name.clone(), i))
1511 .collect();
1512 Schema {
1513 named,
1514 indices,
1515 top: piece,
1516 }
1517 }
1518
1519 pub fn namespace(self) -> Option<&'a str> {
1520 let SchemaNode { name, .. } = self.lookup();
1521 name.map(|FullName { namespace, .. }| namespace.as_str())
1522 }
1523}
1524
1525struct SchemaSubtreeDeepCloner<'a> {
1526 old_root: &'a Schema,
1527 old_to_new_names: BTreeMap<usize, usize>,
1528 named: Vec<Option<NamedSchemaPiece>>,
1529}
1530
1531impl<'a> SchemaSubtreeDeepCloner<'a> {
1532 fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1533 match piece {
1534 SchemaPiece::Null => SchemaPiece::Null,
1535 SchemaPiece::Boolean => SchemaPiece::Boolean,
1536 SchemaPiece::Int => SchemaPiece::Int,
1537 SchemaPiece::Long => SchemaPiece::Long,
1538 SchemaPiece::Float => SchemaPiece::Float,
1539 SchemaPiece::Double => SchemaPiece::Double,
1540 SchemaPiece::Date => SchemaPiece::Date,
1541 SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1542 SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1543 SchemaPiece::Json => SchemaPiece::Json,
1544 SchemaPiece::Decimal {
1545 scale,
1546 precision,
1547 fixed_size,
1548 } => SchemaPiece::Decimal {
1549 scale: *scale,
1550 precision: *precision,
1551 fixed_size: *fixed_size,
1552 },
1553 SchemaPiece::Bytes => SchemaPiece::Bytes,
1554 SchemaPiece::String => SchemaPiece::String,
1555 SchemaPiece::Uuid => SchemaPiece::Uuid,
1556 SchemaPiece::Array(inner) => {
1557 SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1558 }
1559 SchemaPiece::Map(inner) => {
1560 SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1561 }
1562 SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1563 schemas: us
1564 .schemas
1565 .iter()
1566 .map(|s| self.clone_piece_or_named(s.as_ref()))
1567 .collect(),
1568 anon_variant_index: us.anon_variant_index.clone(),
1569 named_variant_index: us.named_variant_index.clone(),
1570 }),
1571 SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1572 SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1573 SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1574 SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1575 SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1576 SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1577 SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1578 SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1579 SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1580 SchemaPiece::ResolveConcreteUnion {
1581 index,
1582 inner,
1583 n_reader_variants,
1584 reader_null_variant,
1585 } => SchemaPiece::ResolveConcreteUnion {
1586 index: *index,
1587 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1588 n_reader_variants: *n_reader_variants,
1589 reader_null_variant: *reader_null_variant,
1590 },
1591 SchemaPiece::ResolveUnionUnion {
1592 permutation,
1593 n_reader_variants,
1594 reader_null_variant,
1595 } => SchemaPiece::ResolveUnionUnion {
1596 permutation: permutation
1597 .clone()
1598 .into_iter()
1599 .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1600 .collect(),
1601 n_reader_variants: *n_reader_variants,
1602 reader_null_variant: *reader_null_variant,
1603 },
1604 SchemaPiece::ResolveUnionConcrete { index, inner } => {
1605 SchemaPiece::ResolveUnionConcrete {
1606 index: *index,
1607 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1608 }
1609 }
1610 SchemaPiece::Record {
1611 doc,
1612 fields,
1613 lookup,
1614 } => SchemaPiece::Record {
1615 doc: doc.clone(),
1616 fields: fields
1617 .iter()
1618 .map(|rf| RecordField {
1619 name: rf.name.clone(),
1620 doc: rf.doc.clone(),
1621 default: rf.default.clone(),
1622 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1623 order: rf.order,
1624 position: rf.position,
1625 })
1626 .collect(),
1627 lookup: lookup.clone(),
1628 },
1629 SchemaPiece::Enum {
1630 doc,
1631 symbols,
1632 default_idx,
1633 } => SchemaPiece::Enum {
1634 doc: doc.clone(),
1635 symbols: symbols.clone(),
1636 default_idx: *default_idx,
1637 },
1638 SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1639 SchemaPiece::ResolveRecord {
1640 defaults,
1641 fields,
1642 n_reader_fields,
1643 } => SchemaPiece::ResolveRecord {
1644 defaults: defaults.clone(),
1645 fields: fields
1646 .iter()
1647 .map(|rf| match rf {
1648 ResolvedRecordField::Present(rf) => {
1649 ResolvedRecordField::Present(RecordField {
1650 name: rf.name.clone(),
1651 doc: rf.doc.clone(),
1652 default: rf.default.clone(),
1653 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1654 order: rf.order,
1655 position: rf.position,
1656 })
1657 }
1658 ResolvedRecordField::Absent(writer_schema) => {
1659 ResolvedRecordField::Absent(writer_schema.clone())
1660 }
1661 })
1662 .collect(),
1663 n_reader_fields: *n_reader_fields,
1664 },
1665 SchemaPiece::ResolveEnum {
1666 doc,
1667 symbols,
1668 default,
1669 } => SchemaPiece::ResolveEnum {
1670 doc: doc.clone(),
1671 symbols: symbols.clone(),
1672 default: default.clone(),
1673 },
1674 }
1675 }
1676 fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1677 match piece {
1678 SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1679 SchemaPieceRefOrNamed::Named(index) => {
1680 let new_index = match self.old_to_new_names.entry(index) {
1681 Entry::Vacant(ve) => {
1682 let new_index = self.named.len();
1683 self.named.push(None);
1684 ve.insert(new_index);
1685 let old_named_piece = self.old_root.lookup(index);
1686 let new_named_piece = NamedSchemaPiece {
1687 name: old_named_piece.name.clone(),
1688 piece: self.clone_piece(&old_named_piece.piece),
1689 };
1690 self.named[new_index] = Some(new_named_piece);
1691 new_index
1692 }
1693 Entry::Occupied(oe) => *oe.get(),
1694 };
1695 SchemaPieceOrNamed::Named(new_index)
1696 }
1697 }
1698 }
1699}
1700
1701impl<'a> SchemaNode<'a> {
1702 #[inline(always)]
1703 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1704 let (inner, name) = next.get_piece_and_name(self.root);
1705 Self {
1706 root: self.root,
1707 inner,
1708 name,
1709 }
1710 }
1711
1712 pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1713 use serde_json::Value::*;
1714 let val = match (json, self.inner) {
1715 (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1717 Some(variant) => AvroValue::Union {
1718 index: 0,
1719 inner: Box::new(self.step(variant).json_to_value(json)?),
1720 n_variants: us.schemas.len(),
1721 null_variant: us
1722 .schemas
1723 .iter()
1724 .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1725 },
1726 None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1727 },
1728 (Null, SchemaPiece::Null) => AvroValue::Null,
1729 (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1730 (Number(n), piece) => {
1731 match piece {
1732 piece if piece.is_underlying_int() => {
1733 let i =
1734 n.as_i64()
1735 .and_then(|i| i32::try_from(i).ok())
1736 .ok_or_else(|| {
1737 ParseSchemaError(format!("{} is not a 32-bit integer", n))
1738 })?;
1739 piece.try_make_int_value(i).unwrap().map_err(|e| {
1740 ParseSchemaError(format!("invalid default int {i}: {e}"))
1741 })?
1742 }
1743 piece if piece.is_underlying_long() => {
1744 let i = n.as_i64().ok_or_else(|| {
1745 ParseSchemaError(format!("{} is not a 64-bit integer", n))
1746 })?;
1747 piece.try_make_long_value(i).unwrap().map_err(|e| {
1748 ParseSchemaError(format!("invalid default long {i}: {e}"))
1749 })?
1750 }
1751 SchemaPiece::Float => {
1752 let f = n.as_f64().ok_or_else(|| {
1753 ParseSchemaError(format!("{} is not a 32-bit float", n))
1754 })?;
1755 AvroValue::Float(f as f32)
1756 }
1757 SchemaPiece::Double => {
1758 let f = n.as_f64().ok_or_else(|| {
1759 ParseSchemaError(format!("{} is not a 64-bit float", n))
1760 })?;
1761 AvroValue::Double(f)
1762 }
1763 _ => {
1764 return Err(ParseSchemaError(format!(
1765 "Unexpected number in default: {}",
1766 n
1767 )));
1768 }
1769 }
1770 }
1771 (String(s), piece)
1772 if s.eq_ignore_ascii_case("nan")
1773 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1774 {
1775 match piece {
1776 SchemaPiece::Float => AvroValue::Float(f32::NAN),
1777 SchemaPiece::Double => AvroValue::Double(f64::NAN),
1778 _ => unreachable!(),
1779 }
1780 }
1781 (String(s), piece)
1782 if s.eq_ignore_ascii_case("infinity")
1783 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1784 {
1785 match piece {
1786 SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1787 SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1788 _ => unreachable!(),
1789 }
1790 }
1791 (String(s), piece)
1792 if s.eq_ignore_ascii_case("-infinity")
1793 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1794 {
1795 match piece {
1796 SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1797 SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1798 _ => unreachable!(),
1799 }
1800 }
1801 (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1802 (
1803 String(s),
1804 SchemaPiece::Decimal {
1805 precision, scale, ..
1806 },
1807 ) => AvroValue::Decimal(DecimalValue {
1808 precision: *precision,
1809 scale: *scale,
1810 unscaled: s.clone().into_bytes(),
1811 }),
1812 (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1813 (Object(map), SchemaPiece::Record { fields, .. }) => {
1814 let field_values = fields
1815 .iter()
1816 .map(|rf| {
1817 let jval = map.get(&rf.name).ok_or_else(|| {
1818 ParseSchemaError(format!(
1819 "Field not found in default value: {}",
1820 rf.name
1821 ))
1822 })?;
1823 let value = self.step(&rf.schema).json_to_value(jval)?;
1824 Ok((rf.name.clone(), value))
1825 })
1826 .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1827 AvroValue::Record(field_values)
1828 }
1829 (String(s), SchemaPiece::Enum { symbols, .. }) => {
1830 match symbols.iter().find_position(|sym| s == *sym) {
1831 Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1832 None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1833 }
1834 }
1835 (Array(vals), SchemaPiece::Array(inner)) => {
1836 let node = self.step(&**inner);
1837 let vals = vals
1838 .iter()
1839 .map(|val| node.json_to_value(val))
1840 .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1841 AvroValue::Array(vals)
1842 }
1843 (Object(map), SchemaPiece::Map(inner)) => {
1844 let node = self.step(&**inner);
1845 let map = map
1846 .iter()
1847 .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1848 .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1849 AvroValue::Map(map)
1850 }
1851 (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1852 AvroValue::Fixed(*size, s.clone().into_bytes())
1853 }
1854 _ => {
1855 return Err(ParseSchemaError(format!(
1856 "Json default value {} does not match schema",
1857 json
1858 )));
1859 }
1860 };
1861 Ok(val)
1862 }
1863}
1864
1865#[derive(Clone)]
1866struct SchemaSerContext<'a> {
1867 node: SchemaNodeOrNamed<'a>,
1868 seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
1873 enclosing_ns: &'a str,
1875}
1876
1877#[derive(Clone)]
1878struct RecordFieldSerContext<'a> {
1879 outer: &'a SchemaSerContext<'a>,
1880 inner: &'a RecordField,
1881}
1882
1883impl<'a> SchemaSerContext<'a> {
1884 fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1885 let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1886 Self {
1887 node: self.node.step_ref(next),
1888 seen_named: Rc::clone(&self.seen_named),
1889 enclosing_ns: ns,
1890 }
1891 }
1892}
1893
1894impl<'a> Serialize for SchemaSerContext<'a> {
1895 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1896 where
1897 S: Serializer,
1898 {
1899 match self.node.inner {
1900 SchemaPieceRefOrNamed::Piece(piece) => match piece {
1901 SchemaPiece::Null => serializer.serialize_str("null"),
1902 SchemaPiece::Boolean => serializer.serialize_str("boolean"),
1903 SchemaPiece::Int => serializer.serialize_str("int"),
1904 SchemaPiece::Long => serializer.serialize_str("long"),
1905 SchemaPiece::Float => serializer.serialize_str("float"),
1906 SchemaPiece::Double => serializer.serialize_str("double"),
1907 SchemaPiece::Date => {
1908 let mut map = serializer.serialize_map(Some(2))?;
1909 map.serialize_entry("type", "int")?;
1910 map.serialize_entry("logicalType", "date")?;
1911 map.end()
1912 }
1913 SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
1914 let mut map = serializer.serialize_map(Some(2))?;
1915 map.serialize_entry("type", "long")?;
1916 if piece == &SchemaPiece::TimestampMilli {
1917 map.serialize_entry("logicalType", "timestamp-millis")?;
1918 } else {
1919 map.serialize_entry("logicalType", "timestamp-micros")?;
1920 }
1921 map.end()
1922 }
1923 SchemaPiece::Decimal {
1924 precision,
1925 scale,
1926 fixed_size: None,
1927 } => {
1928 let mut map = serializer.serialize_map(Some(4))?;
1929 map.serialize_entry("type", "bytes")?;
1930 map.serialize_entry("precision", precision)?;
1931 map.serialize_entry("scale", scale)?;
1932 map.serialize_entry("logicalType", "decimal")?;
1933 map.end()
1934 }
1935 SchemaPiece::Bytes => serializer.serialize_str("bytes"),
1936 SchemaPiece::String => serializer.serialize_str("string"),
1937 SchemaPiece::Array(inner) => {
1938 let mut map = serializer.serialize_map(Some(2))?;
1939 map.serialize_entry("type", "array")?;
1940 map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
1941 map.end()
1942 }
1943 SchemaPiece::Map(inner) => {
1944 let mut map = serializer.serialize_map(Some(2))?;
1945 map.serialize_entry("type", "map")?;
1946 map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
1947 map.end()
1948 }
1949 SchemaPiece::Union(inner) => {
1950 let variants = inner.variants();
1951 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
1952 for v in variants {
1953 seq.serialize_element(&self.step(v.as_ref()))?;
1954 }
1955 seq.end()
1956 }
1957 SchemaPiece::Json => {
1958 let mut map = serializer.serialize_map(Some(2))?;
1959 map.serialize_entry("type", "string")?;
1960 map.serialize_entry("connect.name", "io.debezium.data.Json")?;
1961 map.end()
1962 }
1963 SchemaPiece::Uuid => {
1964 let mut map = serializer.serialize_map(Some(4))?;
1965 map.serialize_entry("type", "string")?;
1966 map.serialize_entry("logicalType", "uuid")?;
1967 map.end()
1968 }
1969 SchemaPiece::Record { .. }
1970 | SchemaPiece::Decimal {
1971 fixed_size: Some(_),
1972 ..
1973 }
1974 | SchemaPiece::Enum { .. }
1975 | SchemaPiece::Fixed { .. } => {
1976 unreachable!("Unexpected named schema piece in anonymous schema position")
1977 }
1978 SchemaPiece::ResolveIntLong
1979 | SchemaPiece::ResolveDateTimestamp
1980 | SchemaPiece::ResolveIntFloat
1981 | SchemaPiece::ResolveIntDouble
1982 | SchemaPiece::ResolveLongFloat
1983 | SchemaPiece::ResolveLongDouble
1984 | SchemaPiece::ResolveFloatDouble
1985 | SchemaPiece::ResolveConcreteUnion { .. }
1986 | SchemaPiece::ResolveUnionUnion { .. }
1987 | SchemaPiece::ResolveUnionConcrete { .. }
1988 | SchemaPiece::ResolveRecord { .. }
1989 | SchemaPiece::ResolveIntTsMicro
1990 | SchemaPiece::ResolveIntTsMilli
1991 | SchemaPiece::ResolveEnum { .. } => {
1992 panic!("Attempted to serialize resolved schema")
1993 }
1994 },
1995 SchemaPieceRefOrNamed::Named(index) => {
1996 let mut map = self.seen_named.borrow_mut();
1997 let named_piece = match map.get(&index) {
1998 Some(name) => {
1999 return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2000 }
2001 None => self.node.root.lookup(index),
2002 };
2003 let name = &named_piece.name;
2004 map.insert(index, name.clone());
2005 std::mem::drop(map);
2006 match &named_piece.piece {
2007 SchemaPiece::Record { doc, fields, .. } => {
2008 let mut map = serializer.serialize_map(None)?;
2009 map.serialize_entry("type", "record")?;
2010 map.serialize_entry("name", &name.name)?;
2011 if self.enclosing_ns != &name.namespace {
2012 map.serialize_entry("namespace", &name.namespace)?;
2013 }
2014 if let Some(docstr) = doc {
2015 map.serialize_entry("doc", docstr)?;
2016 }
2017 map.serialize_entry(
2019 "fields",
2020 &fields
2021 .iter()
2022 .map(|f| RecordFieldSerContext {
2023 outer: self,
2024 inner: f,
2025 })
2026 .collect::<Vec<_>>(),
2027 )?;
2028 map.end()
2029 }
2030 SchemaPiece::Enum {
2031 symbols,
2032 default_idx,
2033 ..
2034 } => {
2035 let mut map = serializer.serialize_map(None)?;
2036 map.serialize_entry("type", "enum")?;
2037 map.serialize_entry("name", &name.name)?;
2038 if self.enclosing_ns != &name.namespace {
2039 map.serialize_entry("namespace", &name.namespace)?;
2040 }
2041 map.serialize_entry("symbols", symbols)?;
2042 if let Some(default_idx) = *default_idx {
2043 assert!(default_idx < symbols.len());
2044 map.serialize_entry("default", &symbols[default_idx])?;
2045 }
2046 map.end()
2047 }
2048 SchemaPiece::Fixed { size } => {
2049 let mut map = serializer.serialize_map(None)?;
2050 map.serialize_entry("type", "fixed")?;
2051 map.serialize_entry("name", &name.name)?;
2052 if self.enclosing_ns != &name.namespace {
2053 map.serialize_entry("namespace", &name.namespace)?;
2054 }
2055 map.serialize_entry("size", size)?;
2056 map.end()
2057 }
2058 SchemaPiece::Decimal {
2059 scale,
2060 precision,
2061 fixed_size: Some(size),
2062 } => {
2063 let mut map = serializer.serialize_map(Some(6))?;
2064 map.serialize_entry("type", "fixed")?;
2065 map.serialize_entry("logicalType", "decimal")?;
2066 map.serialize_entry("name", &name.name)?;
2067 if self.enclosing_ns != &name.namespace {
2068 map.serialize_entry("namespace", &name.namespace)?;
2069 }
2070 map.serialize_entry("size", size)?;
2071 map.serialize_entry("precision", precision)?;
2072 map.serialize_entry("scale", scale)?;
2073 map.end()
2074 }
2075 SchemaPiece::Null
2076 | SchemaPiece::Boolean
2077 | SchemaPiece::Int
2078 | SchemaPiece::Long
2079 | SchemaPiece::Float
2080 | SchemaPiece::Double
2081 | SchemaPiece::Date
2082 | SchemaPiece::TimestampMilli
2083 | SchemaPiece::TimestampMicro
2084 | SchemaPiece::Decimal {
2085 fixed_size: None, ..
2086 }
2087 | SchemaPiece::Bytes
2088 | SchemaPiece::String
2089 | SchemaPiece::Array(_)
2090 | SchemaPiece::Map(_)
2091 | SchemaPiece::Union(_)
2092 | SchemaPiece::Uuid
2093 | SchemaPiece::Json => {
2094 unreachable!("Unexpected anonymous schema piece in named schema position")
2095 }
2096 SchemaPiece::ResolveIntLong
2097 | SchemaPiece::ResolveDateTimestamp
2098 | SchemaPiece::ResolveIntFloat
2099 | SchemaPiece::ResolveIntDouble
2100 | SchemaPiece::ResolveLongFloat
2101 | SchemaPiece::ResolveLongDouble
2102 | SchemaPiece::ResolveFloatDouble
2103 | SchemaPiece::ResolveConcreteUnion { .. }
2104 | SchemaPiece::ResolveUnionUnion { .. }
2105 | SchemaPiece::ResolveUnionConcrete { .. }
2106 | SchemaPiece::ResolveRecord { .. }
2107 | SchemaPiece::ResolveIntTsMilli
2108 | SchemaPiece::ResolveIntTsMicro
2109 | SchemaPiece::ResolveEnum { .. } => {
2110 panic!("Attempted to serialize resolved schema")
2111 }
2112 }
2113 }
2114 }
2115 }
2116}
2117
2118impl Serialize for Schema {
2119 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2120 where
2121 S: Serializer,
2122 {
2123 let ctx = SchemaSerContext {
2124 node: SchemaNodeOrNamed {
2125 root: self,
2126 inner: self.top.as_ref(),
2127 },
2128 seen_named: Rc::new(RefCell::new(Default::default())),
2129 enclosing_ns: "",
2130 };
2131 ctx.serialize(serializer)
2132 }
2133}
2134
2135impl<'a> Serialize for RecordFieldSerContext<'a> {
2136 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2137 where
2138 S: Serializer,
2139 {
2140 let mut map = serializer.serialize_map(None)?;
2141 map.serialize_entry("name", &self.inner.name)?;
2142 map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2143 if let Some(default) = &self.inner.default {
2144 map.serialize_entry("default", default)?;
2145 }
2146 if let Some(doc) = &self.inner.doc {
2147 map.serialize_entry("doc", doc)?;
2148 }
2149 map.end()
2150 }
2151}
2152
2153fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2156 pcf(schema, "", false)
2157}
2158
2159fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2160 match schema {
2161 serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2162 serde_json::Value::String(s) => pcf_string(s),
2163 serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2164 serde_json::Value::Number(n) => n.to_string(),
2165 _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2166 }
2167}
2168
2169fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2170 let default_ns = schema
2172 .get("namespace")
2173 .and_then(|v| v.as_str())
2174 .unwrap_or(enclosing_ns);
2175 let mut fields = Vec::new();
2176 let mut found_next_ns = None;
2177 let mut deferred_values = vec![];
2178 for (k, v) in schema {
2179 if schema.len() == 1 && k == "type" {
2181 if let serde_json::Value::String(s) = v {
2183 return pcf_string(s);
2184 }
2185 }
2186
2187 if field_ordering_position(k).is_none() {
2189 continue;
2190 }
2191
2192 if k == "name" {
2194 if in_fields {
2197 fields.push((
2198 k,
2199 format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2200 ));
2201 continue;
2202 }
2203 let name = v.as_str().unwrap();
2205 assert!(
2206 found_next_ns.is_none(),
2207 "`name` must not be specified multiple times"
2208 );
2209 let next_ns = match name.rsplit_once('.') {
2210 None => default_ns,
2211 Some((ns, _name)) => ns,
2212 };
2213 found_next_ns = Some(next_ns);
2214 let n = if next_ns.is_empty() {
2215 Cow::Borrowed(name)
2216 } else {
2217 Cow::Owned(format!("{next_ns}.{name}"))
2218 };
2219 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2220 continue;
2221 }
2222
2223 if k == "size" {
2225 let i = match v.as_str() {
2226 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2227 None => v.as_i64().unwrap(),
2228 };
2229 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2230 continue;
2231 }
2232
2233 deferred_values.push((k, v));
2236 }
2237
2238 let next_ns = found_next_ns.unwrap_or(default_ns);
2239 for (k, v) in deferred_values {
2240 fields.push((
2241 k,
2242 format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2243 ));
2244 }
2245
2246 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2248 let inter = fields
2249 .into_iter()
2250 .map(|(_, v)| v)
2251 .collect::<Vec<_>>()
2252 .join(",");
2253 format!("{{{}}}", inter)
2254}
2255
2256fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2257 let inter = arr
2258 .iter()
2259 .map(|s| pcf(s, enclosing_ns, in_fields))
2260 .collect::<Vec<String>>()
2261 .join(",");
2262 format!("[{}]", inter)
2263}
2264
/// Wraps `s` in double quotes. The content is copied verbatim, without any
/// escaping.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2268
/// Returns the 1-based position of `field` in the canonical member order, or
/// `None` for a member that is not part of the canonical form.
fn field_ordering_position(field: &str) -> Option<usize> {
    // Members retained by the canonical form, in the order they are emitted.
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
2284
2285#[cfg(test)]
2286mod tests {
2287 use mz_ore::{assert_err, assert_ok};
2288
2289 use crate::types::{Record, ToAvro};
2290
2291 use super::*;
2292
2293 fn check_schema(schema: &str, expected: SchemaPiece) {
2294 let schema = Schema::from_str(schema).unwrap();
2295 assert_eq!(&expected, schema.top_node().inner);
2296
2297 let schema = serde_json::to_string(&schema).unwrap();
2299 let schema = Schema::from_str(&schema).unwrap();
2300 assert_eq!(&expected, schema.top_node().inner);
2301 }
2302
2303 #[mz_ore::test]
2304 fn test_primitive_schema() {
2305 check_schema("\"null\"", SchemaPiece::Null);
2306 check_schema("\"int\"", SchemaPiece::Int);
2307 check_schema("\"double\"", SchemaPiece::Double);
2308 }
2309
2310 #[mz_ore::test]
2311 fn test_array_schema() {
2312 check_schema(
2313 r#"{"type": "array", "items": "string"}"#,
2314 SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2315 );
2316 }
2317
2318 #[mz_ore::test]
2319 fn test_map_schema() {
2320 check_schema(
2321 r#"{"type": "map", "values": "double"}"#,
2322 SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2323 );
2324 }
2325
2326 #[mz_ore::test]
2327 fn test_union_schema() {
2328 check_schema(
2329 r#"["null", "int"]"#,
2330 SchemaPiece::Union(
2331 UnionSchema::new(vec![
2332 SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2333 SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2334 ])
2335 .unwrap(),
2336 ),
2337 );
2338 }
2339
2340 #[mz_ore::test]
2341 fn test_multi_union_schema() {
2342 let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2343 assert_ok!(schema);
2344 let schema = schema.unwrap();
2345 let node = schema.top_node();
2346 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2347 let union_schema = match node.inner {
2348 SchemaPiece::Union(u) => u,
2349 _ => unreachable!(),
2350 };
2351 assert_eq!(union_schema.variants().len(), 5);
2352 let mut variants = union_schema.variants().iter();
2353 assert_eq!(
2354 SchemaKind::from(node.step(variants.next().unwrap())),
2355 SchemaKind::Null
2356 );
2357 assert_eq!(
2358 SchemaKind::from(node.step(variants.next().unwrap())),
2359 SchemaKind::Int
2360 );
2361 assert_eq!(
2362 SchemaKind::from(node.step(variants.next().unwrap())),
2363 SchemaKind::Float
2364 );
2365 assert_eq!(
2366 SchemaKind::from(node.step(variants.next().unwrap())),
2367 SchemaKind::String
2368 );
2369 assert_eq!(
2370 SchemaKind::from(node.step(variants.next().unwrap())),
2371 SchemaKind::Bytes
2372 );
2373 assert_eq!(variants.next(), None);
2374 }
2375
2376 #[mz_ore::test]
2377 fn test_record_schema() {
2378 let schema = r#"
2379 {
2380 "type": "record",
2381 "name": "test",
2382 "doc": "record doc",
2383 "fields": [
2384 {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2385 {"name": "b", "doc": "b doc", "type": "string"}
2386 ]
2387 }
2388 "#;
2389
2390 let mut lookup = BTreeMap::new();
2391 lookup.insert("a".to_owned(), 0);
2392 lookup.insert("b".to_owned(), 1);
2393
2394 let expected = SchemaPiece::Record {
2395 doc: Some("record doc".to_string()),
2396 fields: vec![
2397 RecordField {
2398 name: "a".to_string(),
2399 doc: Some("a doc".to_string()),
2400 default: Some(Value::Number(42i64.into())),
2401 schema: SchemaPiece::Long.into(),
2402 order: RecordFieldOrder::Ascending,
2403 position: 0,
2404 },
2405 RecordField {
2406 name: "b".to_string(),
2407 doc: Some("b doc".to_string()),
2408 default: None,
2409 schema: SchemaPiece::String.into(),
2410 order: RecordFieldOrder::Ascending,
2411 position: 1,
2412 },
2413 ],
2414 lookup,
2415 };
2416
2417 check_schema(schema, expected);
2418 }
2419
2420 #[mz_ore::test]
2421 fn test_enum_schema() {
2422 let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2423
2424 let expected = SchemaPiece::Enum {
2425 doc: None,
2426 symbols: vec![
2427 "diamonds".to_owned(),
2428 "spades".to_owned(),
2429 "jokers".to_owned(),
2430 "clubs".to_owned(),
2431 "hearts".to_owned(),
2432 ],
2433 default_idx: Some(2),
2434 };
2435
2436 check_schema(schema, expected);
2437
2438 let bad_schema = Schema::from_str(
2439 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2440 );
2441
2442 assert_err!(bad_schema);
2443 }
2444
2445 #[mz_ore::test]
2446 fn test_fixed_schema() {
2447 let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2448
2449 let expected = SchemaPiece::Fixed { size: 16usize };
2450
2451 check_schema(schema, expected);
2452 }
2453
2454 #[mz_ore::test]
2455 fn test_date_schema() {
2456 let kinds = &[
2457 r#"{
2458 "type": "int",
2459 "name": "datish",
2460 "logicalType": "date"
2461 }"#,
2462 r#"{
2463 "type": "int",
2464 "name": "datish",
2465 "connect.name": "io.debezium.time.Date"
2466 }"#,
2467 r#"{
2468 "type": "int",
2469 "name": "datish",
2470 "connect.name": "org.apache.kafka.connect.data.Date"
2471 }"#,
2472 ];
2473 for kind in kinds {
2474 check_schema(*kind, SchemaPiece::Date);
2475
2476 let schema = Schema::from_str(*kind).unwrap();
2477 assert_eq!(
2478 serde_json::to_string(&schema).unwrap(),
2479 r#"{"type":"int","logicalType":"date"}"#
2480 );
2481 }
2482 }
2483
2484 #[mz_ore::test]
2485 fn new_field_in_middle() {
2486 let reader = r#"{
2487 "type": "record",
2488 "name": "MyRecord",
2489 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2490 }"#;
2491 let writer = r#"{
2492 "type": "record",
2493 "name": "MyRecord",
2494 "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2495 }"#;
2496 let reader = Schema::from_str(reader).unwrap();
2497 let writer = Schema::from_str(writer).unwrap();
2498
2499 let mut record = Record::new(writer.top_node()).unwrap();
2500 record.put("f1", 1);
2501 record.put("f2", 2);
2502 record.put("f_interposed", 42);
2503
2504 let value = record.avro();
2505
2506 let mut buf = vec![];
2507 crate::encode::encode(&value, &writer, &mut buf);
2508
2509 let resolved = resolve_schemas(&writer, &reader).unwrap();
2510
2511 let reader = &mut &buf[..];
2512 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2513 let expected = crate::types::Value::Record(vec![
2514 ("f1".to_string(), crate::types::Value::Int(1)),
2515 ("f2".to_string(), crate::types::Value::Int(2)),
2516 ]);
2517 assert_eq!(reader_value, expected);
2518 assert!(reader.is_empty()); }
2520
2521 #[mz_ore::test]
2522 fn new_field_at_end() {
2523 let reader = r#"{
2524 "type": "record",
2525 "name": "MyRecord",
2526 "fields": [{"name": "f1", "type": "int"}]
2527 }"#;
2528 let writer = r#"{
2529 "type": "record",
2530 "name": "MyRecord",
2531 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2532 }"#;
2533 let reader = Schema::from_str(reader).unwrap();
2534 let writer = Schema::from_str(writer).unwrap();
2535
2536 let mut record = Record::new(writer.top_node()).unwrap();
2537 record.put("f1", 1);
2538 record.put("f2", 2);
2539
2540 let value = record.avro();
2541
2542 let mut buf = vec![];
2543 crate::encode::encode(&value, &writer, &mut buf);
2544
2545 let resolved = resolve_schemas(&writer, &reader).unwrap();
2546
2547 let reader = &mut &buf[..];
2548 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2549 let expected =
2550 crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2551 assert_eq!(reader_value, expected);
2552 assert!(reader.is_empty()); }
2554
2555 #[mz_ore::test]
2556 fn default_non_nums() {
2557 let reader = r#"{
2558 "type": "record",
2559 "name": "MyRecord",
2560 "fields": [
2561 {"name": "f1", "type": "double", "default": "NaN"},
2562 {"name": "f2", "type": "double", "default": "Infinity"},
2563 {"name": "f3", "type": "double", "default": "-Infinity"}
2564 ]
2565 }
2566 "#;
2567 let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2568
2569 let writer_schema = Schema::from_str(writer).unwrap();
2570 let reader_schema = Schema::from_str(reader).unwrap();
2571 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2572
2573 let record = Record::new(writer_schema.top_node()).unwrap();
2574
2575 let value = record.avro();
2576 let mut buf = vec![];
2577 crate::encode::encode(&value, &writer_schema, &mut buf);
2578
2579 let reader = &mut &buf[..];
2580 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2581 let expected = crate::types::Value::Record(vec![
2582 ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2583 ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2584 (
2585 "f3".to_string(),
2586 crate::types::Value::Double(f64::NEG_INFINITY),
2587 ),
2588 ]);
2589
2590 #[derive(Debug)]
2591 struct NanEq(crate::types::Value);
2592 impl std::cmp::PartialEq for NanEq {
2593 fn eq(&self, other: &Self) -> bool {
2594 match (self, other) {
2595 (
2596 NanEq(crate::types::Value::Double(x)),
2597 NanEq(crate::types::Value::Double(y)),
2598 ) if x.is_nan() && y.is_nan() => true,
2599 (
2600 NanEq(crate::types::Value::Float(x)),
2601 NanEq(crate::types::Value::Float(y)),
2602 ) if x.is_nan() && y.is_nan() => true,
2603 (
2604 NanEq(crate::types::Value::Record(xs)),
2605 NanEq(crate::types::Value::Record(ys)),
2606 ) => {
2607 let xs = xs
2608 .iter()
2609 .cloned()
2610 .map(|(k, v)| (k, NanEq(v)))
2611 .collect::<Vec<_>>();
2612 let ys = ys
2613 .iter()
2614 .cloned()
2615 .map(|(k, v)| (k, NanEq(v)))
2616 .collect::<Vec<_>>();
2617
2618 xs == ys
2619 }
2620 (NanEq(x), NanEq(y)) => x == y,
2621 }
2622 }
2623 }
2624
2625 assert_eq!(NanEq(reader_value), NanEq(expected));
2626 assert!(reader.is_empty());
2627 }
2628
2629 #[mz_ore::test]
2630 fn test_decimal_schemas() {
2631 let schema = r#"{
2632 "type": "fixed",
2633 "name": "dec",
2634 "size": 8,
2635 "logicalType": "decimal",
2636 "precision": 12,
2637 "scale": 5
2638 }"#;
2639 let expected = SchemaPiece::Decimal {
2640 precision: 12,
2641 scale: 5,
2642 fixed_size: Some(8),
2643 };
2644 check_schema(schema, expected);
2645
2646 let schema = r#"{
2647 "type": "bytes",
2648 "logicalType": "decimal",
2649 "precision": 12,
2650 "scale": 5
2651 }"#;
2652 let expected = SchemaPiece::Decimal {
2653 precision: 12,
2654 scale: 5,
2655 fixed_size: None,
2656 };
2657 check_schema(schema, expected);
2658
2659 let res = Schema::from_str(
2660 r#"["bytes", {
2661 "type": "bytes",
2662 "logicalType": "decimal",
2663 "precision": 12,
2664 "scale": 5
2665 }]"#,
2666 );
2667 assert_eq!(
2668 res.unwrap_err().to_string(),
2669 "Schema parse error: Unions cannot contain duplicate types"
2670 );
2671
2672 let writer_schema = Schema::from_str(
2673 r#"["null", {
2674 "type": "bytes"
2675 }]"#,
2676 )
2677 .unwrap();
2678 let reader_schema = Schema::from_str(
2679 r#"["null", {
2680 "type": "bytes",
2681 "logicalType": "decimal",
2682 "precision": 12,
2683 "scale": 5
2684 }]"#,
2685 )
2686 .unwrap();
2687 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2688
2689 let expected = SchemaPiece::ResolveUnionUnion {
2690 permutation: vec![
2691 Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2692 Ok((
2693 1,
2694 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2695 precision: 12,
2696 scale: 5,
2697 fixed_size: None,
2698 }),
2699 )),
2700 ],
2701 n_reader_variants: 2,
2702 reader_null_variant: Some(0),
2703 };
2704 assert_eq!(resolved.top_node().inner, &expected);
2705 }
2706
2707 #[mz_ore::test]
2708 fn test_no_documentation() {
2709 let schema =
2710 Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2711 .unwrap();
2712
2713 let doc = match schema.top_node().inner {
2714 SchemaPiece::Enum { doc, .. } => doc.clone(),
2715 _ => panic!(),
2716 };
2717
2718 assert_none!(doc);
2719 }
2720
2721 #[mz_ore::test]
2722 fn test_documentation() {
2723 let schema = Schema::from_str(
2724 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2725 ).unwrap();
2726
2727 let doc = match schema.top_node().inner {
2728 SchemaPiece::Enum { doc, .. } => doc.clone(),
2729 _ => None,
2730 };
2731
2732 assert_eq!("Some documentation".to_owned(), doc.unwrap());
2733 }
2734
2735 #[mz_ore::test]
2736 fn test_namespaces_and_names() {
2737 let schema = Schema::from_str(
2739 r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2740 )
2741 .unwrap();
2742 assert_eq!(schema.named.len(), 1);
2743 assert_eq!(
2744 schema.named[0].name,
2745 FullName {
2746 name: "name".into(),
2747 namespace: "namespace".into()
2748 }
2749 );
2750
2751 let schema =
2753 Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2754 .unwrap();
2755 assert_eq!(schema.named.len(), 1);
2756 assert_eq!(
2757 schema.named[0].name,
2758 FullName {
2759 name: "dots".into(),
2760 namespace: "name.has".into()
2761 }
2762 );
2763
2764 let schema = Schema::from_str(
2766 r#"{"type": "enum", "namespace": "namespace",
2767 "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2768 )
2769 .unwrap();
2770 assert_eq!(schema.named.len(), 1);
2771 assert_eq!(
2772 schema.named[0].name,
2773 FullName {
2774 name: "dots".into(),
2775 namespace: "name.has".into()
2776 }
2777 );
2778
2779 let schema = Schema::from_str(
2782 r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2783 "fields": [{"name": "name", "type": "string"}]}"#,
2784 )
2785 .unwrap();
2786 assert_eq!(schema.named.len(), 1);
2787 assert_eq!(
2788 schema.named[0].name,
2789 FullName {
2790 name: "TestDoc".into(),
2791 namespace: "".into()
2792 }
2793 );
2794
2795 let schema = Schema::from_str(
2797 r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2798 "fields": [{"name": "name", "type": "string"}]}"#,
2799 )
2800 .unwrap();
2801 assert_eq!(schema.named.len(), 1);
2802 assert_eq!(
2803 schema.named[0].name,
2804 FullName {
2805 name: "TestDoc".into(),
2806 namespace: "".into()
2807 }
2808 );
2809
2810 let first = Schema::from_str(
2812 r#"{"type": "fixed", "namespace": "namespace",
2813 "name": "name", "size": 1}"#,
2814 )
2815 .unwrap();
2816 let second = Schema::from_str(
2817 r#"{"type": "fixed", "name": "namespace.name",
2818 "size": 1}"#,
2819 )
2820 .unwrap();
2821 assert_eq!(first.named[0].name, second.named[0].name);
2822
2823 let first = Schema::from_str(
2824 r#"{"type": "fixed", "namespace": "namespace",
2825 "name": "name", "size": 1}"#,
2826 )
2827 .unwrap();
2828 let second = Schema::from_str(
2829 r#"{"type": "fixed", "name": "namespace.Name",
2830 "size": 1}"#,
2831 )
2832 .unwrap();
2833 assert_ne!(first.named[0].name, second.named[0].name);
2834
2835 let first = Schema::from_str(
2836 r#"{"type": "fixed", "namespace": "Namespace",
2837 "name": "name", "size": 1}"#,
2838 )
2839 .unwrap();
2840 let second = Schema::from_str(
2841 r#"{"type": "fixed", "namespace": "namespace",
2842 "name": "name", "size": 1}"#,
2843 )
2844 .unwrap();
2845 assert_ne!(first.named[0].name, second.named[0].name);
2846
2847 assert!(
2850 Schema::from_str(
2851 r#"{"type": "record", "name": "99 problems but a name aint one",
2852 "fields": [{"name": "name", "type": "string"}]}"#
2853 )
2854 .is_err()
2855 );
2856
2857 assert!(
2858 Schema::from_str(
2859 r#"{"type": "record", "name": "!!!",
2860 "fields": [{"name": "name", "type": "string"}]}"#
2861 )
2862 .is_err()
2863 );
2864
2865 assert!(
2866 Schema::from_str(
2867 r#"{"type": "record", "name": "_valid_until_©",
2868 "fields": [{"name": "name", "type": "string"}]}"#
2869 )
2870 .is_err()
2871 );
2872
2873 let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2875 {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2876 {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2877 {"name": "f3", "type": "MyEnum"},
2878 {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2879 {"name": "f5", "type": "other.namespace.OtherEnum"},
2880 {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2881 {"name": "f7", "type": "some.other.ThirdEnum"}
2882 ]}"#).unwrap();
2883 assert_eq!(schema.named.len(), 4);
2884
2885 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2886 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); } else {
2894 panic!("Expected SchemaPiece::Record, found something else");
2895 }
2896
2897 let schema = Schema::from_str(
2898 r#"{"type": "record", "name": "x.Y", "fields": [
2899 {"name": "e", "type":
2900 {"type": "record", "name": "Z", "fields": [
2901 {"name": "f", "type": "x.Y"},
2902 {"name": "g", "type": "x.Z"}
2903 ]}
2904 }
2905 ]}"#,
2906 )
2907 .unwrap();
2908 assert_eq!(schema.named.len(), 2);
2909
2910 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2911 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); } else {
2913 panic!("Expected SchemaPiece::Record, found something else");
2914 }
2915
2916 if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
2917 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); } else {
2920 panic!("Expected SchemaPiece::Record, found something else");
2921 }
2922
2923 let schema = Schema::from_str(
2924 r#"{"type": "record", "name": "R", "fields": [
2925 {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
2926 {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
2927 "symbols": ["Foo", "Bar"]}
2928 }
2929 ]}},
2930 {"name": "t", "type": "Z"}
2931 ]}"#,
2932 )
2933 .unwrap();
2934 assert_eq!(schema.named.len(), 3);
2935
2936 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2937 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); } else {
2940 panic!("Expected SchemaPiece::Record, found something else");
2941 }
2942 }
2943
2944 #[mz_ore::test]
2947 fn test_schema_is_send() {
2948 fn send<S: Send>(_s: S) {}
2949
2950 let schema = Schema {
2951 named: vec![],
2952 indices: Default::default(),
2953 top: SchemaPiece::Null.into(),
2954 };
2955 send(schema);
2956 }
2957
2958 #[mz_ore::test]
2959 fn test_schema_is_sync() {
2960 fn sync<S: Sync>(_s: S) {}
2961
2962 let schema = Schema {
2963 named: vec![],
2964 indices: Default::default(),
2965 top: SchemaPiece::Null.into(),
2966 };
2967 sync(&schema);
2968 sync(schema);
2969 }
2970
2971 #[mz_ore::test]
2972 #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
2974 use sha2::Sha256;
2975
2976 let raw_schema = r#"
2977 {
2978 "type": "record",
2979 "name": "test",
2980 "fields": [
2981 {"name": "a", "type": "long", "default": 42},
2982 {"name": "b", "type": "string"}
2983 ]
2984 }
2985 "#;
2986 let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
2987 let schema = Schema::from_str(raw_schema).unwrap();
2988 assert_eq!(&schema.canonical_form(), expected_canonical);
2989 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
2990 assert_eq!(
2991 format!("{}", schema.fingerprint::<Sha256>()),
2992 expected_fingerprint
2993 );
2994
2995 let raw_schema = r#"
2996{
2997 "type": "record",
2998 "name": "ns.r1",
2999 "namespace": "ignored",
3000 "fields": [
3001 {
3002 "name": "f1",
3003 "type": {
3004 "type": "fixed",
3005 "name": "r2",
3006 "size": 1
3007 }
3008 }
3009 ]
3010}
3011"#;
3012 let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3013 let schema = Schema::from_str(raw_schema).unwrap();
3014 assert_eq!(&schema.canonical_form(), expected_canonical);
3015 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3016 assert_eq!(
3017 format!("{}", schema.fingerprint::<Sha256>()),
3018 expected_fingerprint
3019 );
3020 }
3021
3022 #[mz_ore::test]
3023 fn test_make_valid() {
3024 for (input, expected) in [
3025 ("foo", "foo"),
3026 ("az99", "az99"),
3027 ("99az", "_99az"),
3028 ("is,bad", "is_bad"),
3029 ("@#$%", "____"),
3030 ("i-amMisBehaved!", "i_amMisBehaved_"),
3031 ("", "_"),
3032 ] {
3033 let actual = Name::make_valid(input);
3034 assert_eq!(expected, actual, "Name::make_valid({input})")
3035 }
3036 }
3037}