1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use digest::Digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51 writer_schema: &Schema,
52 reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54 let r_indices = reader_schema.indices.clone();
55 let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56 writer_schema
57 .indices
58 .iter()
59 .flat_map(|(name, widx)| {
60 r_indices
61 .get(name)
62 .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63 })
64 .unzip();
65 let reader_fullnames = reader_schema
66 .indices
67 .iter()
68 .map(|(f, i)| (*i, f))
69 .collect::<BTreeMap<_, _>>();
70 let mut resolver = SchemaResolver {
71 named: Default::default(),
72 indices: Default::default(),
73 human_readable_field_path: Vec::new(),
74 current_human_readable_path_start: 0,
75 writer_to_reader_names,
76 reader_to_writer_names,
77 reader_to_resolved_names: Default::default(),
78 reader_fullnames,
79 reader_schema,
80 };
81 let writer_node = writer_schema.top_node_or_named();
82 let reader_node = reader_schema.top_node_or_named();
83 let inner = resolver.resolve(writer_node, reader_node)?;
84 let sch = Schema {
85 named: resolver.named.into_iter().map(Option::unwrap).collect(),
86 indices: resolver.indices,
87 top: inner,
88 };
89 Ok(sch)
90}
91
/// Error carrying a human-readable message about why a schema could not be
/// parsed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error from anything convertible into a `String`.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        Self(msg.into())
    }
}

impl fmt::Display for ParseSchemaError {
    /// Displays the wrapped message, honoring the formatter's padding flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// The raw bytes of a schema fingerprint.
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// Fingerprint bytes, in the order they are rendered by `Display`.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte,
    /// with no separators.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Stream each byte straight into the formatter instead of building a
        // `String` per byte and joining them afterwards.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
/// Either an inline schema piece or a reference to a named piece.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPieceOrNamed {
    /// An inline (anonymous) schema piece.
    Piece(SchemaPiece),
    /// An index identifying a named piece; it is resolved through
    /// `Schema::lookup` on the owning schema.
    Named(usize),
}
140impl SchemaPieceOrNamed {
141 pub fn get_human_name(&self, root: &Schema) -> String {
142 match self {
143 Self::Piece(piece) => format!("{:?}", piece),
144 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145 }
146 }
147 #[inline(always)]
148 pub fn get_piece_and_name<'a>(
149 &'a self,
150 root: &'a Schema,
151 ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152 self.as_ref().get_piece_and_name(root)
153 }
154
155 #[inline(always)]
156 pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157 match self {
158 SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159 SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160 }
161 }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165 #[inline(always)]
166 fn from(piece: SchemaPiece) -> Self {
167 Self::Piece(piece)
168 }
169}
170
/// A single node of a schema: a primitive, a logical type, a container, a
/// named-type body, or one of the `Resolve*` variants.
///
/// NOTE(review): the `Resolve*` variants are presumably only produced by
/// schema resolution (writer schema vs. reader schema) — confirm against
/// `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// The `null` type.
    Null,
    /// The `boolean` type.
    Boolean,
    /// The `int` type.
    Int,
    /// The `long` type.
    Long,
    /// The `float` type.
    Float,
    /// The `double` type.
    Double,
    /// A date; classified as `SchemaKind::Int` and built from an `i32`.
    Date,
    /// A timestamp classified as `SchemaKind::Long`, built with
    /// `TsUnit::Millis`.
    TimestampMilli,
    /// A timestamp classified as `SchemaKind::Long`, built with
    /// `TsUnit::Micros`.
    TimestampMicro,
    /// A decimal logical type.
    Decimal {
        /// Declared precision.
        precision: usize,
        /// Declared scale (the parser defaults it to 0 when absent).
        scale: usize,
        /// `Some(size)` when parsed from a `fixed` type, `None` when parsed
        /// from `bytes`.
        fixed_size: Option<usize>,
    },
    /// The `bytes` type.
    Bytes,
    /// The `string` type.
    String,
    /// A string whose `connect.name` is `io.debezium.data.Json`.
    Json,
    /// A string whose `logicalType` is `uuid`.
    Uuid,
    /// An array; the payload is the schema parsed from `items`.
    Array(Box<SchemaPieceOrNamed>),
    /// A map; the payload is the schema parsed from `values`.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of variants.
    Union(UnionSchema),
    /// Classified as `SchemaKind::Long`.
    /// NOTE(review): presumably a writer `int` read as a millisecond
    /// timestamp — confirm.
    ResolveIntTsMilli,
    /// Classified as `SchemaKind::Long`.
    /// NOTE(review): presumably a writer `int` read as a microsecond
    /// timestamp — confirm.
    ResolveIntTsMicro,
    /// Classified as `SchemaKind::Long`.
    /// NOTE(review): presumably a writer date read as a timestamp — confirm.
    ResolveDateTimestamp,
    /// Classified as `SchemaKind::Long`.
    /// NOTE(review): presumably writer `int` promoted to reader `long`.
    ResolveIntLong,
    /// Classified as `SchemaKind::Float`.
    /// NOTE(review): presumably writer `int` promoted to reader `float`.
    ResolveIntFloat,
    /// Classified as `SchemaKind::Double`.
    /// NOTE(review): presumably writer `int` promoted to reader `double`.
    ResolveIntDouble,
    /// Classified as `SchemaKind::Float`.
    /// NOTE(review): presumably writer `long` promoted to reader `float`.
    ResolveLongFloat,
    /// Classified as `SchemaKind::Double`.
    /// NOTE(review): presumably writer `long` promoted to reader `double`.
    ResolveLongDouble,
    /// Classified as `SchemaKind::Double`.
    /// NOTE(review): presumably writer `float` promoted to reader `double`.
    ResolveFloatDouble,
    /// Classified as `SchemaKind::Union`.
    /// NOTE(review): presumably a non-union writer type read into a reader
    /// union — confirm.
    ResolveConcreteUnion {
        /// NOTE(review): presumably the index of the matching reader variant.
        index: usize,
        /// The schema of the selected variant.
        inner: Box<SchemaPieceOrNamed>,
        /// NOTE(review): presumably the number of variants in the reader
        /// union.
        n_reader_variants: usize,
        /// NOTE(review): presumably the position of `null` in the reader
        /// union, if any.
        reader_null_variant: Option<usize>,
    },
    /// Classified as `SchemaKind::Union`.
    /// NOTE(review): presumably a writer union read into a reader union.
    ResolveUnionUnion {
        /// One entry per variant: either an index and schema, or the error
        /// recorded for that variant.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        /// NOTE(review): presumably the number of variants in the reader
        /// union.
        n_reader_variants: usize,
        /// NOTE(review): presumably the position of `null` in the reader
        /// union, if any.
        reader_null_variant: Option<usize>,
    },
    /// Classified as `SchemaKind::Unknown`.
    /// NOTE(review): presumably a writer union read into a non-union reader
    /// type — confirm.
    ResolveUnionConcrete {
        /// NOTE(review): presumably the index of the matching writer variant.
        index: usize,
        /// The schema of the selected variant.
        inner: Box<SchemaPieceOrNamed>,
    },
    /// The body of a `record` type.
    Record {
        /// Optional documentation string.
        doc: Documentation,
        /// The fields, in declaration order.
        fields: Vec<RecordField>,
        /// Maps a field name to that field's `position`.
        lookup: BTreeMap<String, usize>,
    },
    /// The body of an `enum` type.
    Enum {
        /// Optional documentation string.
        doc: Documentation,
        /// The symbols; the parser rejects duplicates.
        symbols: Vec<String>,
        /// Index into `symbols` of the declared default, if any.
        default_idx: Option<usize>,
    },
    /// The body of a `fixed` type with the given size.
    Fixed { size: usize },
    /// Classified as `SchemaKind::Record`.
    /// NOTE(review): presumably a record resolved between writer and reader.
    ResolveRecord {
        /// NOTE(review): presumably reader fields absent from the writer that
        /// are filled from defaults — confirm.
        defaults: Vec<ResolvedDefaultValueField>,
        /// NOTE(review): presumably one entry per writer field — confirm.
        fields: Vec<ResolvedRecordField>,
        /// NOTE(review): presumably the number of fields in the reader
        /// record.
        n_reader_fields: usize,
    },
    /// Classified as `SchemaKind::Enum`.
    /// NOTE(review): presumably an enum resolved between writer and reader.
    ResolveEnum {
        /// Optional documentation string.
        doc: Documentation,
        /// One entry per symbol: an index and name, or a bare name on
        /// failure. NOTE(review): exact meaning of the `Err` payload should
        /// be confirmed.
        symbols: Vec<Result<(usize, String), String>>,
        /// Optional default as an index and symbol name.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320 pub fn is_underlying_int(&self) -> bool {
322 self.try_make_int_value(0).is_some()
323 }
324 pub fn is_underlying_long(&self) -> bool {
326 self.try_make_long_value(0).is_some()
327 }
328 pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331 match self {
332 SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333 SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336 _ => None,
337 }
338 }
339 pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342 match self {
343 SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344 SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345 SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346 _ => None,
347 }
348 }
349}
350
/// A schema: a top-level node plus the table of named types it refers to.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Named pieces; `SchemaPieceOrNamed::Named(i)` refers to `named[i]`.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its index in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362 fn to_string(&self) -> String {
363 let json = serde_json::to_value(self).unwrap();
364 json.to_string()
365 }
366}
367
368impl std::fmt::Debug for Schema {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 if f.alternate() {
371 f.write_str(
372 &serde_json::to_string_pretty(self)
373 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374 )
375 } else {
376 f.write_str(
377 &serde_json::to_string(self)
378 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379 )
380 }
381 }
382}
383
384impl Schema {
385 pub fn top_node(&self) -> SchemaNode<'_> {
386 let (inner, name) = self.top.get_piece_and_name(self);
387 SchemaNode {
388 root: self,
389 inner,
390 name,
391 }
392 }
393 pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394 SchemaNodeOrNamed {
395 root: self,
396 inner: self.top.as_ref(),
397 }
398 }
399 pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400 &self.named[idx]
401 }
402 pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403 self.indices.get(name).map(|&idx| &self.named[idx])
404 }
405}
406
/// The coarse kind of a schema piece, with no payload.
///
/// Used, among other things, as the key under which anonymous union variants
/// are indexed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    /// A piece that has no single kind (see the conversion from
    /// `SchemaPiece`).
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase name of this kind.
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460 #[inline(always)]
461 fn from(piece: &'a SchemaPiece) -> SchemaKind {
462 match piece {
463 SchemaPiece::Null => SchemaKind::Null,
464 SchemaPiece::Boolean => SchemaKind::Boolean,
465 SchemaPiece::Int => SchemaKind::Int,
466 SchemaPiece::Long => SchemaKind::Long,
467 SchemaPiece::Float => SchemaKind::Float,
468 SchemaPiece::Double => SchemaKind::Double,
469 SchemaPiece::Date => SchemaKind::Int,
470 SchemaPiece::TimestampMilli
471 | SchemaPiece::TimestampMicro
472 | SchemaPiece::ResolveIntTsMilli
473 | SchemaPiece::ResolveDateTimestamp
474 | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475 SchemaPiece::Decimal {
476 fixed_size: None, ..
477 } => SchemaKind::Bytes,
478 SchemaPiece::Decimal {
479 fixed_size: Some(_),
480 ..
481 } => SchemaKind::Fixed,
482 SchemaPiece::Bytes => SchemaKind::Bytes,
483 SchemaPiece::String => SchemaKind::String,
484 SchemaPiece::Array(_) => SchemaKind::Array,
485 SchemaPiece::Map(_) => SchemaKind::Map,
486 SchemaPiece::Union(_) => SchemaKind::Union,
487 SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488 SchemaPiece::ResolveIntLong => SchemaKind::Long,
489 SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490 SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491 SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492 SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493 SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494 SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495 SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496 SchemaPiece::Record { .. } => SchemaKind::Record,
497 SchemaPiece::Enum { .. } => SchemaKind::Enum,
498 SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499 SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500 SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501 SchemaPiece::Json => SchemaKind::String,
502 SchemaPiece::Uuid => SchemaKind::String,
503 }
504 }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508 #[inline(always)]
509 fn from(schema: SchemaNode<'a>) -> SchemaKind {
510 SchemaKind::from(schema.inner)
511 }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515 #[inline(always)]
516 fn from(schema: &'a Schema) -> SchemaKind {
517 Self::from(schema.top_node())
518 }
519}
520
/// The name of a named type as it was parsed from the schema JSON.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The name; `Name::parse` stores only the part after the last `.`.
    pub name: String,
    /// The namespace: the dotted prefix of the name when there is one,
    /// otherwise the `namespace` attribute, if present.
    pub namespace: Option<String>,
    /// The `aliases` attribute, when it is an array consisting only of
    /// strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A fully qualified name: a base name plus a (possibly empty) namespace.
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name.
    ///
    /// When `namespace` is given, `name` is used verbatim. Otherwise `name`
    /// is split at its last `.`: the prefix becomes the namespace and the
    /// suffix the base name; a name without a dot falls back to
    /// `default_namespace`.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (base, ns) = match namespace {
            Some(explicit) => (name, explicit),
            None => match name.rsplit_once('.') {
                Some((prefix, suffix)) => (suffix, prefix),
                None => (name, default_namespace),
            },
        };
        FullName {
            name: base.to_owned(),
            namespace: ns.to_owned(),
        }
    }

    /// Returns the name without its namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns `namespace.name`, or just the name when the namespace is
    /// empty.
    pub fn human_name(&self) -> String {
        if self.namespace.is_empty() {
            self.name.clone()
        } else {
            format!("{}.{}", self.namespace, self.name)
        }
    }

    /// Returns the bare name (borrowed) when this name lives in
    /// `enclosing_ns`, and `namespace.name` (owned) otherwise.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace == enclosing_ns {
            Cow::Borrowed(self.name.as_str())
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }

    /// Returns the namespace.
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Prints `namespace.name`; the dot is printed even when the namespace is
    /// empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.namespace)?;
        f.write_str(".")?;
        f.write_str(&self.name)
    }
}
594
/// Optional documentation text attached to a schema element.
pub type Documentation = Option<String>;
597
598impl Name {
599 pub fn is_valid(name: &str) -> bool {
603 static MATCHER: LazyLock<Regex> =
604 LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605 MATCHER.is_match(name)
606 }
607
608 pub fn validate(name: &str) -> Result<(), AvroError> {
610 match Self::is_valid(name) {
611 true => Ok(()),
612 false => {
613 Err(ParseSchemaError::new(format!(
614 "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615 name
616 )).into())
617 }
618 }
619 }
620
621 pub fn make_valid(name: &str) -> String {
629 let mut out = String::new();
630 let mut chars = name.chars();
631 match chars.next() {
632 Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633 Some(ch @ '0'..='9') => {
634 out.push('_');
635 out.push(ch);
636 }
637 _ => out.push('_'),
638 }
639 for ch in chars {
640 match ch {
641 '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642 _ => out.push('_'),
643 }
644 }
645 debug_assert!(
646 Name::is_valid(&out),
647 "make_valid({name}) produced invalid name: {out}"
648 );
649 out
650 }
651
652 pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654 let name = complex
655 .name()
656 .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657 if name.is_empty() {
658 return Err(ParseSchemaError::new(format!(
659 "Name cannot be the empty string: {:?}",
660 complex
661 ))
662 .into());
663 }
664
665 let (namespace, name) = if let Some(index) = name.rfind('.') {
666 let computed_namespace = name[..index].to_owned();
667 let computed_name = name[index + 1..].to_owned();
668 if let Some(provided_namespace) = complex.string("namespace") {
669 if provided_namespace != computed_namespace {
670 warn!(
671 "Found dots in name {}, updating to namespace {} and name {}",
672 name, computed_namespace, computed_name
673 );
674 }
675 }
676 (Some(computed_namespace), computed_name)
677 } else {
678 (complex.string("namespace"), name)
679 };
680
681 Self::validate(&name)?;
682
683 let aliases: Option<Vec<String>> = complex
684 .get("aliases")
685 .and_then(|aliases| aliases.as_array())
686 .and_then(|aliases| {
687 aliases
688 .iter()
689 .map(|alias| alias.as_str())
690 .map(|alias| alias.map(|a| a.to_string()))
691 .collect::<Option<_>>()
692 });
693
694 Ok(Name {
695 name,
696 namespace,
697 aliases,
698 })
699 }
700
701 pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703 let mut map = serde_json::map::Map::new();
704 map.insert("name".into(), serde_json::Value::String(name.into()));
705 Self::parse(&map)
706 }
707
708 pub fn fullname(&self, default_namespace: &str) -> FullName {
713 FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714 }
715}
716
/// A record field that carries an already-decoded default value.
///
/// NOTE(review): presumably used for reader fields that are missing from the
/// writer schema and must be filled from their default — confirm against the
/// resolver.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Field name.
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The default value, as a decoded `types::Value`.
    pub default: types::Value,
    /// Sort order of the field.
    pub order: RecordFieldOrder,
    /// Position of the field. NOTE(review): presumably within the reader
    /// record — confirm.
    pub position: usize,
}
725
/// One field of a resolved record.
///
/// NOTE(review): presumably `Absent` marks a writer field with no reader
/// counterpart (its schema kept so the value can be skipped) and `Present` a
/// field that exists on both sides — confirm against the resolver.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    /// A field for which only a schema is kept.
    Absent(Schema),
    /// A field described by a full `RecordField`.
    Present(RecordField),
}
731
/// A single field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Field name; the parser validates it with `Name::validate`.
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The raw JSON `default` attribute, if present.
    pub default: Option<Value>,
    /// The schema of the field's value.
    pub schema: SchemaPieceOrNamed,
    /// Sort order; the parser uses `Ascending` when `order` is absent or
    /// unrecognized.
    pub order: RecordFieldOrder,
    /// Zero-based position assigned by the parser while walking the record's
    /// fields.
    pub position: usize,
}
752
/// The `order` attribute of a record field.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    /// Parsed from `"ascending"`; also the parser's fallback.
    Ascending,
    /// Parsed from `"descending"`.
    Descending,
    /// Parsed from `"ignore"`.
    Ignore,
}
760
// No inherent methods are defined for `RecordField` here.
impl RecordField {}
762
/// A union schema together with lookup indexes over its variants.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    /// For inline variants: kind of the variant -> its position in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    /// For named variants: named-type index -> its position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776 pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777 let mut avindex = BTreeMap::new();
778 let mut nvindex = BTreeMap::new();
779 for (i, schema) in schemas.iter().enumerate() {
780 match schema {
781 SchemaPieceOrNamed::Piece(sp) => {
782 if let SchemaPiece::Union(_) = sp {
783 return Err(ParseSchemaError::new(
784 "Unions may not directly contain a union",
785 )
786 .into());
787 }
788 let kind = SchemaKind::from(sp);
789 if avindex.insert(kind, i).is_some() {
790 return Err(
791 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792 );
793 }
794 }
795 SchemaPieceOrNamed::Named(idx) => {
796 if nvindex.insert(*idx, i).is_some() {
797 return Err(
798 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799 );
800 }
801 }
802 }
803 }
804 Ok(UnionSchema {
805 schemas,
806 anon_variant_index: avindex,
807 named_variant_index: nvindex,
808 })
809 }
810
811 pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813 &self.schemas
814 }
815
816 pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818 &mut self.schemas
819 }
820
821 pub fn is_nullable(&self) -> bool {
823 !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824 }
825
826 pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827 self.anon_variant_index
828 .get(&SchemaKind::from(sp))
829 .map(|idx| (*idx, &self.schemas[*idx]))
830 }
831
832 pub fn match_ref(
833 &self,
834 other: SchemaPieceRefOrNamed,
835 names_map: &BTreeMap<usize, usize>,
836 ) -> Option<(usize, &SchemaPieceOrNamed)> {
837 match other {
838 SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839 SchemaPieceRefOrNamed::Named(idx) => names_map
840 .get(&idx)
841 .and_then(|idx| self.named_variant_index.get(idx))
842 .map(|idx| (*idx, &self.schemas[*idx])),
843 }
844 }
845
846 #[inline(always)]
847 pub fn match_(
848 &self,
849 other: &SchemaPieceOrNamed,
850 names_map: &BTreeMap<usize, usize>,
851 ) -> Option<(usize, &SchemaPieceOrNamed)> {
852 self.match_ref(other.as_ref(), names_map)
853 }
854}
855
856impl PartialEq for UnionSchema {
858 fn eq(&self, other: &UnionSchema) -> bool {
859 self.schemas.eq(&other.schemas)
860 }
861}
862
/// Working state used while parsing a schema from JSON.
#[derive(Default)]
struct SchemaParser {
    /// Named types seen so far. A slot is `None` between the moment its name
    /// is allocated and the moment its body has been parsed.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each full name to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
}
868
869impl SchemaParser {
870 fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
871 let top = self.parse_inner("", value)?;
872 let SchemaParser { named, indices } = self;
873 Ok(Schema {
874 named: named.into_iter().map(|o| o.unwrap()).collect(),
875 indices,
876 top,
877 })
878 }
879
880 fn parse_inner(
881 &mut self,
882 default_namespace: &str,
883 value: &Value,
884 ) -> Result<SchemaPieceOrNamed, AvroError> {
885 match *value {
886 Value::String(ref t) => {
887 let name = FullName::from_parts(t.as_str(), None, default_namespace);
888 if let Some(idx) = self.indices.get(&name) {
889 Ok(SchemaPieceOrNamed::Named(*idx))
890 } else {
891 Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
892 t.as_str(),
893 )?))
894 }
895 }
896 Value::Object(ref data) => self.parse_complex(default_namespace, data),
897 Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
898 self.parse_union(default_namespace, data)?,
899 )),
900 _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
901 }
902 }
903
904 fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
905 let idx = match self.indices.entry(fullname) {
906 Entry::Vacant(ve) => *ve.insert(self.named.len()),
907 Entry::Occupied(oe) => {
908 return Err(ParseSchemaError::new(format!(
909 "Sub-schema with name {:?} encountered multiple times",
910 oe.key()
911 ))
912 .into());
913 }
914 };
915 self.named.push(None);
916 Ok(idx)
917 }
918
    /// Fills the slot at `index` (previously reserved by `alloc_name`) with
    /// `schema`. The slot is asserted to still be empty; an out-of-range
    /// `index` panics.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
923
924 fn parse_named_type(
925 &mut self,
926 type_name: &str,
927 default_namespace: &str,
928 complex: &Map<String, Value>,
929 ) -> Result<usize, AvroError> {
930 let name = Name::parse(complex)?;
931 match name.name.as_str() {
932 "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
933 return Err(ParseSchemaError::new(format!(
934 "{} may not be used as a custom type name",
935 name.name
936 ))
937 .into());
938 }
939 _ => {}
940 };
941 let fullname = name.fullname(default_namespace);
942 let default_namespace = fullname.namespace.clone();
943 let idx = self.alloc_name(fullname.clone())?;
944 let piece = match type_name {
945 "record" => self.parse_record(&default_namespace, complex),
946 "enum" => self.parse_enum(complex),
947 "fixed" => self.parse_fixed(&default_namespace, complex),
948 _ => unreachable!("Unknown named type kind: {}", type_name),
949 }?;
950
951 self.insert(
952 idx,
953 NamedSchemaPiece {
954 name: fullname,
955 piece,
956 },
957 );
958
959 Ok(idx)
960 }
961
962 fn parse_complex(
968 &mut self,
969 default_namespace: &str,
970 complex: &Map<String, Value>,
971 ) -> Result<SchemaPieceOrNamed, AvroError> {
972 match complex.get("type") {
973 Some(&Value::String(ref t)) => Ok(match t.as_str() {
974 "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
975 t,
976 default_namespace,
977 complex,
978 )?),
979 "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
980 "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
981 "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
982 "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
983 "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
984 "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
985 other => {
986 let name = FullName {
987 name: other.into(),
988 namespace: default_namespace.into(),
989 };
990 if let Some(idx) = self.indices.get(&name) {
991 SchemaPieceOrNamed::Named(*idx)
992 } else {
993 SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
994 }
995 }
996 }),
997 Some(&Value::Object(ref data)) => match data.get("type") {
998 Some(value) => self.parse_inner(default_namespace, value),
999 None => Err(
1000 ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1001 ),
1002 },
1003 _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1004 }
1005 }
1006
1007 fn parse_record(
1010 &mut self,
1011 default_namespace: &str,
1012 complex: &Map<String, Value>,
1013 ) -> Result<SchemaPiece, AvroError> {
1014 let mut lookup = BTreeMap::new();
1015
1016 let fields: Vec<RecordField> = complex
1017 .get("fields")
1018 .and_then(|fields| fields.as_array())
1019 .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1020 .and_then(|fields| {
1021 fields
1022 .iter()
1023 .filter_map(|field| field.as_object())
1024 .enumerate()
1025 .map(|(position, field)| {
1026 self.parse_record_field(default_namespace, field, position)
1027 })
1028 .collect::<Result<_, _>>()
1029 })?;
1030
1031 for field in &fields {
1032 lookup.insert(field.name.clone(), field.position);
1033 }
1034
1035 Ok(SchemaPiece::Record {
1036 doc: complex.doc(),
1037 fields,
1038 lookup,
1039 })
1040 }
1041
1042 fn parse_record_field(
1044 &mut self,
1045 default_namespace: &str,
1046 field: &Map<String, Value>,
1047 position: usize,
1048 ) -> Result<RecordField, AvroError> {
1049 let name = field
1050 .name()
1051 .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1052
1053 Name::validate(&name)?;
1054
1055 let schema = field
1056 .get("type")
1057 .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1058 .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1059
1060 let default = field.get("default").cloned();
1061
1062 let order = field
1063 .get("order")
1064 .and_then(|order| order.as_str())
1065 .and_then(|order| match order {
1066 "ascending" => Some(RecordFieldOrder::Ascending),
1067 "descending" => Some(RecordFieldOrder::Descending),
1068 "ignore" => Some(RecordFieldOrder::Ignore),
1069 _ => None,
1070 })
1071 .unwrap_or(RecordFieldOrder::Ascending);
1072
1073 Ok(RecordField {
1074 name,
1075 doc: field.doc(),
1076 default,
1077 schema,
1078 order,
1079 position,
1080 })
1081 }
1082
1083 fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1086 let symbols: Vec<String> = complex
1087 .get("symbols")
1088 .and_then(|v| v.as_array())
1089 .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1090 .and_then(|symbols| {
1091 symbols
1092 .iter()
1093 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1094 .collect::<Option<_>>()
1095 .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1096 })?;
1097
1098 let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1099 for symbol in symbols.iter() {
1100 if unique_symbols.contains(symbol) {
1101 return Err(ParseSchemaError::new(format!(
1102 "Enum symbols must be unique, found multiple: {}",
1103 symbol
1104 ))
1105 .into());
1106 } else {
1107 unique_symbols.insert(symbol);
1108 }
1109 }
1110
1111 let default_idx = if let Some(default) = complex.get("default") {
1112 let default_str = default.as_str().ok_or_else(|| {
1113 ParseSchemaError::new(format!(
1114 "Enum default should be a string, got: {:?}",
1115 default
1116 ))
1117 })?;
1118 let default_idx = symbols
1119 .iter()
1120 .position(|x| x == default_str)
1121 .ok_or_else(|| {
1122 ParseSchemaError::new(format!(
1123 "Enum default not found in list of symbols: {}",
1124 default_str
1125 ))
1126 })?;
1127 Some(default_idx)
1128 } else {
1129 None
1130 };
1131
1132 Ok(SchemaPiece::Enum {
1133 doc: complex.doc(),
1134 symbols,
1135 default_idx,
1136 })
1137 }
1138
1139 fn parse_array(
1142 &mut self,
1143 default_namespace: &str,
1144 complex: &Map<String, Value>,
1145 ) -> Result<SchemaPiece, AvroError> {
1146 complex
1147 .get("items")
1148 .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1149 .and_then(|items| self.parse_inner(default_namespace, items))
1150 .map(|schema| SchemaPiece::Array(Box::new(schema)))
1151 }
1152
1153 fn parse_map(
1156 &mut self,
1157 default_namespace: &str,
1158 complex: &Map<String, Value>,
1159 ) -> Result<SchemaPiece, AvroError> {
1160 complex
1161 .get("values")
1162 .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1163 .and_then(|items| self.parse_inner(default_namespace, items))
1164 .map(|schema| SchemaPiece::Map(Box::new(schema)))
1165 }
1166
1167 fn parse_union(
1170 &mut self,
1171 default_namespace: &str,
1172 items: &[Value],
1173 ) -> Result<SchemaPiece, AvroError> {
1174 items
1175 .iter()
1176 .map(|value| self.parse_inner(default_namespace, value))
1177 .collect::<Result<Vec<_>, _>>()
1178 .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1179 }
1180
1181 fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1184 let precision = complex
1185 .get("precision")
1186 .and_then(|v| v.as_i64())
1187 .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1188
1189 let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1190
1191 if scale < 0 {
1192 return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1193 }
1194
1195 if precision < 0 {
1196 return Err(
1197 ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1198 );
1199 }
1200
1201 if scale > precision {
1202 return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1203 }
1204
1205 Ok((precision as usize, scale as usize))
1206 }
1207
1208 fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1211 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1212
1213 if let Some("decimal") = logical_type {
1214 match Self::parse_decimal(complex) {
1215 Ok((precision, scale)) => {
1216 return Ok(SchemaPiece::Decimal {
1217 precision,
1218 scale,
1219 fixed_size: None,
1220 });
1221 }
1222 Err(e) => warn!(
1223 "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1224 complex, e
1225 ),
1226 }
1227 }
1228
1229 Ok(SchemaPiece::Bytes)
1230 }
1231
1232 fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1240 const AVRO_DATE: &str = "date";
1241 const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1242 const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1243 if let Some(name) = complex.get("connect.name") {
1244 if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1245 if name == KAFKA_DATE {
1246 warn!("using deprecated debezium date format");
1247 }
1248 return Ok(SchemaPiece::Date);
1249 }
1250 }
1251 if let Some(name) = complex.get("logicalType") {
1255 if name == AVRO_DATE {
1256 return Ok(SchemaPiece::Date);
1257 }
1258 }
1259 if !complex.is_empty() {
1260 debug!("parsing complex type as regular int: {:?}", complex);
1261 }
1262 Ok(SchemaPiece::Int)
1263 }
1264
1265 fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1273 const AVRO_MILLI_TS: &str = "timestamp-millis";
1274 const AVRO_MICRO_TS: &str = "timestamp-micros";
1275
1276 const CONNECT_MILLI_TS: &[&str] = &[
1277 "io.debezium.time.Timestamp",
1278 "org.apache.kafka.connect.data.Timestamp",
1279 ];
1280 const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1281
1282 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1283 if CONNECT_MILLI_TS.contains(&&**name) {
1284 return Ok(SchemaPiece::TimestampMilli);
1285 }
1286 if name == CONNECT_MICRO_TS {
1287 return Ok(SchemaPiece::TimestampMicro);
1288 }
1289 }
1290 if let Some(name) = complex.get("logicalType") {
1291 if name == AVRO_MILLI_TS {
1292 return Ok(SchemaPiece::TimestampMilli);
1293 }
1294 if name == AVRO_MICRO_TS {
1295 return Ok(SchemaPiece::TimestampMicro);
1296 }
1297 }
1298 if !complex.is_empty() {
1299 debug!("parsing complex type as regular long: {:?}", complex);
1300 }
1301 Ok(SchemaPiece::Long)
1302 }
1303
1304 fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1305 const CONNECT_JSON: &str = "io.debezium.data.Json";
1306
1307 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1308 if CONNECT_JSON == name.as_str() {
1309 return SchemaPiece::Json;
1310 }
1311 }
1312 if let Some(name) = complex.get("logicalType") {
1313 if name == "uuid" {
1314 return SchemaPiece::Uuid;
1315 }
1316 }
1317 debug!("parsing complex type as regular string: {:?}", complex);
1318 SchemaPiece::String
1319 }
1320
1321 fn parse_fixed(
1324 &self,
1325 _default_namespace: &str,
1326 complex: &Map<String, Value>,
1327 ) -> Result<SchemaPiece, AvroError> {
1328 let _name = Name::parse(complex)?;
1329
1330 let size = complex
1331 .get("size")
1332 .and_then(|v| v.as_i64())
1333 .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1334 if size <= 0 {
1335 return Err(ParseSchemaError::new(format!(
1336 "Fixed values require a positive size attribute, found: {}",
1337 size
1338 ))
1339 .into());
1340 }
1341
1342 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1343
1344 if let Some("decimal") = logical_type {
1345 match Self::parse_decimal(complex) {
1346 Ok((precision, scale)) => {
1347 let max = ((2_usize.pow((8 * size - 1) as u32) - 1) as f64).log10() as usize;
1348 if precision > max {
1349 warn!(
1350 "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1351 precision, size
1352 );
1353 } else {
1354 return Ok(SchemaPiece::Decimal {
1355 precision,
1356 scale,
1357 fixed_size: Some(size as usize),
1358 });
1359 }
1360 }
1361 Err(e) => warn!(
1362 "parsing decimal as fixed due to parse error: {:?}, {:?}",
1363 complex, e
1364 ),
1365 }
1366 }
1367
1368 Ok(SchemaPiece::Fixed {
1369 size: size as usize,
1370 })
1371 }
1372}
1373
1374impl Schema {
    /// Parses a schema from its JSON representation, with no reference
    /// schemas. Equivalent to `parse_with_references(value, &[])`.
    pub fn parse(value: &Value) -> Result<Self, AvroError> {
        Self::parse_with_references(value, &[])
    }
1380
1381 pub fn parse_with_references(
1392 primary: &Value,
1393 reference_schemas: &[Schema],
1394 ) -> Result<Self, AvroError> {
1395 let (named, indices) = Self::collect_named_types(reference_schemas);
1397
1398 let p = SchemaParser {
1400 named: named.into_iter().map(Some).collect(),
1401 indices,
1402 };
1403 p.parse(primary)
1404 }
1405
1406 fn collect_named_types(
1411 schemas: &[Schema],
1412 ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1413 let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1414 let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1415
1416 for schema in schemas {
1417 let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1421 let mut new_type_offset = combined_named.len();
1422
1423 for named_piece in &schema.named {
1424 if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1425 index_map.push(existing_idx);
1427 } else {
1428 index_map.push(new_type_offset);
1430 new_type_offset += 1;
1431 }
1432 }
1433
1434 for named_piece in &schema.named {
1436 if combined_indices.contains_key(&named_piece.name) {
1437 continue;
1438 }
1439
1440 let mut remapped = named_piece.clone();
1441 Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1442
1443 let new_idx = combined_named.len();
1444 combined_indices.insert(remapped.name.clone(), new_idx);
1445 combined_named.push(remapped);
1446 }
1447 }
1448
1449 (combined_named, combined_indices)
1450 }
1451
1452 fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1454 match piece {
1455 SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1456 SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1457 SchemaPiece::Union(union) => {
1458 for variant in union.variants_mut() {
1459 Self::remap_indices_with_map(variant, index_map);
1460 }
1461 }
1462 SchemaPiece::Record { fields, .. } => {
1463 for field in fields {
1464 Self::remap_indices_with_map(&mut field.schema, index_map);
1465 }
1466 }
1467 _ => {}
1468 }
1469 }
1470
1471 fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1473 match item {
1474 SchemaPieceOrNamed::Named(idx) => {
1475 if let Some(&new_idx) = index_map.get(*idx) {
1476 *idx = new_idx;
1477 }
1478 }
1479 SchemaPieceOrNamed::Piece(piece) => {
1480 Self::remap_indices_in_piece_with_map(piece, index_map)
1481 }
1482 }
1483 }
1484
1485 pub fn canonical_form(&self) -> String {
1490 let json = serde_json::to_value(self).unwrap();
1491 parsing_canonical_form(&json)
1492 }
1493
1494 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1499 let mut d = D::new();
1500 d.update(self.canonical_form());
1501 SchemaFingerprint {
1502 bytes: d.finalize().to_vec(),
1503 }
1504 }
1505
1506 fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1509 match primitive {
1510 "null" => Ok(SchemaPiece::Null),
1511 "boolean" => Ok(SchemaPiece::Boolean),
1512 "int" => Ok(SchemaPiece::Int),
1513 "long" => Ok(SchemaPiece::Long),
1514 "double" => Ok(SchemaPiece::Double),
1515 "float" => Ok(SchemaPiece::Float),
1516 "bytes" => Ok(SchemaPiece::Bytes),
1517 "string" => Ok(SchemaPiece::String),
1518 other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1519 }
1520 }
1521}
1522
1523impl FromStr for Schema {
1524 type Err = AvroError;
1525
1526 fn from_str(input: &str) -> Result<Self, AvroError> {
1528 let value = serde_json::from_str(input)
1529 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1530 Self::parse(&value)
1531 }
1532}
1533
1534#[derive(Clone, Debug, PartialEq)]
1535pub struct NamedSchemaPiece {
1536 pub name: FullName,
1537 pub piece: SchemaPiece,
1538}
1539
1540#[derive(Copy, Clone, Debug)]
1541pub struct SchemaNode<'a> {
1542 pub root: &'a Schema,
1543 pub inner: &'a SchemaPiece,
1544 pub name: Option<&'a FullName>,
1545}
1546
1547#[derive(Copy, Clone, Debug)]
1548pub enum SchemaPieceRefOrNamed<'a> {
1549 Piece(&'a SchemaPiece),
1550 Named(usize),
1551}
1552
1553impl<'a> SchemaPieceRefOrNamed<'a> {
1554 pub fn get_human_name(&self, root: &Schema) -> String {
1555 match self {
1556 Self::Piece(piece) => format!("{:?}", piece),
1557 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1558 }
1559 }
1560
1561 #[inline(always)]
1562 pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1563 match self {
1564 SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1565 SchemaPieceRefOrNamed::Named(index) => {
1566 let named_piece = root.lookup(index);
1567 (&named_piece.piece, Some(&named_piece.name))
1568 }
1569 }
1570 }
1571}
1572
1573#[derive(Copy, Clone, Debug)]
1574pub struct SchemaNodeOrNamed<'a> {
1575 pub root: &'a Schema,
1576 pub inner: SchemaPieceRefOrNamed<'a>,
1577}
1578
1579impl<'a> SchemaNodeOrNamed<'a> {
1580 #[inline(always)]
1581 pub fn lookup(self) -> SchemaNode<'a> {
1582 let (inner, name) = self.inner.get_piece_and_name(self.root);
1583 SchemaNode {
1584 root: self.root,
1585 inner,
1586 name,
1587 }
1588 }
1589 #[inline(always)]
1590 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1591 self.step_ref(next.as_ref())
1592 }
1593 #[inline(always)]
1594 pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1595 Self {
1596 root: self.root,
1597 inner: match next {
1598 SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1599 SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1600 },
1601 }
1602 }
1603
1604 pub fn to_schema(self) -> Schema {
1605 let mut cloner = SchemaSubtreeDeepCloner {
1606 old_root: self.root,
1607 old_to_new_names: Default::default(),
1608 named: Default::default(),
1609 };
1610 let piece = cloner.clone_piece_or_named(self.inner);
1611 let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1612 let indices: BTreeMap<FullName, usize> = named
1613 .iter()
1614 .enumerate()
1615 .map(|(i, nsp)| (nsp.name.clone(), i))
1616 .collect();
1617 Schema {
1618 named,
1619 indices,
1620 top: piece,
1621 }
1622 }
1623
1624 pub fn namespace(self) -> Option<&'a str> {
1625 let SchemaNode { name, .. } = self.lookup();
1626 name.map(|FullName { namespace, .. }| namespace.as_str())
1627 }
1628}
1629
1630struct SchemaSubtreeDeepCloner<'a> {
1631 old_root: &'a Schema,
1632 old_to_new_names: BTreeMap<usize, usize>,
1633 named: Vec<Option<NamedSchemaPiece>>,
1634}
1635
1636impl<'a> SchemaSubtreeDeepCloner<'a> {
1637 fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1638 match piece {
1639 SchemaPiece::Null => SchemaPiece::Null,
1640 SchemaPiece::Boolean => SchemaPiece::Boolean,
1641 SchemaPiece::Int => SchemaPiece::Int,
1642 SchemaPiece::Long => SchemaPiece::Long,
1643 SchemaPiece::Float => SchemaPiece::Float,
1644 SchemaPiece::Double => SchemaPiece::Double,
1645 SchemaPiece::Date => SchemaPiece::Date,
1646 SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1647 SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1648 SchemaPiece::Json => SchemaPiece::Json,
1649 SchemaPiece::Decimal {
1650 scale,
1651 precision,
1652 fixed_size,
1653 } => SchemaPiece::Decimal {
1654 scale: *scale,
1655 precision: *precision,
1656 fixed_size: *fixed_size,
1657 },
1658 SchemaPiece::Bytes => SchemaPiece::Bytes,
1659 SchemaPiece::String => SchemaPiece::String,
1660 SchemaPiece::Uuid => SchemaPiece::Uuid,
1661 SchemaPiece::Array(inner) => {
1662 SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1663 }
1664 SchemaPiece::Map(inner) => {
1665 SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1666 }
1667 SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1668 schemas: us
1669 .schemas
1670 .iter()
1671 .map(|s| self.clone_piece_or_named(s.as_ref()))
1672 .collect(),
1673 anon_variant_index: us.anon_variant_index.clone(),
1674 named_variant_index: us.named_variant_index.clone(),
1675 }),
1676 SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1677 SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1678 SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1679 SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1680 SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1681 SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1682 SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1683 SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1684 SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1685 SchemaPiece::ResolveConcreteUnion {
1686 index,
1687 inner,
1688 n_reader_variants,
1689 reader_null_variant,
1690 } => SchemaPiece::ResolveConcreteUnion {
1691 index: *index,
1692 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1693 n_reader_variants: *n_reader_variants,
1694 reader_null_variant: *reader_null_variant,
1695 },
1696 SchemaPiece::ResolveUnionUnion {
1697 permutation,
1698 n_reader_variants,
1699 reader_null_variant,
1700 } => SchemaPiece::ResolveUnionUnion {
1701 permutation: permutation
1702 .clone()
1703 .into_iter()
1704 .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1705 .collect(),
1706 n_reader_variants: *n_reader_variants,
1707 reader_null_variant: *reader_null_variant,
1708 },
1709 SchemaPiece::ResolveUnionConcrete { index, inner } => {
1710 SchemaPiece::ResolveUnionConcrete {
1711 index: *index,
1712 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1713 }
1714 }
1715 SchemaPiece::Record {
1716 doc,
1717 fields,
1718 lookup,
1719 } => SchemaPiece::Record {
1720 doc: doc.clone(),
1721 fields: fields
1722 .iter()
1723 .map(|rf| RecordField {
1724 name: rf.name.clone(),
1725 doc: rf.doc.clone(),
1726 default: rf.default.clone(),
1727 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1728 order: rf.order,
1729 position: rf.position,
1730 })
1731 .collect(),
1732 lookup: lookup.clone(),
1733 },
1734 SchemaPiece::Enum {
1735 doc,
1736 symbols,
1737 default_idx,
1738 } => SchemaPiece::Enum {
1739 doc: doc.clone(),
1740 symbols: symbols.clone(),
1741 default_idx: *default_idx,
1742 },
1743 SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1744 SchemaPiece::ResolveRecord {
1745 defaults,
1746 fields,
1747 n_reader_fields,
1748 } => SchemaPiece::ResolveRecord {
1749 defaults: defaults.clone(),
1750 fields: fields
1751 .iter()
1752 .map(|rf| match rf {
1753 ResolvedRecordField::Present(rf) => {
1754 ResolvedRecordField::Present(RecordField {
1755 name: rf.name.clone(),
1756 doc: rf.doc.clone(),
1757 default: rf.default.clone(),
1758 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1759 order: rf.order,
1760 position: rf.position,
1761 })
1762 }
1763 ResolvedRecordField::Absent(writer_schema) => {
1764 ResolvedRecordField::Absent(writer_schema.clone())
1765 }
1766 })
1767 .collect(),
1768 n_reader_fields: *n_reader_fields,
1769 },
1770 SchemaPiece::ResolveEnum {
1771 doc,
1772 symbols,
1773 default,
1774 } => SchemaPiece::ResolveEnum {
1775 doc: doc.clone(),
1776 symbols: symbols.clone(),
1777 default: default.clone(),
1778 },
1779 }
1780 }
1781 fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1782 match piece {
1783 SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1784 SchemaPieceRefOrNamed::Named(index) => {
1785 let new_index = match self.old_to_new_names.entry(index) {
1786 Entry::Vacant(ve) => {
1787 let new_index = self.named.len();
1788 self.named.push(None);
1789 ve.insert(new_index);
1790 let old_named_piece = self.old_root.lookup(index);
1791 let new_named_piece = NamedSchemaPiece {
1792 name: old_named_piece.name.clone(),
1793 piece: self.clone_piece(&old_named_piece.piece),
1794 };
1795 self.named[new_index] = Some(new_named_piece);
1796 new_index
1797 }
1798 Entry::Occupied(oe) => *oe.get(),
1799 };
1800 SchemaPieceOrNamed::Named(new_index)
1801 }
1802 }
1803 }
1804}
1805
1806impl<'a> SchemaNode<'a> {
1807 #[inline(always)]
1808 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1809 let (inner, name) = next.get_piece_and_name(self.root);
1810 Self {
1811 root: self.root,
1812 inner,
1813 name,
1814 }
1815 }
1816
1817 pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1818 use serde_json::Value::*;
1819 let val = match (json, self.inner) {
1820 (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1822 Some(variant) => AvroValue::Union {
1823 index: 0,
1824 inner: Box::new(self.step(variant).json_to_value(json)?),
1825 n_variants: us.schemas.len(),
1826 null_variant: us
1827 .schemas
1828 .iter()
1829 .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1830 },
1831 None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1832 },
1833 (Null, SchemaPiece::Null) => AvroValue::Null,
1834 (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1835 (Number(n), piece) => {
1836 match piece {
1837 piece if piece.is_underlying_int() => {
1838 let i =
1839 n.as_i64()
1840 .and_then(|i| i32::try_from(i).ok())
1841 .ok_or_else(|| {
1842 ParseSchemaError(format!("{} is not a 32-bit integer", n))
1843 })?;
1844 piece.try_make_int_value(i).unwrap().map_err(|e| {
1845 ParseSchemaError(format!("invalid default int {i}: {e}"))
1846 })?
1847 }
1848 piece if piece.is_underlying_long() => {
1849 let i = n.as_i64().ok_or_else(|| {
1850 ParseSchemaError(format!("{} is not a 64-bit integer", n))
1851 })?;
1852 piece.try_make_long_value(i).unwrap().map_err(|e| {
1853 ParseSchemaError(format!("invalid default long {i}: {e}"))
1854 })?
1855 }
1856 SchemaPiece::Float => {
1857 let f = n.as_f64().ok_or_else(|| {
1858 ParseSchemaError(format!("{} is not a 32-bit float", n))
1859 })?;
1860 AvroValue::Float(f as f32)
1861 }
1862 SchemaPiece::Double => {
1863 let f = n.as_f64().ok_or_else(|| {
1864 ParseSchemaError(format!("{} is not a 64-bit float", n))
1865 })?;
1866 AvroValue::Double(f)
1867 }
1868 _ => {
1869 return Err(ParseSchemaError(format!(
1870 "Unexpected number in default: {}",
1871 n
1872 )));
1873 }
1874 }
1875 }
1876 (String(s), piece)
1877 if s.eq_ignore_ascii_case("nan")
1878 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1879 {
1880 match piece {
1881 SchemaPiece::Float => AvroValue::Float(f32::NAN),
1882 SchemaPiece::Double => AvroValue::Double(f64::NAN),
1883 _ => unreachable!(),
1884 }
1885 }
1886 (String(s), piece)
1887 if s.eq_ignore_ascii_case("infinity")
1888 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1889 {
1890 match piece {
1891 SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1892 SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1893 _ => unreachable!(),
1894 }
1895 }
1896 (String(s), piece)
1897 if s.eq_ignore_ascii_case("-infinity")
1898 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1899 {
1900 match piece {
1901 SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1902 SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1903 _ => unreachable!(),
1904 }
1905 }
1906 (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1907 (
1908 String(s),
1909 SchemaPiece::Decimal {
1910 precision, scale, ..
1911 },
1912 ) => AvroValue::Decimal(DecimalValue {
1913 precision: *precision,
1914 scale: *scale,
1915 unscaled: s.clone().into_bytes(),
1916 }),
1917 (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1918 (Object(map), SchemaPiece::Record { fields, .. }) => {
1919 let field_values = fields
1920 .iter()
1921 .map(|rf| {
1922 let jval = map.get(&rf.name).ok_or_else(|| {
1923 ParseSchemaError(format!(
1924 "Field not found in default value: {}",
1925 rf.name
1926 ))
1927 })?;
1928 let value = self.step(&rf.schema).json_to_value(jval)?;
1929 Ok((rf.name.clone(), value))
1930 })
1931 .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1932 AvroValue::Record(field_values)
1933 }
1934 (String(s), SchemaPiece::Enum { symbols, .. }) => {
1935 match symbols.iter().find_position(|sym| s == *sym) {
1936 Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1937 None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1938 }
1939 }
1940 (Array(vals), SchemaPiece::Array(inner)) => {
1941 let node = self.step(&**inner);
1942 let vals = vals
1943 .iter()
1944 .map(|val| node.json_to_value(val))
1945 .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1946 AvroValue::Array(vals)
1947 }
1948 (Object(map), SchemaPiece::Map(inner)) => {
1949 let node = self.step(&**inner);
1950 let map = map
1951 .iter()
1952 .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1953 .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1954 AvroValue::Map(map)
1955 }
1956 (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1957 AvroValue::Fixed(*size, s.clone().into_bytes())
1958 }
1959 _ => {
1960 return Err(ParseSchemaError(format!(
1961 "Json default value {} does not match schema",
1962 json
1963 )));
1964 }
1965 };
1966 Ok(val)
1967 }
1968}
1969
1970#[derive(Clone)]
1971struct SchemaSerContext<'a> {
1972 node: SchemaNodeOrNamed<'a>,
1973 seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
1978 enclosing_ns: &'a str,
1980}
1981
1982#[derive(Clone)]
1983struct RecordFieldSerContext<'a> {
1984 outer: &'a SchemaSerContext<'a>,
1985 inner: &'a RecordField,
1986}
1987
1988impl<'a> SchemaSerContext<'a> {
1989 fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1990 let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1991 Self {
1992 node: self.node.step_ref(next),
1993 seen_named: Rc::clone(&self.seen_named),
1994 enclosing_ns: ns,
1995 }
1996 }
1997}
1998
1999impl<'a> Serialize for SchemaSerContext<'a> {
2000 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2001 where
2002 S: Serializer,
2003 {
2004 match self.node.inner {
2005 SchemaPieceRefOrNamed::Piece(piece) => match piece {
2006 SchemaPiece::Null => serializer.serialize_str("null"),
2007 SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2008 SchemaPiece::Int => serializer.serialize_str("int"),
2009 SchemaPiece::Long => serializer.serialize_str("long"),
2010 SchemaPiece::Float => serializer.serialize_str("float"),
2011 SchemaPiece::Double => serializer.serialize_str("double"),
2012 SchemaPiece::Date => {
2013 let mut map = serializer.serialize_map(Some(2))?;
2014 map.serialize_entry("type", "int")?;
2015 map.serialize_entry("logicalType", "date")?;
2016 map.end()
2017 }
2018 SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2019 let mut map = serializer.serialize_map(Some(2))?;
2020 map.serialize_entry("type", "long")?;
2021 if piece == &SchemaPiece::TimestampMilli {
2022 map.serialize_entry("logicalType", "timestamp-millis")?;
2023 } else {
2024 map.serialize_entry("logicalType", "timestamp-micros")?;
2025 }
2026 map.end()
2027 }
2028 SchemaPiece::Decimal {
2029 precision,
2030 scale,
2031 fixed_size: None,
2032 } => {
2033 let mut map = serializer.serialize_map(Some(4))?;
2034 map.serialize_entry("type", "bytes")?;
2035 map.serialize_entry("precision", precision)?;
2036 map.serialize_entry("scale", scale)?;
2037 map.serialize_entry("logicalType", "decimal")?;
2038 map.end()
2039 }
2040 SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2041 SchemaPiece::String => serializer.serialize_str("string"),
2042 SchemaPiece::Array(inner) => {
2043 let mut map = serializer.serialize_map(Some(2))?;
2044 map.serialize_entry("type", "array")?;
2045 map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2046 map.end()
2047 }
2048 SchemaPiece::Map(inner) => {
2049 let mut map = serializer.serialize_map(Some(2))?;
2050 map.serialize_entry("type", "map")?;
2051 map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2052 map.end()
2053 }
2054 SchemaPiece::Union(inner) => {
2055 let variants = inner.variants();
2056 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2057 for v in variants {
2058 seq.serialize_element(&self.step(v.as_ref()))?;
2059 }
2060 seq.end()
2061 }
2062 SchemaPiece::Json => {
2063 let mut map = serializer.serialize_map(Some(2))?;
2064 map.serialize_entry("type", "string")?;
2065 map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2066 map.end()
2067 }
2068 SchemaPiece::Uuid => {
2069 let mut map = serializer.serialize_map(Some(4))?;
2070 map.serialize_entry("type", "string")?;
2071 map.serialize_entry("logicalType", "uuid")?;
2072 map.end()
2073 }
2074 SchemaPiece::Record { .. }
2075 | SchemaPiece::Decimal {
2076 fixed_size: Some(_),
2077 ..
2078 }
2079 | SchemaPiece::Enum { .. }
2080 | SchemaPiece::Fixed { .. } => {
2081 unreachable!("Unexpected named schema piece in anonymous schema position")
2082 }
2083 SchemaPiece::ResolveIntLong
2084 | SchemaPiece::ResolveDateTimestamp
2085 | SchemaPiece::ResolveIntFloat
2086 | SchemaPiece::ResolveIntDouble
2087 | SchemaPiece::ResolveLongFloat
2088 | SchemaPiece::ResolveLongDouble
2089 | SchemaPiece::ResolveFloatDouble
2090 | SchemaPiece::ResolveConcreteUnion { .. }
2091 | SchemaPiece::ResolveUnionUnion { .. }
2092 | SchemaPiece::ResolveUnionConcrete { .. }
2093 | SchemaPiece::ResolveRecord { .. }
2094 | SchemaPiece::ResolveIntTsMicro
2095 | SchemaPiece::ResolveIntTsMilli
2096 | SchemaPiece::ResolveEnum { .. } => {
2097 panic!("Attempted to serialize resolved schema")
2098 }
2099 },
2100 SchemaPieceRefOrNamed::Named(index) => {
2101 let mut map = self.seen_named.borrow_mut();
2102 let named_piece = match map.get(&index) {
2103 Some(name) => {
2104 return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2105 }
2106 None => self.node.root.lookup(index),
2107 };
2108 let name = &named_piece.name;
2109 map.insert(index, name.clone());
2110 std::mem::drop(map);
2111 match &named_piece.piece {
2112 SchemaPiece::Record { doc, fields, .. } => {
2113 let mut map = serializer.serialize_map(None)?;
2114 map.serialize_entry("type", "record")?;
2115 map.serialize_entry("name", &name.name)?;
2116 if self.enclosing_ns != &name.namespace {
2117 map.serialize_entry("namespace", &name.namespace)?;
2118 }
2119 if let Some(docstr) = doc {
2120 map.serialize_entry("doc", docstr)?;
2121 }
2122 map.serialize_entry(
2124 "fields",
2125 &fields
2126 .iter()
2127 .map(|f| RecordFieldSerContext {
2128 outer: self,
2129 inner: f,
2130 })
2131 .collect::<Vec<_>>(),
2132 )?;
2133 map.end()
2134 }
2135 SchemaPiece::Enum {
2136 symbols,
2137 default_idx,
2138 ..
2139 } => {
2140 let mut map = serializer.serialize_map(None)?;
2141 map.serialize_entry("type", "enum")?;
2142 map.serialize_entry("name", &name.name)?;
2143 if self.enclosing_ns != &name.namespace {
2144 map.serialize_entry("namespace", &name.namespace)?;
2145 }
2146 map.serialize_entry("symbols", symbols)?;
2147 if let Some(default_idx) = *default_idx {
2148 assert!(default_idx < symbols.len());
2149 map.serialize_entry("default", &symbols[default_idx])?;
2150 }
2151 map.end()
2152 }
2153 SchemaPiece::Fixed { size } => {
2154 let mut map = serializer.serialize_map(None)?;
2155 map.serialize_entry("type", "fixed")?;
2156 map.serialize_entry("name", &name.name)?;
2157 if self.enclosing_ns != &name.namespace {
2158 map.serialize_entry("namespace", &name.namespace)?;
2159 }
2160 map.serialize_entry("size", size)?;
2161 map.end()
2162 }
2163 SchemaPiece::Decimal {
2164 scale,
2165 precision,
2166 fixed_size: Some(size),
2167 } => {
2168 let mut map = serializer.serialize_map(Some(6))?;
2169 map.serialize_entry("type", "fixed")?;
2170 map.serialize_entry("logicalType", "decimal")?;
2171 map.serialize_entry("name", &name.name)?;
2172 if self.enclosing_ns != &name.namespace {
2173 map.serialize_entry("namespace", &name.namespace)?;
2174 }
2175 map.serialize_entry("size", size)?;
2176 map.serialize_entry("precision", precision)?;
2177 map.serialize_entry("scale", scale)?;
2178 map.end()
2179 }
2180 SchemaPiece::Null
2181 | SchemaPiece::Boolean
2182 | SchemaPiece::Int
2183 | SchemaPiece::Long
2184 | SchemaPiece::Float
2185 | SchemaPiece::Double
2186 | SchemaPiece::Date
2187 | SchemaPiece::TimestampMilli
2188 | SchemaPiece::TimestampMicro
2189 | SchemaPiece::Decimal {
2190 fixed_size: None, ..
2191 }
2192 | SchemaPiece::Bytes
2193 | SchemaPiece::String
2194 | SchemaPiece::Array(_)
2195 | SchemaPiece::Map(_)
2196 | SchemaPiece::Union(_)
2197 | SchemaPiece::Uuid
2198 | SchemaPiece::Json => {
2199 unreachable!("Unexpected anonymous schema piece in named schema position")
2200 }
2201 SchemaPiece::ResolveIntLong
2202 | SchemaPiece::ResolveDateTimestamp
2203 | SchemaPiece::ResolveIntFloat
2204 | SchemaPiece::ResolveIntDouble
2205 | SchemaPiece::ResolveLongFloat
2206 | SchemaPiece::ResolveLongDouble
2207 | SchemaPiece::ResolveFloatDouble
2208 | SchemaPiece::ResolveConcreteUnion { .. }
2209 | SchemaPiece::ResolveUnionUnion { .. }
2210 | SchemaPiece::ResolveUnionConcrete { .. }
2211 | SchemaPiece::ResolveRecord { .. }
2212 | SchemaPiece::ResolveIntTsMilli
2213 | SchemaPiece::ResolveIntTsMicro
2214 | SchemaPiece::ResolveEnum { .. } => {
2215 panic!("Attempted to serialize resolved schema")
2216 }
2217 }
2218 }
2219 }
2220 }
2221}
2222
2223impl Serialize for Schema {
2224 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2225 where
2226 S: Serializer,
2227 {
2228 let ctx = SchemaSerContext {
2229 node: SchemaNodeOrNamed {
2230 root: self,
2231 inner: self.top.as_ref(),
2232 },
2233 seen_named: Rc::new(RefCell::new(Default::default())),
2234 enclosing_ns: "",
2235 };
2236 ctx.serialize(serializer)
2237 }
2238}
2239
2240impl<'a> Serialize for RecordFieldSerContext<'a> {
2241 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2242 where
2243 S: Serializer,
2244 {
2245 let mut map = serializer.serialize_map(None)?;
2246 map.serialize_entry("name", &self.inner.name)?;
2247 map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2248 if let Some(default) = &self.inner.default {
2249 map.serialize_entry("default", default)?;
2250 }
2251 if let Some(doc) = &self.inner.doc {
2252 map.serialize_entry("doc", doc)?;
2253 }
2254 map.end()
2255 }
2256}
2257
2258fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2261 pcf(schema, "", false)
2262}
2263
2264fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2265 match schema {
2266 serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2267 serde_json::Value::String(s) => pcf_string(s),
2268 serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2269 serde_json::Value::Number(n) => n.to_string(),
2270 _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2271 }
2272}
2273
2274fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2275 let default_ns = schema
2277 .get("namespace")
2278 .and_then(|v| v.as_str())
2279 .unwrap_or(enclosing_ns);
2280 let mut fields = Vec::new();
2281 let mut found_next_ns = None;
2282 let mut deferred_values = vec![];
2283 for (k, v) in schema {
2284 if schema.len() == 1 && k == "type" {
2286 if let serde_json::Value::String(s) = v {
2288 return pcf_string(s);
2289 }
2290 }
2291
2292 if field_ordering_position(k).is_none() {
2294 continue;
2295 }
2296
2297 if k == "name" {
2299 if in_fields {
2302 fields.push((
2303 k,
2304 format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2305 ));
2306 continue;
2307 }
2308 let name = v.as_str().unwrap();
2310 assert!(
2311 found_next_ns.is_none(),
2312 "`name` must not be specified multiple times"
2313 );
2314 let next_ns = match name.rsplit_once('.') {
2315 None => default_ns,
2316 Some((ns, _name)) => ns,
2317 };
2318 found_next_ns = Some(next_ns);
2319 let n = if next_ns.is_empty() {
2320 Cow::Borrowed(name)
2321 } else {
2322 Cow::Owned(format!("{next_ns}.{name}"))
2323 };
2324 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2325 continue;
2326 }
2327
2328 if k == "size" {
2330 let i = match v.as_str() {
2331 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2332 None => v.as_i64().unwrap(),
2333 };
2334 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2335 continue;
2336 }
2337
2338 deferred_values.push((k, v));
2341 }
2342
2343 let next_ns = found_next_ns.unwrap_or(default_ns);
2344 for (k, v) in deferred_values {
2345 fields.push((
2346 k,
2347 format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2348 ));
2349 }
2350
2351 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2353 let inter = fields
2354 .into_iter()
2355 .map(|(_, v)| v)
2356 .collect::<Vec<_>>()
2357 .join(",");
2358 format!("{{{}}}", inter)
2359}
2360
2361fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2362 let inter = arr
2363 .iter()
2364 .map(|s| pcf(s, enclosing_ns, in_fields))
2365 .collect::<Vec<String>>()
2366 .join(",");
2367 format!("[{}]", inter)
2368}
2369
/// Wraps `s` in double quotes. The contents are copied verbatim; no escaping
/// is applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2373
/// Returns the 1-based position at which `field` is emitted in canonical
/// form, or `None` for attributes that canonical form leaves out.
fn field_ordering_position(field: &str) -> Option<usize> {
    // Listed in emission order; the position is the index plus one.
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    CANONICAL_ORDER
        .iter()
        .position(|known| *known == field)
        .map(|zero_based| zero_based + 1)
}
2389
2390#[cfg(test)]
2391mod tests {
2392 use mz_ore::{assert_err, assert_ok};
2393
2394 use crate::types::{Record, ToAvro};
2395
2396 use super::*;
2397
2398 fn check_schema(schema: &str, expected: SchemaPiece) {
2399 let schema = Schema::from_str(schema).unwrap();
2400 assert_eq!(&expected, schema.top_node().inner);
2401
2402 let schema = serde_json::to_string(&schema).unwrap();
2404 let schema = Schema::from_str(&schema).unwrap();
2405 assert_eq!(&expected, schema.top_node().inner);
2406 }
2407
2408 #[mz_ore::test]
2409 fn test_primitive_schema() {
2410 check_schema("\"null\"", SchemaPiece::Null);
2411 check_schema("\"int\"", SchemaPiece::Int);
2412 check_schema("\"double\"", SchemaPiece::Double);
2413 }
2414
2415 #[mz_ore::test]
2416 fn test_array_schema() {
2417 check_schema(
2418 r#"{"type": "array", "items": "string"}"#,
2419 SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2420 );
2421 }
2422
2423 #[mz_ore::test]
2424 fn test_map_schema() {
2425 check_schema(
2426 r#"{"type": "map", "values": "double"}"#,
2427 SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2428 );
2429 }
2430
2431 #[mz_ore::test]
2432 fn test_union_schema() {
2433 check_schema(
2434 r#"["null", "int"]"#,
2435 SchemaPiece::Union(
2436 UnionSchema::new(vec![
2437 SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2438 SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2439 ])
2440 .unwrap(),
2441 ),
2442 );
2443 }
2444
2445 #[mz_ore::test]
2446 fn test_multi_union_schema() {
2447 let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2448 assert_ok!(schema);
2449 let schema = schema.unwrap();
2450 let node = schema.top_node();
2451 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2452 let union_schema = match node.inner {
2453 SchemaPiece::Union(u) => u,
2454 _ => unreachable!(),
2455 };
2456 assert_eq!(union_schema.variants().len(), 5);
2457 let mut variants = union_schema.variants().iter();
2458 assert_eq!(
2459 SchemaKind::from(node.step(variants.next().unwrap())),
2460 SchemaKind::Null
2461 );
2462 assert_eq!(
2463 SchemaKind::from(node.step(variants.next().unwrap())),
2464 SchemaKind::Int
2465 );
2466 assert_eq!(
2467 SchemaKind::from(node.step(variants.next().unwrap())),
2468 SchemaKind::Float
2469 );
2470 assert_eq!(
2471 SchemaKind::from(node.step(variants.next().unwrap())),
2472 SchemaKind::String
2473 );
2474 assert_eq!(
2475 SchemaKind::from(node.step(variants.next().unwrap())),
2476 SchemaKind::Bytes
2477 );
2478 assert_eq!(variants.next(), None);
2479 }
2480
2481 #[mz_ore::test]
2482 fn test_record_schema() {
2483 let schema = r#"
2484 {
2485 "type": "record",
2486 "name": "test",
2487 "doc": "record doc",
2488 "fields": [
2489 {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2490 {"name": "b", "doc": "b doc", "type": "string"}
2491 ]
2492 }
2493 "#;
2494
2495 let mut lookup = BTreeMap::new();
2496 lookup.insert("a".to_owned(), 0);
2497 lookup.insert("b".to_owned(), 1);
2498
2499 let expected = SchemaPiece::Record {
2500 doc: Some("record doc".to_string()),
2501 fields: vec![
2502 RecordField {
2503 name: "a".to_string(),
2504 doc: Some("a doc".to_string()),
2505 default: Some(Value::Number(42i64.into())),
2506 schema: SchemaPiece::Long.into(),
2507 order: RecordFieldOrder::Ascending,
2508 position: 0,
2509 },
2510 RecordField {
2511 name: "b".to_string(),
2512 doc: Some("b doc".to_string()),
2513 default: None,
2514 schema: SchemaPiece::String.into(),
2515 order: RecordFieldOrder::Ascending,
2516 position: 1,
2517 },
2518 ],
2519 lookup,
2520 };
2521
2522 check_schema(schema, expected);
2523 }
2524
2525 #[mz_ore::test]
2526 fn test_enum_schema() {
2527 let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2528
2529 let expected = SchemaPiece::Enum {
2530 doc: None,
2531 symbols: vec![
2532 "diamonds".to_owned(),
2533 "spades".to_owned(),
2534 "jokers".to_owned(),
2535 "clubs".to_owned(),
2536 "hearts".to_owned(),
2537 ],
2538 default_idx: Some(2),
2539 };
2540
2541 check_schema(schema, expected);
2542
2543 let bad_schema = Schema::from_str(
2544 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2545 );
2546
2547 assert_err!(bad_schema);
2548 }
2549
2550 #[mz_ore::test]
2551 fn test_fixed_schema() {
2552 let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2553
2554 let expected = SchemaPiece::Fixed { size: 16usize };
2555
2556 check_schema(schema, expected);
2557 }
2558
2559 #[mz_ore::test]
2560 fn test_date_schema() {
2561 let kinds = &[
2562 r#"{
2563 "type": "int",
2564 "name": "datish",
2565 "logicalType": "date"
2566 }"#,
2567 r#"{
2568 "type": "int",
2569 "name": "datish",
2570 "connect.name": "io.debezium.time.Date"
2571 }"#,
2572 r#"{
2573 "type": "int",
2574 "name": "datish",
2575 "connect.name": "org.apache.kafka.connect.data.Date"
2576 }"#,
2577 ];
2578 for kind in kinds {
2579 check_schema(*kind, SchemaPiece::Date);
2580
2581 let schema = Schema::from_str(*kind).unwrap();
2582 assert_eq!(
2583 serde_json::to_string(&schema).unwrap(),
2584 r#"{"type":"int","logicalType":"date"}"#
2585 );
2586 }
2587 }
2588
2589 #[mz_ore::test]
2590 fn new_field_in_middle() {
2591 let reader = r#"{
2592 "type": "record",
2593 "name": "MyRecord",
2594 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2595 }"#;
2596 let writer = r#"{
2597 "type": "record",
2598 "name": "MyRecord",
2599 "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2600 }"#;
2601 let reader = Schema::from_str(reader).unwrap();
2602 let writer = Schema::from_str(writer).unwrap();
2603
2604 let mut record = Record::new(writer.top_node()).unwrap();
2605 record.put("f1", 1);
2606 record.put("f2", 2);
2607 record.put("f_interposed", 42);
2608
2609 let value = record.avro();
2610
2611 let mut buf = vec![];
2612 crate::encode::encode(&value, &writer, &mut buf);
2613
2614 let resolved = resolve_schemas(&writer, &reader).unwrap();
2615
2616 let reader = &mut &buf[..];
2617 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2618 let expected = crate::types::Value::Record(vec![
2619 ("f1".to_string(), crate::types::Value::Int(1)),
2620 ("f2".to_string(), crate::types::Value::Int(2)),
2621 ]);
2622 assert_eq!(reader_value, expected);
2623 assert!(reader.is_empty()); }
2625
2626 #[mz_ore::test]
2627 fn new_field_at_end() {
2628 let reader = r#"{
2629 "type": "record",
2630 "name": "MyRecord",
2631 "fields": [{"name": "f1", "type": "int"}]
2632 }"#;
2633 let writer = r#"{
2634 "type": "record",
2635 "name": "MyRecord",
2636 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2637 }"#;
2638 let reader = Schema::from_str(reader).unwrap();
2639 let writer = Schema::from_str(writer).unwrap();
2640
2641 let mut record = Record::new(writer.top_node()).unwrap();
2642 record.put("f1", 1);
2643 record.put("f2", 2);
2644
2645 let value = record.avro();
2646
2647 let mut buf = vec![];
2648 crate::encode::encode(&value, &writer, &mut buf);
2649
2650 let resolved = resolve_schemas(&writer, &reader).unwrap();
2651
2652 let reader = &mut &buf[..];
2653 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2654 let expected =
2655 crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2656 assert_eq!(reader_value, expected);
2657 assert!(reader.is_empty()); }
2659
2660 #[mz_ore::test]
2661 fn default_non_nums() {
2662 let reader = r#"{
2663 "type": "record",
2664 "name": "MyRecord",
2665 "fields": [
2666 {"name": "f1", "type": "double", "default": "NaN"},
2667 {"name": "f2", "type": "double", "default": "Infinity"},
2668 {"name": "f3", "type": "double", "default": "-Infinity"}
2669 ]
2670 }
2671 "#;
2672 let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2673
2674 let writer_schema = Schema::from_str(writer).unwrap();
2675 let reader_schema = Schema::from_str(reader).unwrap();
2676 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2677
2678 let record = Record::new(writer_schema.top_node()).unwrap();
2679
2680 let value = record.avro();
2681 let mut buf = vec![];
2682 crate::encode::encode(&value, &writer_schema, &mut buf);
2683
2684 let reader = &mut &buf[..];
2685 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2686 let expected = crate::types::Value::Record(vec![
2687 ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2688 ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2689 (
2690 "f3".to_string(),
2691 crate::types::Value::Double(f64::NEG_INFINITY),
2692 ),
2693 ]);
2694
2695 #[derive(Debug)]
2696 struct NanEq(crate::types::Value);
2697 impl std::cmp::PartialEq for NanEq {
2698 fn eq(&self, other: &Self) -> bool {
2699 match (self, other) {
2700 (
2701 NanEq(crate::types::Value::Double(x)),
2702 NanEq(crate::types::Value::Double(y)),
2703 ) if x.is_nan() && y.is_nan() => true,
2704 (
2705 NanEq(crate::types::Value::Float(x)),
2706 NanEq(crate::types::Value::Float(y)),
2707 ) if x.is_nan() && y.is_nan() => true,
2708 (
2709 NanEq(crate::types::Value::Record(xs)),
2710 NanEq(crate::types::Value::Record(ys)),
2711 ) => {
2712 let xs = xs
2713 .iter()
2714 .cloned()
2715 .map(|(k, v)| (k, NanEq(v)))
2716 .collect::<Vec<_>>();
2717 let ys = ys
2718 .iter()
2719 .cloned()
2720 .map(|(k, v)| (k, NanEq(v)))
2721 .collect::<Vec<_>>();
2722
2723 xs == ys
2724 }
2725 (NanEq(x), NanEq(y)) => x == y,
2726 }
2727 }
2728 }
2729
2730 assert_eq!(NanEq(reader_value), NanEq(expected));
2731 assert!(reader.is_empty());
2732 }
2733
2734 #[mz_ore::test]
2735 fn test_decimal_schemas() {
2736 let schema = r#"{
2737 "type": "fixed",
2738 "name": "dec",
2739 "size": 8,
2740 "logicalType": "decimal",
2741 "precision": 12,
2742 "scale": 5
2743 }"#;
2744 let expected = SchemaPiece::Decimal {
2745 precision: 12,
2746 scale: 5,
2747 fixed_size: Some(8),
2748 };
2749 check_schema(schema, expected);
2750
2751 let schema = r#"{
2752 "type": "bytes",
2753 "logicalType": "decimal",
2754 "precision": 12,
2755 "scale": 5
2756 }"#;
2757 let expected = SchemaPiece::Decimal {
2758 precision: 12,
2759 scale: 5,
2760 fixed_size: None,
2761 };
2762 check_schema(schema, expected);
2763
2764 let res = Schema::from_str(
2765 r#"["bytes", {
2766 "type": "bytes",
2767 "logicalType": "decimal",
2768 "precision": 12,
2769 "scale": 5
2770 }]"#,
2771 );
2772 assert_eq!(
2773 res.unwrap_err().to_string(),
2774 "Schema parse error: Unions cannot contain duplicate types"
2775 );
2776
2777 let writer_schema = Schema::from_str(
2778 r#"["null", {
2779 "type": "bytes"
2780 }]"#,
2781 )
2782 .unwrap();
2783 let reader_schema = Schema::from_str(
2784 r#"["null", {
2785 "type": "bytes",
2786 "logicalType": "decimal",
2787 "precision": 12,
2788 "scale": 5
2789 }]"#,
2790 )
2791 .unwrap();
2792 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2793
2794 let expected = SchemaPiece::ResolveUnionUnion {
2795 permutation: vec![
2796 Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2797 Ok((
2798 1,
2799 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2800 precision: 12,
2801 scale: 5,
2802 fixed_size: None,
2803 }),
2804 )),
2805 ],
2806 n_reader_variants: 2,
2807 reader_null_variant: Some(0),
2808 };
2809 assert_eq!(resolved.top_node().inner, &expected);
2810 }
2811
2812 #[mz_ore::test]
2813 fn test_no_documentation() {
2814 let schema =
2815 Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2816 .unwrap();
2817
2818 let doc = match schema.top_node().inner {
2819 SchemaPiece::Enum { doc, .. } => doc.clone(),
2820 _ => panic!(),
2821 };
2822
2823 assert_none!(doc);
2824 }
2825
2826 #[mz_ore::test]
2827 fn test_documentation() {
2828 let schema = Schema::from_str(
2829 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2830 ).unwrap();
2831
2832 let doc = match schema.top_node().inner {
2833 SchemaPiece::Enum { doc, .. } => doc.clone(),
2834 _ => None,
2835 };
2836
2837 assert_eq!("Some documentation".to_owned(), doc.unwrap());
2838 }
2839
#[mz_ore::test]
fn test_namespaces_and_names() {
    // Exercises how the "name" and "namespace" attributes combine into the
    // `FullName` recorded for each named type, which names fail to parse, and
    // how references between named types resolve to indices into
    // `schema.named`.

    // A separate "namespace" attribute is stored next to the simple name.
    let schema = Schema::from_str(
        r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "name".into(),
            namespace: "namespace".into()
        }
    );

    // A dotted name with no "namespace" attribute: everything before the last
    // dot becomes the namespace.
    let schema =
        Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
            .unwrap();
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "dots".into(),
            namespace: "name.has".into()
        }
    );

    // A dotted name together with a "namespace" attribute: the namespace
    // embedded in the name wins and the attribute is not used.
    let schema = Schema::from_str(
        r#"{"type": "enum", "namespace": "namespace",
            "name": "name.has.dots", "symbols": ["A", "B"]}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "dots".into(),
            namespace: "name.has".into()
        }
    );

    // Neither a dotted name nor a "namespace" attribute: the namespace is the
    // empty string.
    let schema = Schema::from_str(
        r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
            "fields": [{"name": "name", "type": "string"}]}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "TestDoc".into(),
            namespace: "".into()
        }
    );

    // An explicitly empty "namespace" attribute gives the same result.
    let schema = Schema::from_str(
        r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
            "fields": [{"name": "name", "type": "string"}]}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 1);
    assert_eq!(
        schema.named[0].name,
        FullName {
            name: "TestDoc".into(),
            namespace: "".into()
        }
    );

    // Spelling the namespace as an attribute or as a dotted prefix yields
    // equal full names.
    let first = Schema::from_str(
        r#"{"type": "fixed", "namespace": "namespace",
            "name": "name", "size": 1}"#,
    )
    .unwrap();
    let second = Schema::from_str(
        r#"{"type": "fixed", "name": "namespace.name",
            "size": 1}"#,
    )
    .unwrap();
    assert_eq!(first.named[0].name, second.named[0].name);

    // The simple name is case-sensitive: "name" and "Name" differ.
    let first = Schema::from_str(
        r#"{"type": "fixed", "namespace": "namespace",
            "name": "name", "size": 1}"#,
    )
    .unwrap();
    let second = Schema::from_str(
        r#"{"type": "fixed", "name": "namespace.Name",
            "size": 1}"#,
    )
    .unwrap();
    assert_ne!(first.named[0].name, second.named[0].name);

    // The namespace is case-sensitive as well.
    let first = Schema::from_str(
        r#"{"type": "fixed", "namespace": "Namespace",
            "name": "name", "size": 1}"#,
    )
    .unwrap();
    let second = Schema::from_str(
        r#"{"type": "fixed", "namespace": "namespace",
            "name": "name", "size": 1}"#,
    )
    .unwrap();
    assert_ne!(first.named[0].name, second.named[0].name);

    // The next three schemas must fail to parse -- presumably because the
    // record name is not a valid Avro name: it starts with a digit and
    // contains spaces, ...
    assert!(
        Schema::from_str(
            r#"{"type": "record", "name": "99 problems but a name aint one",
                "fields": [{"name": "name", "type": "string"}]}"#
        )
        .is_err()
    );

    // ... consists only of punctuation, ...
    assert!(
        Schema::from_str(
            r#"{"type": "record", "name": "!!!",
                "fields": [{"name": "name", "type": "string"}]}"#
        )
        .is_err()
    );

    // ... or ends in a non-ASCII character.
    assert!(
        Schema::from_str(
            r#"{"type": "record", "name": "_valid_until_©",
                "fields": [{"name": "name", "type": "string"}]}"#
        )
        .is_err()
    );

    // Named types defined inline in record fields, and later references to
    // them. The outer record plus three enums gives four named types.
    let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
        {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
        {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
        {"name": "f3", "type": "MyEnum"},
        {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
        {"name": "f5", "type": "other.namespace.OtherEnum"},
        {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
        {"name": "f7", "type": "some.other.ThirdEnum"}
    ]}"#).unwrap();
    assert_eq!(schema.named.len(), 4);

    if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
        // f1: inline definition of `MyEnum`.
        assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1));
        // f2: reference qualified with the enclosing record's namespace; it
        // resolves to the same enum, so `MyEnum` inherited that namespace.
        assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1));
        // f3: unqualified reference to the same enum.
        assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1));
        // f4: inline definition whose dotted name carries its own namespace.
        assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2));
        // f5: fully qualified reference to the f4 enum.
        assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2));
        // f6: inline definition with an explicit "namespace" attribute.
        assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3));
        // f7: fully qualified reference to the f6 enum.
        assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3));
    } else {
        panic!("Expected SchemaPiece::Record, found something else");
    }

    // A nested record `Z` that refers both to its enclosing record `x.Y` and
    // to itself as `x.Z`.
    let schema = Schema::from_str(
        r#"{"type": "record", "name": "x.Y", "fields": [
            {"name": "e", "type":
                {"type": "record", "name": "Z", "fields": [
                    {"name": "f", "type": "x.Y"},
                    {"name": "g", "type": "x.Z"}
                ]}
            }
        ]}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 2);

    if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
        // e: the inline record `Z`.
        assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1));
    } else {
        panic!("Expected SchemaPiece::Record, found something else");
    }

    if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
        // f: back-reference to the enclosing record `x.Y`.
        assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0));
        // g: `x.Z` is this record itself, so `Z` inherited namespace `x`.
        assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1));
    } else {
        panic!("Expected SchemaPiece::Record, found something else");
    }

    // An enum nested inside a namespaced record but declared with an empty
    // "namespace": the unqualified reference "Z" made from the outer,
    // namespace-less record `R` still resolves to it.
    let schema = Schema::from_str(
        r#"{"type": "record", "name": "R", "fields": [
            {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
                {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
                    "symbols": ["Foo", "Bar"]}
                }
            ]}},
            {"name": "t", "type": "Z"}
        ]}"#,
    )
    .unwrap();
    assert_eq!(schema.named.len(), 3);

    if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
        // s: the inline record `x.Y`.
        assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1));
        // t: the enum `Z` defined inside `x.Y`.
        assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2));
    } else {
        panic!("Expected SchemaPiece::Record, found something else");
    }
}
3048
3049 #[mz_ore::test]
3052 fn test_schema_is_send() {
3053 fn send<S: Send>(_s: S) {}
3054
3055 let schema = Schema {
3056 named: vec![],
3057 indices: Default::default(),
3058 top: SchemaPiece::Null.into(),
3059 };
3060 send(schema);
3061 }
3062
3063 #[mz_ore::test]
3064 fn test_schema_is_sync() {
3065 fn sync<S: Sync>(_s: S) {}
3066
3067 let schema = Schema {
3068 named: vec![],
3069 indices: Default::default(),
3070 top: SchemaPiece::Null.into(),
3071 };
3072 sync(&schema);
3073 sync(schema);
3074 }
3075
3076 #[mz_ore::test]
3077 #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
3079 use sha2::Sha256;
3080
3081 let raw_schema = r#"
3082 {
3083 "type": "record",
3084 "name": "test",
3085 "fields": [
3086 {"name": "a", "type": "long", "default": 42},
3087 {"name": "b", "type": "string"}
3088 ]
3089 }
3090 "#;
3091 let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3092 let schema = Schema::from_str(raw_schema).unwrap();
3093 assert_eq!(&schema.canonical_form(), expected_canonical);
3094 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3095 assert_eq!(
3096 format!("{}", schema.fingerprint::<Sha256>()),
3097 expected_fingerprint
3098 );
3099
3100 let raw_schema = r#"
3101{
3102 "type": "record",
3103 "name": "ns.r1",
3104 "namespace": "ignored",
3105 "fields": [
3106 {
3107 "name": "f1",
3108 "type": {
3109 "type": "fixed",
3110 "name": "r2",
3111 "size": 1
3112 }
3113 }
3114 ]
3115}
3116"#;
3117 let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3118 let schema = Schema::from_str(raw_schema).unwrap();
3119 assert_eq!(&schema.canonical_form(), expected_canonical);
3120 let expected_fingerprint = format!("{:02x}", Sha256::digest(expected_canonical));
3121 assert_eq!(
3122 format!("{}", schema.fingerprint::<Sha256>()),
3123 expected_fingerprint
3124 );
3125 }
3126
3127 #[mz_ore::test]
3128 fn test_make_valid() {
3129 for (input, expected) in [
3130 ("foo", "foo"),
3131 ("az99", "az99"),
3132 ("99az", "_99az"),
3133 ("is,bad", "is_bad"),
3134 ("@#$%", "____"),
3135 ("i-amMisBehaved!", "i_amMisBehaved_"),
3136 ("", "_"),
3137 ] {
3138 let actual = Name::make_valid(input);
3139 assert_eq!(expected, actual, "Name::make_valid({input})")
3140 }
3141 }
3142
3143 #[mz_ore::test]
3144 fn test_parse_with_simple_reference() {
3145 let ref_schema_json = r#"{
3147 "type": "record",
3148 "name": "User",
3149 "namespace": "com.example",
3150 "fields": [{"name": "id", "type": "int"}]
3151 }"#;
3152
3153 let primary_json = r#"{
3155 "type": "record",
3156 "name": "Event",
3157 "namespace": "com.example",
3158 "fields": [{"name": "user", "type": "com.example.User"}]
3159 }"#;
3160
3161 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3162 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3163
3164 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3165
3166 let user_name = FullName {
3168 name: "User".to_string(),
3169 namespace: "com.example".to_string(),
3170 };
3171 let event_name = FullName {
3172 name: "Event".to_string(),
3173 namespace: "com.example".to_string(),
3174 };
3175
3176 assert!(
3177 schema.indices.contains_key(&user_name),
3178 "User type should be in schema indices"
3179 );
3180 assert!(
3181 schema.indices.contains_key(&event_name),
3182 "Event type should be in schema indices"
3183 );
3184
3185 if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3187 let event_piece = &schema.named[*event_idx].piece;
3188 if let SchemaPiece::Record { fields, .. } = event_piece {
3189 assert_eq!(fields.len(), 1);
3190 assert_eq!(fields[0].name, "user");
3191 assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3193 } else {
3194 panic!("Expected Event to be a record");
3195 }
3196 } else {
3197 panic!("Expected top to be Named");
3198 }
3199 }
3200
3201 #[mz_ore::test]
3202 fn test_parse_with_nested_references() {
3203 let schema_a = r#"{
3205 "type": "record",
3206 "name": "Address",
3207 "namespace": "com.example",
3208 "fields": [
3209 {"name": "street", "type": "string"},
3210 {"name": "city", "type": "string"}
3211 ]
3212 }"#;
3213
3214 let schema_b = r#"{
3216 "type": "record",
3217 "name": "User",
3218 "namespace": "com.example",
3219 "fields": [
3220 {"name": "id", "type": "int"},
3221 {"name": "address", "type": "com.example.Address"}
3222 ]
3223 }"#;
3224
3225 let schema_c = r#"{
3227 "type": "record",
3228 "name": "Event",
3229 "namespace": "com.example",
3230 "fields": [
3231 {"name": "user", "type": "com.example.User"},
3232 {"name": "timestamp", "type": "long"}
3233 ]
3234 }"#;
3235
3236 let ref_schema_a = Schema::from_str(schema_a).unwrap();
3238
3239 let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3241 let ref_schema_b =
3242 Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3243 .unwrap();
3244
3245 let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3247 let final_schema =
3248 Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3249
3250 for name in ["Address", "User", "Event"] {
3252 let full_name = FullName {
3253 name: name.to_string(),
3254 namespace: "com.example".to_string(),
3255 };
3256 assert!(
3257 final_schema.indices.contains_key(&full_name),
3258 "{} type should be in schema indices",
3259 name
3260 );
3261 }
3262 }
3263
3264 #[mz_ore::test]
3265 fn test_parse_with_multiple_types_in_reference() {
3266 let ref_schema_json = r#"{
3268 "type": "record",
3269 "name": "ContactInfo",
3270 "namespace": "com.example",
3271 "fields": [
3272 {
3273 "name": "address",
3274 "type": {
3275 "type": "record",
3276 "name": "Address",
3277 "fields": [{"name": "street", "type": "string"}]
3278 }
3279 },
3280 {
3281 "name": "phone",
3282 "type": {
3283 "type": "record",
3284 "name": "PhoneNumber",
3285 "fields": [{"name": "number", "type": "string"}]
3286 }
3287 }
3288 ]
3289 }"#;
3290
3291 let primary_json = r#"{
3293 "type": "record",
3294 "name": "User",
3295 "namespace": "com.example",
3296 "fields": [
3297 {"name": "id", "type": "int"},
3298 {"name": "home", "type": "com.example.Address"},
3299 {"name": "mobile", "type": "com.example.PhoneNumber"}
3300 ]
3301 }"#;
3302
3303 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3304 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3305
3306 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3307
3308 for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3310 let full_name = FullName {
3311 name: name.to_string(),
3312 namespace: "com.example".to_string(),
3313 };
3314 assert!(
3315 schema.indices.contains_key(&full_name),
3316 "{} type should be in schema indices",
3317 name
3318 );
3319 }
3320 }
3321
3322 #[mz_ore::test]
3323 fn test_parse_with_no_references() {
3324 let schema_json = r#"{
3326 "type": "record",
3327 "name": "Simple",
3328 "fields": [{"name": "id", "type": "int"}]
3329 }"#;
3330
3331 let value: Value = serde_json::from_str(schema_json).unwrap();
3332
3333 let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3334 let schema_normal = Schema::parse(&value).unwrap();
3335
3336 assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3338 assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3339 }
3340
3341 #[mz_ore::test]
3342 fn test_parse_with_reference_in_array() {
3343 let ref_schema_json = r#"{
3345 "type": "record",
3346 "name": "Item",
3347 "namespace": "com.example",
3348 "fields": [{"name": "name", "type": "string"}]
3349 }"#;
3350
3351 let primary_json = r#"{
3353 "type": "record",
3354 "name": "Order",
3355 "namespace": "com.example",
3356 "fields": [
3357 {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3358 ]
3359 }"#;
3360
3361 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3362 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3363
3364 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3365
3366 let item_name = FullName {
3368 name: "Item".to_string(),
3369 namespace: "com.example".to_string(),
3370 };
3371 assert!(schema.indices.contains_key(&item_name));
3372
3373 if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3375 let order_piece = &schema.named[*order_idx].piece;
3376 if let SchemaPiece::Record { fields, .. } = order_piece {
3377 if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3378 assert!(
3379 matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3380 "Array items should be a Named reference to Item"
3381 );
3382 } else {
3383 panic!("Expected items field to be an array");
3384 }
3385 } else {
3386 panic!("Expected Order to be a record");
3387 }
3388 } else {
3389 panic!("Expected top to be Named");
3390 }
3391 }
3392}