1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use aws_lc_rs::digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51 writer_schema: &Schema,
52 reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54 let r_indices = reader_schema.indices.clone();
55 let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56 writer_schema
57 .indices
58 .iter()
59 .flat_map(|(name, widx)| {
60 r_indices
61 .get(name)
62 .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63 })
64 .unzip();
65 let reader_fullnames = reader_schema
66 .indices
67 .iter()
68 .map(|(f, i)| (*i, f))
69 .collect::<BTreeMap<_, _>>();
70 let mut resolver = SchemaResolver {
71 named: Default::default(),
72 indices: Default::default(),
73 human_readable_field_path: Vec::new(),
74 current_human_readable_path_start: 0,
75 writer_to_reader_names,
76 reader_to_writer_names,
77 reader_to_resolved_names: Default::default(),
78 reader_fullnames,
79 reader_schema,
80 };
81 let writer_node = writer_schema.top_node_or_named();
82 let reader_node = reader_schema.top_node_or_named();
83 let inner = resolver.resolve(writer_node, reader_node)?;
84 let sch = Schema {
85 named: resolver.named.into_iter().map(Option::unwrap).collect(),
86 indices: resolver.indices,
87 top: inner,
88 };
89 Ok(sch)
90}
91
/// Error raised when a JSON document is not an acceptable Avro schema.
///
/// It carries only a human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Creates an error holding `msg` as its message.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for ParseSchemaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `pad` applies width/alignment/precision flags exactly as `String`'s
        // own `Display` implementation does.
        f.pad(&self.0)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// The raw bytes of a schema fingerprint.
#[derive(Debug)]
pub struct SchemaFingerprint {
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, using two digits per byte.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Write each byte directly into the formatter rather than allocating
        // a `String` per byte and then joining them into another `String`.
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137 Piece(SchemaPiece),
138 Named(usize),
139}
140impl SchemaPieceOrNamed {
141 pub fn get_human_name(&self, root: &Schema) -> String {
142 match self {
143 Self::Piece(piece) => format!("{:?}", piece),
144 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145 }
146 }
147 #[inline(always)]
148 pub fn get_piece_and_name<'a>(
149 &'a self,
150 root: &'a Schema,
151 ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152 self.as_ref().get_piece_and_name(root)
153 }
154
155 #[inline(always)]
156 pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157 match self {
158 SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159 SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160 }
161 }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165 #[inline(always)]
166 fn from(piece: SchemaPiece) -> Self {
167 Self::Piece(piece)
168 }
169}
170
/// One node of an Avro schema.
///
/// Named types (records, enums, fixeds) are stored in the owning [`Schema`]'s
/// named-type table and referenced through [`SchemaPieceOrNamed::Named`].
///
/// The plain variants come from parsing a schema. The `Resolve*` variants are
/// not produced by the parser in this file; they presumably come from
/// [`resolve_schemas`] (writer schema + reader schema). Their descriptions
/// below are inferred from their names and from the kind mapping in
/// `From<&SchemaPiece> for SchemaKind`. TODO confirm against `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// Avro `null`.
    Null,
    /// Avro `boolean`.
    Boolean,
    /// Avro `int`.
    Int,
    /// Avro `long`.
    Long,
    /// Avro `float`.
    Float,
    /// Avro `double`.
    Double,
    /// An `int` carrying a date annotation: `logicalType: "date"`, or a
    /// Debezium / Kafka Connect date `connect.name`.
    Date,
    /// A `long` timestamp in milliseconds (`timestamp-millis` or a
    /// Connect/Debezium millisecond timestamp `connect.name`).
    TimestampMilli,
    /// A `long` timestamp in microseconds (`timestamp-micros` or
    /// `io.debezium.time.MicroTimestamp`).
    TimestampMicro,
    /// A decimal with the given precision and scale. `fixed_size` is `None`
    /// when the decimal is backed by `bytes`, and `Some(n)` when it is backed
    /// by a `fixed` of `n` bytes.
    Decimal {
        precision: usize,
        scale: usize,
        fixed_size: Option<usize>,
    },
    /// Avro `bytes`.
    Bytes,
    /// Avro `string`.
    String,
    /// A `string` whose `connect.name` is `io.debezium.data.Json`.
    Json,
    /// A `string` with `logicalType: "uuid"`.
    Uuid,
    /// An array whose items have the boxed schema.
    Array(Box<SchemaPieceOrNamed>),
    /// A map whose values have the boxed schema.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of alternative schemas.
    Union(UnionSchema),
    /// Presumably: writer `int` read as a millisecond timestamp.
    ResolveIntTsMilli,
    /// Presumably: writer `int` read as a microsecond timestamp.
    ResolveIntTsMicro,
    /// Presumably: writer date read as a timestamp.
    ResolveDateTimestamp,
    /// Presumably: writer `int` promoted to reader `long`.
    ResolveIntLong,
    /// Presumably: writer `int` promoted to reader `float`.
    ResolveIntFloat,
    /// Presumably: writer `int` promoted to reader `double`.
    ResolveIntDouble,
    /// Presumably: writer `long` promoted to reader `float`.
    ResolveLongFloat,
    /// Presumably: writer `long` promoted to reader `double`.
    ResolveLongDouble,
    /// Presumably: writer `float` promoted to reader `double`.
    ResolveFloatDouble,
    /// Presumably: a non-union writer type read into the reader-union variant
    /// at `index`, whose resolved schema is `inner`.
    ResolveConcreteUnion {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// Presumably: a writer union read as a reader union. `permutation` maps
    /// each writer variant either to a reader variant (index + resolved
    /// schema) or to the error explaining why it cannot be read.
    ResolveUnionUnion {
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        n_reader_variants: usize,
        reader_null_variant: Option<usize>,
    },
    /// Presumably: a writer union read as a non-union reader type, where only
    /// writer variant `index` is readable (as `inner`).
    ResolveUnionConcrete {
        index: usize,
        inner: Box<SchemaPieceOrNamed>,
    },
    /// An Avro record. `lookup` maps each field name to its `position`.
    Record {
        doc: Documentation,
        fields: Vec<RecordField>,
        lookup: BTreeMap<String, usize>,
    },
    /// An Avro enum. `default_idx` is the index into `symbols` of the enum's
    /// `default` symbol, if one was given.
    Enum {
        doc: Documentation,
        symbols: Vec<String>,
        default_idx: Option<usize>,
    },
    /// An Avro `fixed` of `size` bytes.
    Fixed { size: usize },
    /// Presumably: a writer record read as a reader record. `defaults` holds
    /// reader-only fields filled from their defaults, and `fields` holds the
    /// writer's fields in writer order.
    ResolveRecord {
        defaults: Vec<ResolvedDefaultValueField>,
        fields: Vec<ResolvedRecordField>,
        n_reader_fields: usize,
    },
    /// Presumably: a writer enum read as a reader enum. Each writer symbol
    /// maps to `Ok((reader index, symbol))` or `Err(symbol)` when it is not
    /// readable; `default` is the reader's default, if any.
    ResolveEnum {
        doc: Documentation,
        symbols: Vec<Result<(usize, String), String>>,
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320 pub fn is_underlying_int(&self) -> bool {
322 self.try_make_int_value(0).is_some()
323 }
324 pub fn is_underlying_long(&self) -> bool {
326 self.try_make_long_value(0).is_some()
327 }
328 pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331 match self {
332 SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333 SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336 _ => None,
337 }
338 }
339 pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342 match self {
343 SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344 SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345 SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346 _ => None,
347 }
348 }
349}
350
/// A parsed (or resolved) Avro schema.
///
/// Named types are stored once in `named` and referenced from the rest of
/// the tree via [`SchemaPieceOrNamed::Named`] indices into that table.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// All named types; `SchemaPieceOrNamed::Named(i)` refers to `named[i]`.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Full name of each named type, mapped to its index in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The root node of the schema.
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362 fn to_string(&self) -> String {
363 let json = serde_json::to_value(self).unwrap();
364 json.to_string()
365 }
366}
367
368impl std::fmt::Debug for Schema {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 if f.alternate() {
371 f.write_str(
372 &serde_json::to_string_pretty(self)
373 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374 )
375 } else {
376 f.write_str(
377 &serde_json::to_string(self)
378 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379 )
380 }
381 }
382}
383
384impl Schema {
385 pub fn top_node(&self) -> SchemaNode<'_> {
386 let (inner, name) = self.top.get_piece_and_name(self);
387 SchemaNode {
388 root: self,
389 inner,
390 name,
391 }
392 }
393 pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394 SchemaNodeOrNamed {
395 root: self,
396 inner: self.top.as_ref(),
397 }
398 }
399 pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400 &self.named[idx]
401 }
402 pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403 self.indices.get(name).map(|&idx| &self.named[idx])
404 }
405}
406
/// The physical Avro type of a schema node, ignoring logical annotations and
/// resolution details (for example, `Date` is of kind `Int`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    /// Avro `null`.
    Null,
    /// Avro `boolean`.
    Boolean,
    /// Avro `int`.
    Int,
    /// Avro `long`.
    Long,
    /// Avro `float`.
    Float,
    /// Avro `double`.
    Double,
    /// Avro `bytes`.
    Bytes,
    /// Avro `string`.
    String,
    /// Avro `array`.
    Array,
    /// Avro `map`.
    Map,
    /// Avro union.
    Union,
    /// Avro `record`.
    Record,
    /// Avro `enum`.
    Enum,
    /// Avro `fixed`.
    Fixed,
    /// A kind that cannot be determined from the node alone (used, for
    /// example, for `SchemaPiece::ResolveUnionConcrete`).
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase Avro spelling of this kind (`"union"` for unions,
    /// `"unknown"` for [`SchemaKind::Unknown`]).
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460 #[inline(always)]
461 fn from(piece: &'a SchemaPiece) -> SchemaKind {
462 match piece {
463 SchemaPiece::Null => SchemaKind::Null,
464 SchemaPiece::Boolean => SchemaKind::Boolean,
465 SchemaPiece::Int => SchemaKind::Int,
466 SchemaPiece::Long => SchemaKind::Long,
467 SchemaPiece::Float => SchemaKind::Float,
468 SchemaPiece::Double => SchemaKind::Double,
469 SchemaPiece::Date => SchemaKind::Int,
470 SchemaPiece::TimestampMilli
471 | SchemaPiece::TimestampMicro
472 | SchemaPiece::ResolveIntTsMilli
473 | SchemaPiece::ResolveDateTimestamp
474 | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475 SchemaPiece::Decimal {
476 fixed_size: None, ..
477 } => SchemaKind::Bytes,
478 SchemaPiece::Decimal {
479 fixed_size: Some(_),
480 ..
481 } => SchemaKind::Fixed,
482 SchemaPiece::Bytes => SchemaKind::Bytes,
483 SchemaPiece::String => SchemaKind::String,
484 SchemaPiece::Array(_) => SchemaKind::Array,
485 SchemaPiece::Map(_) => SchemaKind::Map,
486 SchemaPiece::Union(_) => SchemaKind::Union,
487 SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488 SchemaPiece::ResolveIntLong => SchemaKind::Long,
489 SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490 SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491 SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492 SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493 SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494 SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495 SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496 SchemaPiece::Record { .. } => SchemaKind::Record,
497 SchemaPiece::Enum { .. } => SchemaKind::Enum,
498 SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499 SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500 SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501 SchemaPiece::Json => SchemaKind::String,
502 SchemaPiece::Uuid => SchemaKind::String,
503 }
504 }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508 #[inline(always)]
509 fn from(schema: SchemaNode<'a>) -> SchemaKind {
510 SchemaKind::from(schema.inner)
511 }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515 #[inline(always)]
516 fn from(schema: &'a Schema) -> SchemaKind {
517 Self::from(schema.top_node())
518 }
519}
520
/// The `name`, `namespace` and `aliases` attributes of a named Avro type, as
/// read from the schema JSON by [`Name::parse`].
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The unqualified name. `Name::parse` splits a dotted name, so a value it
    /// produces contains no `.`.
    pub name: String,
    /// The namespace, either taken from a dotted `name` or from the
    /// `namespace` attribute; `None` if neither supplied one.
    pub namespace: Option<String>,
    /// The `aliases` attribute, if present and made up entirely of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A fully-qualified Avro name: a base name plus a namespace (which may be
/// the empty string).
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name from its parts.
    ///
    /// An explicit `namespace` is used verbatim together with `name`.
    /// Otherwise, a dotted `name` is split at its last `.` into namespace and
    /// base name, and an undotted `name` falls back to `default_namespace`.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (base, ns) = match namespace {
            Some(explicit) => (name, explicit),
            None => match name.rsplit_once('.') {
                Some((prefix, last)) => (last, prefix),
                None => (name, default_namespace),
            },
        };
        FullName {
            name: base.to_owned(),
            namespace: ns.to_owned(),
        }
    }

    /// The unqualified part of the name.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }

    /// `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        match self.namespace.as_str() {
            "" => self.name.clone(),
            ns => format!("{}.{}", ns, self.name),
        }
    }

    /// The shortest spelling that refers to this name from inside
    /// `enclosing_ns`.
    ///
    /// This is the bare name when the namespaces match, and otherwise
    /// `namespace.name`. An empty namespace yields `.name`, which
    /// `from_parts` parses back to the empty namespace.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace == enclosing_ns {
            Cow::Borrowed(self.name.as_str())
        } else {
            Cow::Owned(format!("{}.{}", self.namespace, self.name))
        }
    }

    /// The namespace, possibly empty.
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Always `namespace.name`, even when the namespace is empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.namespace)?;
        f.write_str(".")?;
        f.write_str(&self.name)
    }
}
594
/// The optional documentation string attached to a schema element (read
/// from the JSON via `MapHelper::doc`).
pub type Documentation = Option<String>;
597
598impl Name {
599 pub fn is_valid(name: &str) -> bool {
603 static MATCHER: LazyLock<Regex> =
604 LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605 MATCHER.is_match(name)
606 }
607
608 pub fn validate(name: &str) -> Result<(), AvroError> {
610 match Self::is_valid(name) {
611 true => Ok(()),
612 false => {
613 Err(ParseSchemaError::new(format!(
614 "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615 name
616 )).into())
617 }
618 }
619 }
620
621 pub fn make_valid(name: &str) -> String {
629 let mut out = String::new();
630 let mut chars = name.chars();
631 match chars.next() {
632 Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633 Some(ch @ '0'..='9') => {
634 out.push('_');
635 out.push(ch);
636 }
637 _ => out.push('_'),
638 }
639 for ch in chars {
640 match ch {
641 '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642 _ => out.push('_'),
643 }
644 }
645 debug_assert!(
646 Name::is_valid(&out),
647 "make_valid({name}) produced invalid name: {out}"
648 );
649 out
650 }
651
652 pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654 let name = complex
655 .name()
656 .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657 if name.is_empty() {
658 return Err(ParseSchemaError::new(format!(
659 "Name cannot be the empty string: {:?}",
660 complex
661 ))
662 .into());
663 }
664
665 let (namespace, name) = if let Some(index) = name.rfind('.') {
666 let computed_namespace = name[..index].to_owned();
667 let computed_name = name[index + 1..].to_owned();
668 if let Some(provided_namespace) = complex.string("namespace") {
669 if provided_namespace != computed_namespace {
670 warn!(
671 "Found dots in name {}, updating to namespace {} and name {}",
672 name, computed_namespace, computed_name
673 );
674 }
675 }
676 (Some(computed_namespace), computed_name)
677 } else {
678 (complex.string("namespace"), name)
679 };
680
681 Self::validate(&name)?;
682
683 let aliases: Option<Vec<String>> = complex
684 .get("aliases")
685 .and_then(|aliases| aliases.as_array())
686 .and_then(|aliases| {
687 aliases
688 .iter()
689 .map(|alias| alias.as_str())
690 .map(|alias| alias.map(|a| a.to_string()))
691 .collect::<Option<_>>()
692 });
693
694 Ok(Name {
695 name,
696 namespace,
697 aliases,
698 })
699 }
700
701 pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703 let mut map = serde_json::map::Map::new();
704 map.insert("name".into(), serde_json::Value::String(name.into()));
705 Self::parse(&map)
706 }
707
708 pub fn fullname(&self, default_namespace: &str) -> FullName {
713 FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714 }
715}
716
717#[derive(Clone, Debug, PartialEq)]
718pub struct ResolvedDefaultValueField {
719 pub name: String,
720 pub doc: Documentation,
721 pub default: types::Value,
722 pub order: RecordFieldOrder,
723 pub position: usize,
724}
725
726#[derive(Clone, Debug, PartialEq)]
727pub enum ResolvedRecordField {
728 Absent(Schema),
729 Present(RecordField),
730}
731
732#[derive(Clone, Debug, PartialEq)]
734pub struct RecordField {
735 pub name: String,
737 pub doc: Documentation,
739 pub default: Option<Value>,
743 pub schema: SchemaPieceOrNamed,
745 pub order: RecordFieldOrder,
749 pub position: usize,
751}
752
753#[derive(Copy, Clone, Debug, PartialEq)]
755pub enum RecordFieldOrder {
756 Ascending,
757 Descending,
758 Ignore,
759}
760
761impl RecordField {}
762
763#[derive(Debug, Clone)]
764pub struct UnionSchema {
765 schemas: Vec<SchemaPieceOrNamed>,
766
767 anon_variant_index: BTreeMap<SchemaKind, usize>,
770
771 named_variant_index: BTreeMap<usize, usize>,
773}
774
775impl UnionSchema {
776 pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777 let mut avindex = BTreeMap::new();
778 let mut nvindex = BTreeMap::new();
779 for (i, schema) in schemas.iter().enumerate() {
780 match schema {
781 SchemaPieceOrNamed::Piece(sp) => {
782 if let SchemaPiece::Union(_) = sp {
783 return Err(ParseSchemaError::new(
784 "Unions may not directly contain a union",
785 )
786 .into());
787 }
788 let kind = SchemaKind::from(sp);
789 if avindex.insert(kind, i).is_some() {
790 return Err(
791 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792 );
793 }
794 }
795 SchemaPieceOrNamed::Named(idx) => {
796 if nvindex.insert(*idx, i).is_some() {
797 return Err(
798 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799 );
800 }
801 }
802 }
803 }
804 Ok(UnionSchema {
805 schemas,
806 anon_variant_index: avindex,
807 named_variant_index: nvindex,
808 })
809 }
810
811 pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813 &self.schemas
814 }
815
816 pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818 &mut self.schemas
819 }
820
821 pub fn is_nullable(&self) -> bool {
823 !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824 }
825
826 pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827 self.anon_variant_index
828 .get(&SchemaKind::from(sp))
829 .map(|idx| (*idx, &self.schemas[*idx]))
830 }
831
832 pub fn match_ref(
833 &self,
834 other: SchemaPieceRefOrNamed,
835 names_map: &BTreeMap<usize, usize>,
836 ) -> Option<(usize, &SchemaPieceOrNamed)> {
837 match other {
838 SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839 SchemaPieceRefOrNamed::Named(idx) => names_map
840 .get(&idx)
841 .and_then(|idx| self.named_variant_index.get(idx))
842 .map(|idx| (*idx, &self.schemas[*idx])),
843 }
844 }
845
846 #[inline(always)]
847 pub fn match_(
848 &self,
849 other: &SchemaPieceOrNamed,
850 names_map: &BTreeMap<usize, usize>,
851 ) -> Option<(usize, &SchemaPieceOrNamed)> {
852 self.match_ref(other.as_ref(), names_map)
853 }
854}
855
856impl PartialEq for UnionSchema {
858 fn eq(&self, other: &UnionSchema) -> bool {
859 self.schemas.eq(&other.schemas)
860 }
861}
862
863#[derive(Default)]
864struct SchemaParser {
865 named: Vec<Option<NamedSchemaPiece>>,
866 indices: BTreeMap<FullName, usize>,
867}
868
869impl SchemaParser {
870 fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
871 let top = self.parse_inner("", value)?;
872 let SchemaParser { named, indices } = self;
873 Ok(Schema {
874 named: named.into_iter().map(|o| o.unwrap()).collect(),
875 indices,
876 top,
877 })
878 }
879
880 fn parse_inner(
881 &mut self,
882 default_namespace: &str,
883 value: &Value,
884 ) -> Result<SchemaPieceOrNamed, AvroError> {
885 match *value {
886 Value::String(ref t) => {
887 let name = FullName::from_parts(t.as_str(), None, default_namespace);
888 if let Some(idx) = self.indices.get(&name) {
889 Ok(SchemaPieceOrNamed::Named(*idx))
890 } else {
891 Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
892 t.as_str(),
893 )?))
894 }
895 }
896 Value::Object(ref data) => self.parse_complex(default_namespace, data),
897 Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
898 self.parse_union(default_namespace, data)?,
899 )),
900 _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
901 }
902 }
903
904 fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
905 let idx = match self.indices.entry(fullname) {
906 Entry::Vacant(ve) => *ve.insert(self.named.len()),
907 Entry::Occupied(oe) => {
908 return Err(ParseSchemaError::new(format!(
909 "Sub-schema with name {:?} encountered multiple times",
910 oe.key()
911 ))
912 .into());
913 }
914 };
915 self.named.push(None);
916 Ok(idx)
917 }
918
919 fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
920 assert_none!(self.named[index]);
921 self.named[index] = Some(schema);
922 }
923
924 fn parse_named_type(
925 &mut self,
926 type_name: &str,
927 default_namespace: &str,
928 complex: &Map<String, Value>,
929 ) -> Result<usize, AvroError> {
930 let name = Name::parse(complex)?;
931 match name.name.as_str() {
932 "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
933 return Err(ParseSchemaError::new(format!(
934 "{} may not be used as a custom type name",
935 name.name
936 ))
937 .into());
938 }
939 _ => {}
940 };
941 let fullname = name.fullname(default_namespace);
942 let default_namespace = fullname.namespace.clone();
943 let idx = self.alloc_name(fullname.clone())?;
944 let piece = match type_name {
945 "record" => self.parse_record(&default_namespace, complex),
946 "enum" => self.parse_enum(complex),
947 "fixed" => self.parse_fixed(&default_namespace, complex),
948 _ => unreachable!("Unknown named type kind: {}", type_name),
949 }?;
950
951 self.insert(
952 idx,
953 NamedSchemaPiece {
954 name: fullname,
955 piece,
956 },
957 );
958
959 Ok(idx)
960 }
961
962 fn parse_complex(
968 &mut self,
969 default_namespace: &str,
970 complex: &Map<String, Value>,
971 ) -> Result<SchemaPieceOrNamed, AvroError> {
972 match complex.get("type") {
973 Some(&Value::String(ref t)) => Ok(match t.as_str() {
974 "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
975 t,
976 default_namespace,
977 complex,
978 )?),
979 "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
980 "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
981 "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
982 "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
983 "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
984 "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
985 other => {
986 let name = FullName {
987 name: other.into(),
988 namespace: default_namespace.into(),
989 };
990 if let Some(idx) = self.indices.get(&name) {
991 SchemaPieceOrNamed::Named(*idx)
992 } else {
993 SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
994 }
995 }
996 }),
997 Some(&Value::Object(ref data)) => match data.get("type") {
998 Some(value) => self.parse_inner(default_namespace, value),
999 None => Err(
1000 ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1001 ),
1002 },
1003 _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1004 }
1005 }
1006
1007 fn parse_record(
1010 &mut self,
1011 default_namespace: &str,
1012 complex: &Map<String, Value>,
1013 ) -> Result<SchemaPiece, AvroError> {
1014 let mut lookup = BTreeMap::new();
1015
1016 let fields: Vec<RecordField> = complex
1017 .get("fields")
1018 .and_then(|fields| fields.as_array())
1019 .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1020 .and_then(|fields| {
1021 fields
1022 .iter()
1023 .filter_map(|field| field.as_object())
1024 .enumerate()
1025 .map(|(position, field)| {
1026 self.parse_record_field(default_namespace, field, position)
1027 })
1028 .collect::<Result<_, _>>()
1029 })?;
1030
1031 for field in &fields {
1032 lookup.insert(field.name.clone(), field.position);
1033 }
1034
1035 Ok(SchemaPiece::Record {
1036 doc: complex.doc(),
1037 fields,
1038 lookup,
1039 })
1040 }
1041
1042 fn parse_record_field(
1044 &mut self,
1045 default_namespace: &str,
1046 field: &Map<String, Value>,
1047 position: usize,
1048 ) -> Result<RecordField, AvroError> {
1049 let name = field
1050 .name()
1051 .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1052
1053 Name::validate(&name)?;
1054
1055 let schema = field
1056 .get("type")
1057 .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1058 .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1059
1060 let default = field.get("default").cloned();
1061
1062 let order = field
1063 .get("order")
1064 .and_then(|order| order.as_str())
1065 .and_then(|order| match order {
1066 "ascending" => Some(RecordFieldOrder::Ascending),
1067 "descending" => Some(RecordFieldOrder::Descending),
1068 "ignore" => Some(RecordFieldOrder::Ignore),
1069 _ => None,
1070 })
1071 .unwrap_or(RecordFieldOrder::Ascending);
1072
1073 Ok(RecordField {
1074 name,
1075 doc: field.doc(),
1076 default,
1077 schema,
1078 order,
1079 position,
1080 })
1081 }
1082
1083 fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1086 let symbols: Vec<String> = complex
1087 .get("symbols")
1088 .and_then(|v| v.as_array())
1089 .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1090 .and_then(|symbols| {
1091 symbols
1092 .iter()
1093 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1094 .collect::<Option<_>>()
1095 .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1096 })?;
1097
1098 let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1099 for symbol in symbols.iter() {
1100 if unique_symbols.contains(symbol) {
1101 return Err(ParseSchemaError::new(format!(
1102 "Enum symbols must be unique, found multiple: {}",
1103 symbol
1104 ))
1105 .into());
1106 } else {
1107 unique_symbols.insert(symbol);
1108 }
1109 }
1110
1111 let default_idx = if let Some(default) = complex.get("default") {
1112 let default_str = default.as_str().ok_or_else(|| {
1113 ParseSchemaError::new(format!(
1114 "Enum default should be a string, got: {:?}",
1115 default
1116 ))
1117 })?;
1118 let default_idx = symbols
1119 .iter()
1120 .position(|x| x == default_str)
1121 .ok_or_else(|| {
1122 ParseSchemaError::new(format!(
1123 "Enum default not found in list of symbols: {}",
1124 default_str
1125 ))
1126 })?;
1127 Some(default_idx)
1128 } else {
1129 None
1130 };
1131
1132 Ok(SchemaPiece::Enum {
1133 doc: complex.doc(),
1134 symbols,
1135 default_idx,
1136 })
1137 }
1138
1139 fn parse_array(
1142 &mut self,
1143 default_namespace: &str,
1144 complex: &Map<String, Value>,
1145 ) -> Result<SchemaPiece, AvroError> {
1146 complex
1147 .get("items")
1148 .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1149 .and_then(|items| self.parse_inner(default_namespace, items))
1150 .map(|schema| SchemaPiece::Array(Box::new(schema)))
1151 }
1152
1153 fn parse_map(
1156 &mut self,
1157 default_namespace: &str,
1158 complex: &Map<String, Value>,
1159 ) -> Result<SchemaPiece, AvroError> {
1160 complex
1161 .get("values")
1162 .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1163 .and_then(|items| self.parse_inner(default_namespace, items))
1164 .map(|schema| SchemaPiece::Map(Box::new(schema)))
1165 }
1166
1167 fn parse_union(
1170 &mut self,
1171 default_namespace: &str,
1172 items: &[Value],
1173 ) -> Result<SchemaPiece, AvroError> {
1174 items
1175 .iter()
1176 .map(|value| self.parse_inner(default_namespace, value))
1177 .collect::<Result<Vec<_>, _>>()
1178 .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1179 }
1180
1181 fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1184 let precision = complex
1185 .get("precision")
1186 .and_then(|v| v.as_i64())
1187 .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1188
1189 let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1190
1191 if scale < 0 {
1192 return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1193 }
1194
1195 if precision < 0 {
1196 return Err(
1197 ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1198 );
1199 }
1200
1201 if scale > precision {
1202 return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1203 }
1204
1205 Ok((precision as usize, scale as usize))
1206 }
1207
1208 fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1211 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1212
1213 if let Some("decimal") = logical_type {
1214 match Self::parse_decimal(complex) {
1215 Ok((precision, scale)) => {
1216 return Ok(SchemaPiece::Decimal {
1217 precision,
1218 scale,
1219 fixed_size: None,
1220 });
1221 }
1222 Err(e) => warn!(
1223 "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1224 complex, e
1225 ),
1226 }
1227 }
1228
1229 Ok(SchemaPiece::Bytes)
1230 }
1231
1232 fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1240 const AVRO_DATE: &str = "date";
1241 const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1242 const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1243 if let Some(name) = complex.get("connect.name") {
1244 if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1245 if name == KAFKA_DATE {
1246 warn!("using deprecated debezium date format");
1247 }
1248 return Ok(SchemaPiece::Date);
1249 }
1250 }
1251 if let Some(name) = complex.get("logicalType") {
1255 if name == AVRO_DATE {
1256 return Ok(SchemaPiece::Date);
1257 }
1258 }
1259 if !complex.is_empty() {
1260 debug!("parsing complex type as regular int: {:?}", complex);
1261 }
1262 Ok(SchemaPiece::Int)
1263 }
1264
1265 fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1273 const AVRO_MILLI_TS: &str = "timestamp-millis";
1274 const AVRO_MICRO_TS: &str = "timestamp-micros";
1275
1276 const CONNECT_MILLI_TS: &[&str] = &[
1277 "io.debezium.time.Timestamp",
1278 "org.apache.kafka.connect.data.Timestamp",
1279 ];
1280 const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1281
1282 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1283 if CONNECT_MILLI_TS.contains(&&**name) {
1284 return Ok(SchemaPiece::TimestampMilli);
1285 }
1286 if name == CONNECT_MICRO_TS {
1287 return Ok(SchemaPiece::TimestampMicro);
1288 }
1289 }
1290 if let Some(name) = complex.get("logicalType") {
1291 if name == AVRO_MILLI_TS {
1292 return Ok(SchemaPiece::TimestampMilli);
1293 }
1294 if name == AVRO_MICRO_TS {
1295 return Ok(SchemaPiece::TimestampMicro);
1296 }
1297 }
1298 if !complex.is_empty() {
1299 debug!("parsing complex type as regular long: {:?}", complex);
1300 }
1301 Ok(SchemaPiece::Long)
1302 }
1303
1304 fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1305 const CONNECT_JSON: &str = "io.debezium.data.Json";
1306
1307 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1308 if CONNECT_JSON == name.as_str() {
1309 return SchemaPiece::Json;
1310 }
1311 }
1312 if let Some(name) = complex.get("logicalType") {
1313 if name == "uuid" {
1314 return SchemaPiece::Uuid;
1315 }
1316 }
1317 debug!("parsing complex type as regular string: {:?}", complex);
1318 SchemaPiece::String
1319 }
1320
1321 fn parse_fixed(
1324 &self,
1325 _default_namespace: &str,
1326 complex: &Map<String, Value>,
1327 ) -> Result<SchemaPiece, AvroError> {
1328 let _name = Name::parse(complex)?;
1329
1330 let size = complex
1331 .get("size")
1332 .and_then(|v| v.as_i64())
1333 .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1334 if size <= 0 {
1335 return Err(ParseSchemaError::new(format!(
1336 "Fixed values require a positive size attribute, found: {}",
1337 size
1338 ))
1339 .into());
1340 }
1341
1342 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1343
1344 if let Some("decimal") = logical_type {
1345 match Self::parse_decimal(complex) {
1346 Ok((precision, scale)) => {
1347 let max = ((8 * size - 1) as f64 * 2_f64.log10()).floor() as usize;
1348 if precision > max {
1349 warn!(
1350 "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1351 precision, size
1352 );
1353 } else {
1354 return Ok(SchemaPiece::Decimal {
1355 precision,
1356 scale,
1357 fixed_size: Some(size as usize),
1358 });
1359 }
1360 }
1361 Err(e) => warn!(
1362 "parsing decimal as fixed due to parse error: {:?}, {:?}",
1363 complex, e
1364 ),
1365 }
1366 }
1367
1368 Ok(SchemaPiece::Fixed {
1369 size: size as usize,
1370 })
1371 }
1372}
1373
1374impl Schema {
1375 pub fn parse(value: &Value) -> Result<Self, AvroError> {
1378 Self::parse_with_references(value, &[])
1379 }
1380
1381 pub fn parse_with_references(
1392 primary: &Value,
1393 reference_schemas: &[Schema],
1394 ) -> Result<Self, AvroError> {
1395 let (named, indices) = Self::collect_named_types(reference_schemas);
1397
1398 let p = SchemaParser {
1400 named: named.into_iter().map(Some).collect(),
1401 indices,
1402 };
1403 p.parse(primary)
1404 }
1405
1406 fn collect_named_types(
1411 schemas: &[Schema],
1412 ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1413 let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1414 let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1415
1416 for schema in schemas {
1417 let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1421 let mut new_type_offset = combined_named.len();
1422
1423 for named_piece in &schema.named {
1424 if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1425 index_map.push(existing_idx);
1427 } else {
1428 index_map.push(new_type_offset);
1430 new_type_offset += 1;
1431 }
1432 }
1433
1434 for named_piece in &schema.named {
1436 if combined_indices.contains_key(&named_piece.name) {
1437 continue;
1438 }
1439
1440 let mut remapped = named_piece.clone();
1441 Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1442
1443 let new_idx = combined_named.len();
1444 combined_indices.insert(remapped.name.clone(), new_idx);
1445 combined_named.push(remapped);
1446 }
1447 }
1448
1449 (combined_named, combined_indices)
1450 }
1451
1452 fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1454 match piece {
1455 SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1456 SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1457 SchemaPiece::Union(union) => {
1458 for variant in union.variants_mut() {
1459 Self::remap_indices_with_map(variant, index_map);
1460 }
1461 }
1462 SchemaPiece::Record { fields, .. } => {
1463 for field in fields {
1464 Self::remap_indices_with_map(&mut field.schema, index_map);
1465 }
1466 }
1467 _ => {}
1468 }
1469 }
1470
1471 fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1473 match item {
1474 SchemaPieceOrNamed::Named(idx) => {
1475 if let Some(&new_idx) = index_map.get(*idx) {
1476 *idx = new_idx;
1477 }
1478 }
1479 SchemaPieceOrNamed::Piece(piece) => {
1480 Self::remap_indices_in_piece_with_map(piece, index_map)
1481 }
1482 }
1483 }
1484
1485 pub fn canonical_form(&self) -> String {
1490 let json = serde_json::to_value(self).unwrap();
1491 parsing_canonical_form(&json)
1492 }
1493
1494 pub fn fingerprint(&self, algorithm: &'static digest::Algorithm) -> SchemaFingerprint {
1499 let hash = digest::digest(algorithm, self.canonical_form().as_bytes());
1500 SchemaFingerprint {
1501 bytes: hash.as_ref().to_vec(),
1502 }
1503 }
1504
1505 fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1508 match primitive {
1509 "null" => Ok(SchemaPiece::Null),
1510 "boolean" => Ok(SchemaPiece::Boolean),
1511 "int" => Ok(SchemaPiece::Int),
1512 "long" => Ok(SchemaPiece::Long),
1513 "double" => Ok(SchemaPiece::Double),
1514 "float" => Ok(SchemaPiece::Float),
1515 "bytes" => Ok(SchemaPiece::Bytes),
1516 "string" => Ok(SchemaPiece::String),
1517 other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1518 }
1519 }
1520}
1521
1522impl FromStr for Schema {
1523 type Err = AvroError;
1524
1525 fn from_str(input: &str) -> Result<Self, AvroError> {
1527 let value = serde_json::from_str(input)
1528 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1529 Self::parse(&value)
1530 }
1531}
1532
/// A named schema type (record, enum, fixed, ...) paired with its full name.
///
/// These live in a schema's `named` list and are referred to elsewhere by their
/// index in that list (see `SchemaPieceRefOrNamed::Named`).
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// The fully-qualified name of the type.
    pub name: FullName,
    /// The type's definition.
    pub piece: SchemaPiece,
}
1538
/// A position within a schema whose named reference (if any) has been looked up.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The schema that owns `inner`; named references are resolved against it.
    pub root: &'a Schema,
    /// The schema piece at this position.
    pub inner: &'a SchemaPiece,
    /// The full name of `inner` when it was reached through a named reference;
    /// `None` for anonymous pieces.
    pub name: Option<&'a FullName>,
}
1545
/// A borrowed schema position: either an inline (anonymous) piece, or the index
/// of a named type in the root schema's `named` list.
#[derive(Copy, Clone, Debug)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// An anonymous piece defined inline.
    Piece(&'a SchemaPiece),
    /// A reference to a named type, looked up via `Schema::lookup`.
    Named(usize),
}
1551
1552impl<'a> SchemaPieceRefOrNamed<'a> {
1553 pub fn get_human_name(&self, root: &Schema) -> String {
1554 match self {
1555 Self::Piece(piece) => format!("{:?}", piece),
1556 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1557 }
1558 }
1559
1560 #[inline(always)]
1561 pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1562 match self {
1563 SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1564 SchemaPieceRefOrNamed::Named(index) => {
1565 let named_piece = root.lookup(index);
1566 (&named_piece.piece, Some(&named_piece.name))
1567 }
1568 }
1569 }
1570}
1571
/// A position within a schema whose named reference (if any) has not yet been
/// looked up; `lookup` turns it into a `SchemaNode`.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema against which `inner` is resolved.
    pub root: &'a Schema,
    /// The (possibly named) piece at this position.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
1577
1578impl<'a> SchemaNodeOrNamed<'a> {
1579 #[inline(always)]
1580 pub fn lookup(self) -> SchemaNode<'a> {
1581 let (inner, name) = self.inner.get_piece_and_name(self.root);
1582 SchemaNode {
1583 root: self.root,
1584 inner,
1585 name,
1586 }
1587 }
1588 #[inline(always)]
1589 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1590 self.step_ref(next.as_ref())
1591 }
1592 #[inline(always)]
1593 pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1594 Self {
1595 root: self.root,
1596 inner: match next {
1597 SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1598 SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1599 },
1600 }
1601 }
1602
1603 pub fn to_schema(self) -> Schema {
1604 let mut cloner = SchemaSubtreeDeepCloner {
1605 old_root: self.root,
1606 old_to_new_names: Default::default(),
1607 named: Default::default(),
1608 };
1609 let piece = cloner.clone_piece_or_named(self.inner);
1610 let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1611 let indices: BTreeMap<FullName, usize> = named
1612 .iter()
1613 .enumerate()
1614 .map(|(i, nsp)| (nsp.name.clone(), i))
1615 .collect();
1616 Schema {
1617 named,
1618 indices,
1619 top: piece,
1620 }
1621 }
1622
1623 pub fn namespace(self) -> Option<&'a str> {
1624 let SchemaNode { name, .. } = self.lookup();
1625 name.map(|FullName { namespace, .. }| namespace.as_str())
1626 }
1627}
1628
/// Copies a subtree of `old_root` into a new, self-contained set of named types.
///
/// Each named type reachable from the subtree is copied into `named` at most
/// once. `old_to_new_names` records where it went, and is filled in before the
/// type's body is cloned, so a recursive reference finds the existing entry.
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema being copied from.
    old_root: &'a Schema,
    /// Named-type index in `old_root` -> index in `named`.
    old_to_new_names: BTreeMap<usize, usize>,
    /// Named types of the new schema; a slot is `None` only while its own
    /// definition is still being cloned.
    named: Vec<Option<NamedSchemaPiece>>,
}
1634
1635impl<'a> SchemaSubtreeDeepCloner<'a> {
1636 fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
1637 match piece {
1638 SchemaPiece::Null => SchemaPiece::Null,
1639 SchemaPiece::Boolean => SchemaPiece::Boolean,
1640 SchemaPiece::Int => SchemaPiece::Int,
1641 SchemaPiece::Long => SchemaPiece::Long,
1642 SchemaPiece::Float => SchemaPiece::Float,
1643 SchemaPiece::Double => SchemaPiece::Double,
1644 SchemaPiece::Date => SchemaPiece::Date,
1645 SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
1646 SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
1647 SchemaPiece::Json => SchemaPiece::Json,
1648 SchemaPiece::Decimal {
1649 scale,
1650 precision,
1651 fixed_size,
1652 } => SchemaPiece::Decimal {
1653 scale: *scale,
1654 precision: *precision,
1655 fixed_size: *fixed_size,
1656 },
1657 SchemaPiece::Bytes => SchemaPiece::Bytes,
1658 SchemaPiece::String => SchemaPiece::String,
1659 SchemaPiece::Uuid => SchemaPiece::Uuid,
1660 SchemaPiece::Array(inner) => {
1661 SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1662 }
1663 SchemaPiece::Map(inner) => {
1664 SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
1665 }
1666 SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
1667 schemas: us
1668 .schemas
1669 .iter()
1670 .map(|s| self.clone_piece_or_named(s.as_ref()))
1671 .collect(),
1672 anon_variant_index: us.anon_variant_index.clone(),
1673 named_variant_index: us.named_variant_index.clone(),
1674 }),
1675 SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
1676 SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
1677 SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
1678 SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
1679 SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
1680 SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
1681 SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
1682 SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
1683 SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
1684 SchemaPiece::ResolveConcreteUnion {
1685 index,
1686 inner,
1687 n_reader_variants,
1688 reader_null_variant,
1689 } => SchemaPiece::ResolveConcreteUnion {
1690 index: *index,
1691 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1692 n_reader_variants: *n_reader_variants,
1693 reader_null_variant: *reader_null_variant,
1694 },
1695 SchemaPiece::ResolveUnionUnion {
1696 permutation,
1697 n_reader_variants,
1698 reader_null_variant,
1699 } => SchemaPiece::ResolveUnionUnion {
1700 permutation: permutation
1701 .clone()
1702 .into_iter()
1703 .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
1704 .collect(),
1705 n_reader_variants: *n_reader_variants,
1706 reader_null_variant: *reader_null_variant,
1707 },
1708 SchemaPiece::ResolveUnionConcrete { index, inner } => {
1709 SchemaPiece::ResolveUnionConcrete {
1710 index: *index,
1711 inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
1712 }
1713 }
1714 SchemaPiece::Record {
1715 doc,
1716 fields,
1717 lookup,
1718 } => SchemaPiece::Record {
1719 doc: doc.clone(),
1720 fields: fields
1721 .iter()
1722 .map(|rf| RecordField {
1723 name: rf.name.clone(),
1724 doc: rf.doc.clone(),
1725 default: rf.default.clone(),
1726 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1727 order: rf.order,
1728 position: rf.position,
1729 })
1730 .collect(),
1731 lookup: lookup.clone(),
1732 },
1733 SchemaPiece::Enum {
1734 doc,
1735 symbols,
1736 default_idx,
1737 } => SchemaPiece::Enum {
1738 doc: doc.clone(),
1739 symbols: symbols.clone(),
1740 default_idx: *default_idx,
1741 },
1742 SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
1743 SchemaPiece::ResolveRecord {
1744 defaults,
1745 fields,
1746 n_reader_fields,
1747 } => SchemaPiece::ResolveRecord {
1748 defaults: defaults.clone(),
1749 fields: fields
1750 .iter()
1751 .map(|rf| match rf {
1752 ResolvedRecordField::Present(rf) => {
1753 ResolvedRecordField::Present(RecordField {
1754 name: rf.name.clone(),
1755 doc: rf.doc.clone(),
1756 default: rf.default.clone(),
1757 schema: self.clone_piece_or_named(rf.schema.as_ref()),
1758 order: rf.order,
1759 position: rf.position,
1760 })
1761 }
1762 ResolvedRecordField::Absent(writer_schema) => {
1763 ResolvedRecordField::Absent(writer_schema.clone())
1764 }
1765 })
1766 .collect(),
1767 n_reader_fields: *n_reader_fields,
1768 },
1769 SchemaPiece::ResolveEnum {
1770 doc,
1771 symbols,
1772 default,
1773 } => SchemaPiece::ResolveEnum {
1774 doc: doc.clone(),
1775 symbols: symbols.clone(),
1776 default: default.clone(),
1777 },
1778 }
1779 }
1780 fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1781 match piece {
1782 SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1783 SchemaPieceRefOrNamed::Named(index) => {
1784 let new_index = match self.old_to_new_names.entry(index) {
1785 Entry::Vacant(ve) => {
1786 let new_index = self.named.len();
1787 self.named.push(None);
1788 ve.insert(new_index);
1789 let old_named_piece = self.old_root.lookup(index);
1790 let new_named_piece = NamedSchemaPiece {
1791 name: old_named_piece.name.clone(),
1792 piece: self.clone_piece(&old_named_piece.piece),
1793 };
1794 self.named[new_index] = Some(new_named_piece);
1795 new_index
1796 }
1797 Entry::Occupied(oe) => *oe.get(),
1798 };
1799 SchemaPieceOrNamed::Named(new_index)
1800 }
1801 }
1802 }
1803}
1804
1805impl<'a> SchemaNode<'a> {
1806 #[inline(always)]
1807 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1808 let (inner, name) = next.get_piece_and_name(self.root);
1809 Self {
1810 root: self.root,
1811 inner,
1812 name,
1813 }
1814 }
1815
1816 pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
1817 use serde_json::Value::*;
1818 let val = match (json, self.inner) {
1819 (json, SchemaPiece::Union(us)) => match us.schemas.first() {
1821 Some(variant) => AvroValue::Union {
1822 index: 0,
1823 inner: Box::new(self.step(variant).json_to_value(json)?),
1824 n_variants: us.schemas.len(),
1825 null_variant: us
1826 .schemas
1827 .iter()
1828 .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
1829 },
1830 None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
1831 },
1832 (Null, SchemaPiece::Null) => AvroValue::Null,
1833 (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
1834 (Number(n), piece) => {
1835 match piece {
1836 piece if piece.is_underlying_int() => {
1837 let i =
1838 n.as_i64()
1839 .and_then(|i| i32::try_from(i).ok())
1840 .ok_or_else(|| {
1841 ParseSchemaError(format!("{} is not a 32-bit integer", n))
1842 })?;
1843 piece.try_make_int_value(i).unwrap().map_err(|e| {
1844 ParseSchemaError(format!("invalid default int {i}: {e}"))
1845 })?
1846 }
1847 piece if piece.is_underlying_long() => {
1848 let i = n.as_i64().ok_or_else(|| {
1849 ParseSchemaError(format!("{} is not a 64-bit integer", n))
1850 })?;
1851 piece.try_make_long_value(i).unwrap().map_err(|e| {
1852 ParseSchemaError(format!("invalid default long {i}: {e}"))
1853 })?
1854 }
1855 SchemaPiece::Float => {
1856 let f = n.as_f64().ok_or_else(|| {
1857 ParseSchemaError(format!("{} is not a 32-bit float", n))
1858 })?;
1859 AvroValue::Float(f as f32)
1860 }
1861 SchemaPiece::Double => {
1862 let f = n.as_f64().ok_or_else(|| {
1863 ParseSchemaError(format!("{} is not a 64-bit float", n))
1864 })?;
1865 AvroValue::Double(f)
1866 }
1867 _ => {
1868 return Err(ParseSchemaError(format!(
1869 "Unexpected number in default: {}",
1870 n
1871 )));
1872 }
1873 }
1874 }
1875 (String(s), piece)
1876 if s.eq_ignore_ascii_case("nan")
1877 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1878 {
1879 match piece {
1880 SchemaPiece::Float => AvroValue::Float(f32::NAN),
1881 SchemaPiece::Double => AvroValue::Double(f64::NAN),
1882 _ => unreachable!(),
1883 }
1884 }
1885 (String(s), piece)
1886 if s.eq_ignore_ascii_case("infinity")
1887 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1888 {
1889 match piece {
1890 SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
1891 SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
1892 _ => unreachable!(),
1893 }
1894 }
1895 (String(s), piece)
1896 if s.eq_ignore_ascii_case("-infinity")
1897 && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
1898 {
1899 match piece {
1900 SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
1901 SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
1902 _ => unreachable!(),
1903 }
1904 }
1905 (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
1906 (
1907 String(s),
1908 SchemaPiece::Decimal {
1909 precision, scale, ..
1910 },
1911 ) => AvroValue::Decimal(DecimalValue {
1912 precision: *precision,
1913 scale: *scale,
1914 unscaled: s.clone().into_bytes(),
1915 }),
1916 (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
1917 (Object(map), SchemaPiece::Record { fields, .. }) => {
1918 let field_values = fields
1919 .iter()
1920 .map(|rf| {
1921 let jval = map.get(&rf.name).ok_or_else(|| {
1922 ParseSchemaError(format!(
1923 "Field not found in default value: {}",
1924 rf.name
1925 ))
1926 })?;
1927 let value = self.step(&rf.schema).json_to_value(jval)?;
1928 Ok((rf.name.clone(), value))
1929 })
1930 .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
1931 AvroValue::Record(field_values)
1932 }
1933 (String(s), SchemaPiece::Enum { symbols, .. }) => {
1934 match symbols.iter().find_position(|sym| s == *sym) {
1935 Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
1936 None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
1937 }
1938 }
1939 (Array(vals), SchemaPiece::Array(inner)) => {
1940 let node = self.step(&**inner);
1941 let vals = vals
1942 .iter()
1943 .map(|val| node.json_to_value(val))
1944 .collect::<Result<Vec<_>, ParseSchemaError>>()?;
1945 AvroValue::Array(vals)
1946 }
1947 (Object(map), SchemaPiece::Map(inner)) => {
1948 let node = self.step(&**inner);
1949 let map = map
1950 .iter()
1951 .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
1952 .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
1953 AvroValue::Map(map)
1954 }
1955 (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
1956 AvroValue::Fixed(*size, s.clone().into_bytes())
1957 }
1958 _ => {
1959 return Err(ParseSchemaError(format!(
1960 "Json default value {} does not match schema",
1961 json
1962 )));
1963 }
1964 };
1965 Ok(val)
1966 }
1967}
1968
/// State carried while serializing a schema (or part of one) to its JSON form.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The schema position to serialize.
    node: SchemaNodeOrNamed<'a>,
    /// Named types already written out during this serialization, keyed by
    /// their index in the root schema. It is shared by every nested context, so
    /// later occurrences are written as a name instead of a full definition.
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// Namespace of the nearest enclosing named type (`""` at the top level).
    /// Names in this namespace are written without their namespace.
    enclosing_ns: &'a str,
}
1980
/// Serializes one record field (`inner`) in the context of the record being
/// serialized (`outer`).
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
    /// Context of the enclosing record.
    outer: &'a SchemaSerContext<'a>,
    /// The field to serialize.
    inner: &'a RecordField,
}
1986
1987impl<'a> SchemaSerContext<'a> {
1988 fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1989 let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
1990 Self {
1991 node: self.node.step_ref(next),
1992 seen_named: Rc::clone(&self.seen_named),
1993 enclosing_ns: ns,
1994 }
1995 }
1996}
1997
1998impl<'a> Serialize for SchemaSerContext<'a> {
1999 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2000 where
2001 S: Serializer,
2002 {
2003 match self.node.inner {
2004 SchemaPieceRefOrNamed::Piece(piece) => match piece {
2005 SchemaPiece::Null => serializer.serialize_str("null"),
2006 SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2007 SchemaPiece::Int => serializer.serialize_str("int"),
2008 SchemaPiece::Long => serializer.serialize_str("long"),
2009 SchemaPiece::Float => serializer.serialize_str("float"),
2010 SchemaPiece::Double => serializer.serialize_str("double"),
2011 SchemaPiece::Date => {
2012 let mut map = serializer.serialize_map(Some(2))?;
2013 map.serialize_entry("type", "int")?;
2014 map.serialize_entry("logicalType", "date")?;
2015 map.end()
2016 }
2017 SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2018 let mut map = serializer.serialize_map(Some(2))?;
2019 map.serialize_entry("type", "long")?;
2020 if piece == &SchemaPiece::TimestampMilli {
2021 map.serialize_entry("logicalType", "timestamp-millis")?;
2022 } else {
2023 map.serialize_entry("logicalType", "timestamp-micros")?;
2024 }
2025 map.end()
2026 }
2027 SchemaPiece::Decimal {
2028 precision,
2029 scale,
2030 fixed_size: None,
2031 } => {
2032 let mut map = serializer.serialize_map(Some(4))?;
2033 map.serialize_entry("type", "bytes")?;
2034 map.serialize_entry("precision", precision)?;
2035 map.serialize_entry("scale", scale)?;
2036 map.serialize_entry("logicalType", "decimal")?;
2037 map.end()
2038 }
2039 SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2040 SchemaPiece::String => serializer.serialize_str("string"),
2041 SchemaPiece::Array(inner) => {
2042 let mut map = serializer.serialize_map(Some(2))?;
2043 map.serialize_entry("type", "array")?;
2044 map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2045 map.end()
2046 }
2047 SchemaPiece::Map(inner) => {
2048 let mut map = serializer.serialize_map(Some(2))?;
2049 map.serialize_entry("type", "map")?;
2050 map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2051 map.end()
2052 }
2053 SchemaPiece::Union(inner) => {
2054 let variants = inner.variants();
2055 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2056 for v in variants {
2057 seq.serialize_element(&self.step(v.as_ref()))?;
2058 }
2059 seq.end()
2060 }
2061 SchemaPiece::Json => {
2062 let mut map = serializer.serialize_map(Some(2))?;
2063 map.serialize_entry("type", "string")?;
2064 map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2065 map.end()
2066 }
2067 SchemaPiece::Uuid => {
2068 let mut map = serializer.serialize_map(Some(4))?;
2069 map.serialize_entry("type", "string")?;
2070 map.serialize_entry("logicalType", "uuid")?;
2071 map.end()
2072 }
2073 SchemaPiece::Record { .. }
2074 | SchemaPiece::Decimal {
2075 fixed_size: Some(_),
2076 ..
2077 }
2078 | SchemaPiece::Enum { .. }
2079 | SchemaPiece::Fixed { .. } => {
2080 unreachable!("Unexpected named schema piece in anonymous schema position")
2081 }
2082 SchemaPiece::ResolveIntLong
2083 | SchemaPiece::ResolveDateTimestamp
2084 | SchemaPiece::ResolveIntFloat
2085 | SchemaPiece::ResolveIntDouble
2086 | SchemaPiece::ResolveLongFloat
2087 | SchemaPiece::ResolveLongDouble
2088 | SchemaPiece::ResolveFloatDouble
2089 | SchemaPiece::ResolveConcreteUnion { .. }
2090 | SchemaPiece::ResolveUnionUnion { .. }
2091 | SchemaPiece::ResolveUnionConcrete { .. }
2092 | SchemaPiece::ResolveRecord { .. }
2093 | SchemaPiece::ResolveIntTsMicro
2094 | SchemaPiece::ResolveIntTsMilli
2095 | SchemaPiece::ResolveEnum { .. } => {
2096 panic!("Attempted to serialize resolved schema")
2097 }
2098 },
2099 SchemaPieceRefOrNamed::Named(index) => {
2100 let mut map = self.seen_named.borrow_mut();
2101 let named_piece = match map.get(&index) {
2102 Some(name) => {
2103 return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2104 }
2105 None => self.node.root.lookup(index),
2106 };
2107 let name = &named_piece.name;
2108 map.insert(index, name.clone());
2109 std::mem::drop(map);
2110 match &named_piece.piece {
2111 SchemaPiece::Record { doc, fields, .. } => {
2112 let mut map = serializer.serialize_map(None)?;
2113 map.serialize_entry("type", "record")?;
2114 map.serialize_entry("name", &name.name)?;
2115 if self.enclosing_ns != &name.namespace {
2116 map.serialize_entry("namespace", &name.namespace)?;
2117 }
2118 if let Some(docstr) = doc {
2119 map.serialize_entry("doc", docstr)?;
2120 }
2121 map.serialize_entry(
2123 "fields",
2124 &fields
2125 .iter()
2126 .map(|f| RecordFieldSerContext {
2127 outer: self,
2128 inner: f,
2129 })
2130 .collect::<Vec<_>>(),
2131 )?;
2132 map.end()
2133 }
2134 SchemaPiece::Enum {
2135 symbols,
2136 default_idx,
2137 ..
2138 } => {
2139 let mut map = serializer.serialize_map(None)?;
2140 map.serialize_entry("type", "enum")?;
2141 map.serialize_entry("name", &name.name)?;
2142 if self.enclosing_ns != &name.namespace {
2143 map.serialize_entry("namespace", &name.namespace)?;
2144 }
2145 map.serialize_entry("symbols", symbols)?;
2146 if let Some(default_idx) = *default_idx {
2147 assert!(default_idx < symbols.len());
2148 map.serialize_entry("default", &symbols[default_idx])?;
2149 }
2150 map.end()
2151 }
2152 SchemaPiece::Fixed { size } => {
2153 let mut map = serializer.serialize_map(None)?;
2154 map.serialize_entry("type", "fixed")?;
2155 map.serialize_entry("name", &name.name)?;
2156 if self.enclosing_ns != &name.namespace {
2157 map.serialize_entry("namespace", &name.namespace)?;
2158 }
2159 map.serialize_entry("size", size)?;
2160 map.end()
2161 }
2162 SchemaPiece::Decimal {
2163 scale,
2164 precision,
2165 fixed_size: Some(size),
2166 } => {
2167 let mut map = serializer.serialize_map(Some(6))?;
2168 map.serialize_entry("type", "fixed")?;
2169 map.serialize_entry("logicalType", "decimal")?;
2170 map.serialize_entry("name", &name.name)?;
2171 if self.enclosing_ns != &name.namespace {
2172 map.serialize_entry("namespace", &name.namespace)?;
2173 }
2174 map.serialize_entry("size", size)?;
2175 map.serialize_entry("precision", precision)?;
2176 map.serialize_entry("scale", scale)?;
2177 map.end()
2178 }
2179 SchemaPiece::Null
2180 | SchemaPiece::Boolean
2181 | SchemaPiece::Int
2182 | SchemaPiece::Long
2183 | SchemaPiece::Float
2184 | SchemaPiece::Double
2185 | SchemaPiece::Date
2186 | SchemaPiece::TimestampMilli
2187 | SchemaPiece::TimestampMicro
2188 | SchemaPiece::Decimal {
2189 fixed_size: None, ..
2190 }
2191 | SchemaPiece::Bytes
2192 | SchemaPiece::String
2193 | SchemaPiece::Array(_)
2194 | SchemaPiece::Map(_)
2195 | SchemaPiece::Union(_)
2196 | SchemaPiece::Uuid
2197 | SchemaPiece::Json => {
2198 unreachable!("Unexpected anonymous schema piece in named schema position")
2199 }
2200 SchemaPiece::ResolveIntLong
2201 | SchemaPiece::ResolveDateTimestamp
2202 | SchemaPiece::ResolveIntFloat
2203 | SchemaPiece::ResolveIntDouble
2204 | SchemaPiece::ResolveLongFloat
2205 | SchemaPiece::ResolveLongDouble
2206 | SchemaPiece::ResolveFloatDouble
2207 | SchemaPiece::ResolveConcreteUnion { .. }
2208 | SchemaPiece::ResolveUnionUnion { .. }
2209 | SchemaPiece::ResolveUnionConcrete { .. }
2210 | SchemaPiece::ResolveRecord { .. }
2211 | SchemaPiece::ResolveIntTsMilli
2212 | SchemaPiece::ResolveIntTsMicro
2213 | SchemaPiece::ResolveEnum { .. } => {
2214 panic!("Attempted to serialize resolved schema")
2215 }
2216 }
2217 }
2218 }
2219 }
2220}
2221
2222impl Serialize for Schema {
2223 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2224 where
2225 S: Serializer,
2226 {
2227 let ctx = SchemaSerContext {
2228 node: SchemaNodeOrNamed {
2229 root: self,
2230 inner: self.top.as_ref(),
2231 },
2232 seen_named: Rc::new(RefCell::new(Default::default())),
2233 enclosing_ns: "",
2234 };
2235 ctx.serialize(serializer)
2236 }
2237}
2238
2239impl<'a> Serialize for RecordFieldSerContext<'a> {
2240 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2241 where
2242 S: Serializer,
2243 {
2244 let mut map = serializer.serialize_map(None)?;
2245 map.serialize_entry("name", &self.inner.name)?;
2246 map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2247 if let Some(default) = &self.inner.default {
2248 map.serialize_entry("default", default)?;
2249 }
2250 if let Some(doc) = &self.inner.doc {
2251 map.serialize_entry("doc", doc)?;
2252 }
2253 map.end()
2254 }
2255}
2256
2257fn parsing_canonical_form(schema: &serde_json::Value) -> String {
2260 pcf(schema, "", false)
2261}
2262
2263fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2264 match schema {
2265 serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2266 serde_json::Value::String(s) => pcf_string(s),
2267 serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2268 serde_json::Value::Number(n) => n.to_string(),
2269 _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2270 }
2271}
2272
2273fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2274 let default_ns = schema
2276 .get("namespace")
2277 .and_then(|v| v.as_str())
2278 .unwrap_or(enclosing_ns);
2279 let mut fields = Vec::new();
2280 let mut found_next_ns = None;
2281 let mut deferred_values = vec![];
2282 for (k, v) in schema {
2283 if schema.len() == 1 && k == "type" {
2285 if let serde_json::Value::String(s) = v {
2287 return pcf_string(s);
2288 }
2289 }
2290
2291 if field_ordering_position(k).is_none() {
2293 continue;
2294 }
2295
2296 if k == "name" {
2298 if in_fields {
2301 fields.push((
2302 k,
2303 format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2304 ));
2305 continue;
2306 }
2307 let name = v.as_str().unwrap();
2309 assert!(
2310 found_next_ns.is_none(),
2311 "`name` must not be specified multiple times"
2312 );
2313 let next_ns = match name.rsplit_once('.') {
2314 None => default_ns,
2315 Some((ns, _name)) => ns,
2316 };
2317 found_next_ns = Some(next_ns);
2318 let n = if next_ns.is_empty() {
2319 Cow::Borrowed(name)
2320 } else {
2321 Cow::Owned(format!("{next_ns}.{name}"))
2322 };
2323 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2324 continue;
2325 }
2326
2327 if k == "size" {
2329 let i = match v.as_str() {
2330 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2331 None => v.as_i64().unwrap(),
2332 };
2333 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2334 continue;
2335 }
2336
2337 deferred_values.push((k, v));
2340 }
2341
2342 let next_ns = found_next_ns.unwrap_or(default_ns);
2343 for (k, v) in deferred_values {
2344 fields.push((
2345 k,
2346 format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2347 ));
2348 }
2349
2350 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2352 let inter = fields
2353 .into_iter()
2354 .map(|(_, v)| v)
2355 .collect::<Vec<_>>()
2356 .join(",");
2357 format!("{{{}}}", inter)
2358}
2359
2360fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2361 let inter = arr
2362 .iter()
2363 .map(|s| pcf(s, enclosing_ns, in_fields))
2364 .collect::<Vec<String>>()
2365 .join(",");
2366 format!("[{}]", inter)
2367}
2368
/// Wraps `s` in double quotes.
///
/// No escaping is performed, so `s` is expected not to contain `"` or `\`.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2372
/// Returns the 1-based position of `field` in the canonical attribute order
/// (`name`, `type`, `fields`, `symbols`, `items`, `values`, `size`).
///
/// Returns `None` for attributes that are stripped from the canonical form.
fn field_ordering_position(field: &str) -> Option<usize> {
    const CANONICAL_ORDER: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];
    CANONICAL_ORDER
        .iter()
        .position(|&known| known == field)
        .map(|zero_based| zero_based + 1)
}
2388
2389#[cfg(test)]
2390mod tests {
2391 use mz_ore::{assert_err, assert_ok};
2392
2393 use crate::types::{Record, ToAvro};
2394
2395 use super::*;
2396
2397 fn check_schema(schema: &str, expected: SchemaPiece) {
2398 let schema = Schema::from_str(schema).unwrap();
2399 assert_eq!(&expected, schema.top_node().inner);
2400
2401 let schema = serde_json::to_string(&schema).unwrap();
2403 let schema = Schema::from_str(&schema).unwrap();
2404 assert_eq!(&expected, schema.top_node().inner);
2405 }
2406
2407 #[mz_ore::test]
2408 fn test_primitive_schema() {
2409 check_schema("\"null\"", SchemaPiece::Null);
2410 check_schema("\"int\"", SchemaPiece::Int);
2411 check_schema("\"double\"", SchemaPiece::Double);
2412 }
2413
2414 #[mz_ore::test]
2415 fn test_array_schema() {
2416 check_schema(
2417 r#"{"type": "array", "items": "string"}"#,
2418 SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2419 );
2420 }
2421
2422 #[mz_ore::test]
2423 fn test_map_schema() {
2424 check_schema(
2425 r#"{"type": "map", "values": "double"}"#,
2426 SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2427 );
2428 }
2429
2430 #[mz_ore::test]
2431 fn test_union_schema() {
2432 check_schema(
2433 r#"["null", "int"]"#,
2434 SchemaPiece::Union(
2435 UnionSchema::new(vec![
2436 SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2437 SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2438 ])
2439 .unwrap(),
2440 ),
2441 );
2442 }
2443
2444 #[mz_ore::test]
2445 fn test_multi_union_schema() {
2446 let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2447 assert_ok!(schema);
2448 let schema = schema.unwrap();
2449 let node = schema.top_node();
2450 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2451 let union_schema = match node.inner {
2452 SchemaPiece::Union(u) => u,
2453 _ => unreachable!(),
2454 };
2455 assert_eq!(union_schema.variants().len(), 5);
2456 let mut variants = union_schema.variants().iter();
2457 assert_eq!(
2458 SchemaKind::from(node.step(variants.next().unwrap())),
2459 SchemaKind::Null
2460 );
2461 assert_eq!(
2462 SchemaKind::from(node.step(variants.next().unwrap())),
2463 SchemaKind::Int
2464 );
2465 assert_eq!(
2466 SchemaKind::from(node.step(variants.next().unwrap())),
2467 SchemaKind::Float
2468 );
2469 assert_eq!(
2470 SchemaKind::from(node.step(variants.next().unwrap())),
2471 SchemaKind::String
2472 );
2473 assert_eq!(
2474 SchemaKind::from(node.step(variants.next().unwrap())),
2475 SchemaKind::Bytes
2476 );
2477 assert_eq!(variants.next(), None);
2478 }
2479
2480 #[mz_ore::test]
2481 fn test_record_schema() {
2482 let schema = r#"
2483 {
2484 "type": "record",
2485 "name": "test",
2486 "doc": "record doc",
2487 "fields": [
2488 {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2489 {"name": "b", "doc": "b doc", "type": "string"}
2490 ]
2491 }
2492 "#;
2493
2494 let mut lookup = BTreeMap::new();
2495 lookup.insert("a".to_owned(), 0);
2496 lookup.insert("b".to_owned(), 1);
2497
2498 let expected = SchemaPiece::Record {
2499 doc: Some("record doc".to_string()),
2500 fields: vec![
2501 RecordField {
2502 name: "a".to_string(),
2503 doc: Some("a doc".to_string()),
2504 default: Some(Value::Number(42i64.into())),
2505 schema: SchemaPiece::Long.into(),
2506 order: RecordFieldOrder::Ascending,
2507 position: 0,
2508 },
2509 RecordField {
2510 name: "b".to_string(),
2511 doc: Some("b doc".to_string()),
2512 default: None,
2513 schema: SchemaPiece::String.into(),
2514 order: RecordFieldOrder::Ascending,
2515 position: 1,
2516 },
2517 ],
2518 lookup,
2519 };
2520
2521 check_schema(schema, expected);
2522 }
2523
2524 #[mz_ore::test]
2525 fn test_enum_schema() {
2526 let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2527
2528 let expected = SchemaPiece::Enum {
2529 doc: None,
2530 symbols: vec![
2531 "diamonds".to_owned(),
2532 "spades".to_owned(),
2533 "jokers".to_owned(),
2534 "clubs".to_owned(),
2535 "hearts".to_owned(),
2536 ],
2537 default_idx: Some(2),
2538 };
2539
2540 check_schema(schema, expected);
2541
2542 let bad_schema = Schema::from_str(
2543 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2544 );
2545
2546 assert_err!(bad_schema);
2547 }
2548
2549 #[mz_ore::test]
2550 fn test_fixed_schema() {
2551 let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2552
2553 let expected = SchemaPiece::Fixed { size: 16usize };
2554
2555 check_schema(schema, expected);
2556 }
2557
2558 #[mz_ore::test]
2559 fn test_date_schema() {
2560 let kinds = &[
2561 r#"{
2562 "type": "int",
2563 "name": "datish",
2564 "logicalType": "date"
2565 }"#,
2566 r#"{
2567 "type": "int",
2568 "name": "datish",
2569 "connect.name": "io.debezium.time.Date"
2570 }"#,
2571 r#"{
2572 "type": "int",
2573 "name": "datish",
2574 "connect.name": "org.apache.kafka.connect.data.Date"
2575 }"#,
2576 ];
2577 for kind in kinds {
2578 check_schema(*kind, SchemaPiece::Date);
2579
2580 let schema = Schema::from_str(*kind).unwrap();
2581 assert_eq!(
2582 serde_json::to_string(&schema).unwrap(),
2583 r#"{"type":"int","logicalType":"date"}"#
2584 );
2585 }
2586 }
2587
2588 #[mz_ore::test]
2589 fn new_field_in_middle() {
2590 let reader = r#"{
2591 "type": "record",
2592 "name": "MyRecord",
2593 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2594 }"#;
2595 let writer = r#"{
2596 "type": "record",
2597 "name": "MyRecord",
2598 "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2599 }"#;
2600 let reader = Schema::from_str(reader).unwrap();
2601 let writer = Schema::from_str(writer).unwrap();
2602
2603 let mut record = Record::new(writer.top_node()).unwrap();
2604 record.put("f1", 1);
2605 record.put("f2", 2);
2606 record.put("f_interposed", 42);
2607
2608 let value = record.avro();
2609
2610 let mut buf = vec![];
2611 crate::encode::encode(&value, &writer, &mut buf);
2612
2613 let resolved = resolve_schemas(&writer, &reader).unwrap();
2614
2615 let reader = &mut &buf[..];
2616 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2617 let expected = crate::types::Value::Record(vec![
2618 ("f1".to_string(), crate::types::Value::Int(1)),
2619 ("f2".to_string(), crate::types::Value::Int(2)),
2620 ]);
2621 assert_eq!(reader_value, expected);
2622 assert!(reader.is_empty()); }
2624
2625 #[mz_ore::test]
2626 fn new_field_at_end() {
2627 let reader = r#"{
2628 "type": "record",
2629 "name": "MyRecord",
2630 "fields": [{"name": "f1", "type": "int"}]
2631 }"#;
2632 let writer = r#"{
2633 "type": "record",
2634 "name": "MyRecord",
2635 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2636 }"#;
2637 let reader = Schema::from_str(reader).unwrap();
2638 let writer = Schema::from_str(writer).unwrap();
2639
2640 let mut record = Record::new(writer.top_node()).unwrap();
2641 record.put("f1", 1);
2642 record.put("f2", 2);
2643
2644 let value = record.avro();
2645
2646 let mut buf = vec![];
2647 crate::encode::encode(&value, &writer, &mut buf);
2648
2649 let resolved = resolve_schemas(&writer, &reader).unwrap();
2650
2651 let reader = &mut &buf[..];
2652 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2653 let expected =
2654 crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2655 assert_eq!(reader_value, expected);
2656 assert!(reader.is_empty()); }
2658
2659 #[mz_ore::test]
2660 fn default_non_nums() {
2661 let reader = r#"{
2662 "type": "record",
2663 "name": "MyRecord",
2664 "fields": [
2665 {"name": "f1", "type": "double", "default": "NaN"},
2666 {"name": "f2", "type": "double", "default": "Infinity"},
2667 {"name": "f3", "type": "double", "default": "-Infinity"}
2668 ]
2669 }
2670 "#;
2671 let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2672
2673 let writer_schema = Schema::from_str(writer).unwrap();
2674 let reader_schema = Schema::from_str(reader).unwrap();
2675 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2676
2677 let record = Record::new(writer_schema.top_node()).unwrap();
2678
2679 let value = record.avro();
2680 let mut buf = vec![];
2681 crate::encode::encode(&value, &writer_schema, &mut buf);
2682
2683 let reader = &mut &buf[..];
2684 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2685 let expected = crate::types::Value::Record(vec![
2686 ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2687 ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2688 (
2689 "f3".to_string(),
2690 crate::types::Value::Double(f64::NEG_INFINITY),
2691 ),
2692 ]);
2693
2694 #[derive(Debug)]
2695 struct NanEq(crate::types::Value);
2696 impl std::cmp::PartialEq for NanEq {
2697 fn eq(&self, other: &Self) -> bool {
2698 match (self, other) {
2699 (
2700 NanEq(crate::types::Value::Double(x)),
2701 NanEq(crate::types::Value::Double(y)),
2702 ) if x.is_nan() && y.is_nan() => true,
2703 (
2704 NanEq(crate::types::Value::Float(x)),
2705 NanEq(crate::types::Value::Float(y)),
2706 ) if x.is_nan() && y.is_nan() => true,
2707 (
2708 NanEq(crate::types::Value::Record(xs)),
2709 NanEq(crate::types::Value::Record(ys)),
2710 ) => {
2711 let xs = xs
2712 .iter()
2713 .cloned()
2714 .map(|(k, v)| (k, NanEq(v)))
2715 .collect::<Vec<_>>();
2716 let ys = ys
2717 .iter()
2718 .cloned()
2719 .map(|(k, v)| (k, NanEq(v)))
2720 .collect::<Vec<_>>();
2721
2722 xs == ys
2723 }
2724 (NanEq(x), NanEq(y)) => x == y,
2725 }
2726 }
2727 }
2728
2729 assert_eq!(NanEq(reader_value), NanEq(expected));
2730 assert!(reader.is_empty());
2731 }
2732
2733 #[mz_ore::test]
2734 fn test_decimal_schemas() {
2735 let schema = r#"{
2736 "type": "fixed",
2737 "name": "dec",
2738 "size": 8,
2739 "logicalType": "decimal",
2740 "precision": 12,
2741 "scale": 5
2742 }"#;
2743 let expected = SchemaPiece::Decimal {
2744 precision: 12,
2745 scale: 5,
2746 fixed_size: Some(8),
2747 };
2748 check_schema(schema, expected);
2749
2750 let schema = r#"{
2751 "type": "bytes",
2752 "logicalType": "decimal",
2753 "precision": 12,
2754 "scale": 5
2755 }"#;
2756 let expected = SchemaPiece::Decimal {
2757 precision: 12,
2758 scale: 5,
2759 fixed_size: None,
2760 };
2761 check_schema(schema, expected);
2762
2763 let res = Schema::from_str(
2764 r#"["bytes", {
2765 "type": "bytes",
2766 "logicalType": "decimal",
2767 "precision": 12,
2768 "scale": 5
2769 }]"#,
2770 );
2771 assert_eq!(
2772 res.unwrap_err().to_string(),
2773 "Schema parse error: Unions cannot contain duplicate types"
2774 );
2775
2776 let writer_schema = Schema::from_str(
2777 r#"["null", {
2778 "type": "bytes"
2779 }]"#,
2780 )
2781 .unwrap();
2782 let reader_schema = Schema::from_str(
2783 r#"["null", {
2784 "type": "bytes",
2785 "logicalType": "decimal",
2786 "precision": 12,
2787 "scale": 5
2788 }]"#,
2789 )
2790 .unwrap();
2791 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2792
2793 let expected = SchemaPiece::ResolveUnionUnion {
2794 permutation: vec![
2795 Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2796 Ok((
2797 1,
2798 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2799 precision: 12,
2800 scale: 5,
2801 fixed_size: None,
2802 }),
2803 )),
2804 ],
2805 n_reader_variants: 2,
2806 reader_null_variant: Some(0),
2807 };
2808 assert_eq!(resolved.top_node().inner, &expected);
2809 }
2810
2811 #[mz_ore::test]
2812 fn test_no_documentation() {
2813 let schema =
2814 Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2815 .unwrap();
2816
2817 let doc = match schema.top_node().inner {
2818 SchemaPiece::Enum { doc, .. } => doc.clone(),
2819 _ => panic!(),
2820 };
2821
2822 assert_none!(doc);
2823 }
2824
2825 #[mz_ore::test]
2826 fn test_documentation() {
2827 let schema = Schema::from_str(
2828 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2829 ).unwrap();
2830
2831 let doc = match schema.top_node().inner {
2832 SchemaPiece::Enum { doc, .. } => doc.clone(),
2833 _ => None,
2834 };
2835
2836 assert_eq!("Some documentation".to_owned(), doc.unwrap());
2837 }
2838
2839 #[mz_ore::test]
2840 fn test_namespaces_and_names() {
2841 let schema = Schema::from_str(
2843 r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
2844 )
2845 .unwrap();
2846 assert_eq!(schema.named.len(), 1);
2847 assert_eq!(
2848 schema.named[0].name,
2849 FullName {
2850 name: "name".into(),
2851 namespace: "namespace".into()
2852 }
2853 );
2854
2855 let schema =
2857 Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
2858 .unwrap();
2859 assert_eq!(schema.named.len(), 1);
2860 assert_eq!(
2861 schema.named[0].name,
2862 FullName {
2863 name: "dots".into(),
2864 namespace: "name.has".into()
2865 }
2866 );
2867
2868 let schema = Schema::from_str(
2870 r#"{"type": "enum", "namespace": "namespace",
2871 "name": "name.has.dots", "symbols": ["A", "B"]}"#,
2872 )
2873 .unwrap();
2874 assert_eq!(schema.named.len(), 1);
2875 assert_eq!(
2876 schema.named[0].name,
2877 FullName {
2878 name: "dots".into(),
2879 namespace: "name.has".into()
2880 }
2881 );
2882
2883 let schema = Schema::from_str(
2886 r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
2887 "fields": [{"name": "name", "type": "string"}]}"#,
2888 )
2889 .unwrap();
2890 assert_eq!(schema.named.len(), 1);
2891 assert_eq!(
2892 schema.named[0].name,
2893 FullName {
2894 name: "TestDoc".into(),
2895 namespace: "".into()
2896 }
2897 );
2898
2899 let schema = Schema::from_str(
2901 r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
2902 "fields": [{"name": "name", "type": "string"}]}"#,
2903 )
2904 .unwrap();
2905 assert_eq!(schema.named.len(), 1);
2906 assert_eq!(
2907 schema.named[0].name,
2908 FullName {
2909 name: "TestDoc".into(),
2910 namespace: "".into()
2911 }
2912 );
2913
2914 let first = Schema::from_str(
2916 r#"{"type": "fixed", "namespace": "namespace",
2917 "name": "name", "size": 1}"#,
2918 )
2919 .unwrap();
2920 let second = Schema::from_str(
2921 r#"{"type": "fixed", "name": "namespace.name",
2922 "size": 1}"#,
2923 )
2924 .unwrap();
2925 assert_eq!(first.named[0].name, second.named[0].name);
2926
2927 let first = Schema::from_str(
2928 r#"{"type": "fixed", "namespace": "namespace",
2929 "name": "name", "size": 1}"#,
2930 )
2931 .unwrap();
2932 let second = Schema::from_str(
2933 r#"{"type": "fixed", "name": "namespace.Name",
2934 "size": 1}"#,
2935 )
2936 .unwrap();
2937 assert_ne!(first.named[0].name, second.named[0].name);
2938
2939 let first = Schema::from_str(
2940 r#"{"type": "fixed", "namespace": "Namespace",
2941 "name": "name", "size": 1}"#,
2942 )
2943 .unwrap();
2944 let second = Schema::from_str(
2945 r#"{"type": "fixed", "namespace": "namespace",
2946 "name": "name", "size": 1}"#,
2947 )
2948 .unwrap();
2949 assert_ne!(first.named[0].name, second.named[0].name);
2950
2951 assert!(
2954 Schema::from_str(
2955 r#"{"type": "record", "name": "99 problems but a name aint one",
2956 "fields": [{"name": "name", "type": "string"}]}"#
2957 )
2958 .is_err()
2959 );
2960
2961 assert!(
2962 Schema::from_str(
2963 r#"{"type": "record", "name": "!!!",
2964 "fields": [{"name": "name", "type": "string"}]}"#
2965 )
2966 .is_err()
2967 );
2968
2969 assert!(
2970 Schema::from_str(
2971 r#"{"type": "record", "name": "_valid_until_©",
2972 "fields": [{"name": "name", "type": "string"}]}"#
2973 )
2974 .is_err()
2975 );
2976
2977 let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
2979 {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
2980 {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
2981 {"name": "f3", "type": "MyEnum"},
2982 {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
2983 {"name": "f5", "type": "other.namespace.OtherEnum"},
2984 {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
2985 {"name": "f7", "type": "some.other.ThirdEnum"}
2986 ]}"#).unwrap();
2987 assert_eq!(schema.named.len(), 4);
2988
2989 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
2990 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); } else {
2998 panic!("Expected SchemaPiece::Record, found something else");
2999 }
3000
3001 let schema = Schema::from_str(
3002 r#"{"type": "record", "name": "x.Y", "fields": [
3003 {"name": "e", "type":
3004 {"type": "record", "name": "Z", "fields": [
3005 {"name": "f", "type": "x.Y"},
3006 {"name": "g", "type": "x.Z"}
3007 ]}
3008 }
3009 ]}"#,
3010 )
3011 .unwrap();
3012 assert_eq!(schema.named.len(), 2);
3013
3014 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3015 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); } else {
3017 panic!("Expected SchemaPiece::Record, found something else");
3018 }
3019
3020 if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
3021 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); } else {
3024 panic!("Expected SchemaPiece::Record, found something else");
3025 }
3026
3027 let schema = Schema::from_str(
3028 r#"{"type": "record", "name": "R", "fields": [
3029 {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
3030 {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
3031 "symbols": ["Foo", "Bar"]}
3032 }
3033 ]}},
3034 {"name": "t", "type": "Z"}
3035 ]}"#,
3036 )
3037 .unwrap();
3038 assert_eq!(schema.named.len(), 3);
3039
3040 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3041 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); } else {
3044 panic!("Expected SchemaPiece::Record, found something else");
3045 }
3046 }
3047
3048 #[mz_ore::test]
3051 fn test_schema_is_send() {
3052 fn send<S: Send>(_s: S) {}
3053
3054 let schema = Schema {
3055 named: vec![],
3056 indices: Default::default(),
3057 top: SchemaPiece::Null.into(),
3058 };
3059 send(schema);
3060 }
3061
3062 #[mz_ore::test]
3063 fn test_schema_is_sync() {
3064 fn sync<S: Sync>(_s: S) {}
3065
3066 let schema = Schema {
3067 named: vec![],
3068 indices: Default::default(),
3069 top: SchemaPiece::Null.into(),
3070 };
3071 sync(&schema);
3072 sync(schema);
3073 }
3074
3075 #[mz_ore::test]
3076 #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
3078 let raw_schema = r#"
3079 {
3080 "type": "record",
3081 "name": "test",
3082 "fields": [
3083 {"name": "a", "type": "long", "default": 42},
3084 {"name": "b", "type": "string"}
3085 ]
3086 }
3087 "#;
3088 let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3089 let schema = Schema::from_str(raw_schema).unwrap();
3090 assert_eq!(&schema.canonical_form(), expected_canonical);
3091 let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3092 .as_ref()
3093 .iter()
3094 .map(|b| format!("{b:02x}"))
3095 .collect::<String>();
3096 assert_eq!(
3097 format!("{}", schema.fingerprint(&digest::SHA256)),
3098 expected_fingerprint
3099 );
3100
3101 let raw_schema = r#"
3102{
3103 "type": "record",
3104 "name": "ns.r1",
3105 "namespace": "ignored",
3106 "fields": [
3107 {
3108 "name": "f1",
3109 "type": {
3110 "type": "fixed",
3111 "name": "r2",
3112 "size": 1
3113 }
3114 }
3115 ]
3116}
3117"#;
3118 let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3119 let schema = Schema::from_str(raw_schema).unwrap();
3120 assert_eq!(&schema.canonical_form(), expected_canonical);
3121 let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3122 .as_ref()
3123 .iter()
3124 .map(|b| format!("{b:02x}"))
3125 .collect::<String>();
3126 assert_eq!(
3127 format!("{}", schema.fingerprint(&digest::SHA256)),
3128 expected_fingerprint
3129 );
3130 }
3131
3132 #[mz_ore::test]
3133 fn test_make_valid() {
3134 for (input, expected) in [
3135 ("foo", "foo"),
3136 ("az99", "az99"),
3137 ("99az", "_99az"),
3138 ("is,bad", "is_bad"),
3139 ("@#$%", "____"),
3140 ("i-amMisBehaved!", "i_amMisBehaved_"),
3141 ("", "_"),
3142 ] {
3143 let actual = Name::make_valid(input);
3144 assert_eq!(expected, actual, "Name::make_valid({input})")
3145 }
3146 }
3147
3148 #[mz_ore::test]
3149 fn test_parse_with_simple_reference() {
3150 let ref_schema_json = r#"{
3152 "type": "record",
3153 "name": "User",
3154 "namespace": "com.example",
3155 "fields": [{"name": "id", "type": "int"}]
3156 }"#;
3157
3158 let primary_json = r#"{
3160 "type": "record",
3161 "name": "Event",
3162 "namespace": "com.example",
3163 "fields": [{"name": "user", "type": "com.example.User"}]
3164 }"#;
3165
3166 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3167 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3168
3169 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3170
3171 let user_name = FullName {
3173 name: "User".to_string(),
3174 namespace: "com.example".to_string(),
3175 };
3176 let event_name = FullName {
3177 name: "Event".to_string(),
3178 namespace: "com.example".to_string(),
3179 };
3180
3181 assert!(
3182 schema.indices.contains_key(&user_name),
3183 "User type should be in schema indices"
3184 );
3185 assert!(
3186 schema.indices.contains_key(&event_name),
3187 "Event type should be in schema indices"
3188 );
3189
3190 if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3192 let event_piece = &schema.named[*event_idx].piece;
3193 if let SchemaPiece::Record { fields, .. } = event_piece {
3194 assert_eq!(fields.len(), 1);
3195 assert_eq!(fields[0].name, "user");
3196 assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3198 } else {
3199 panic!("Expected Event to be a record");
3200 }
3201 } else {
3202 panic!("Expected top to be Named");
3203 }
3204 }
3205
3206 #[mz_ore::test]
3207 fn test_parse_with_nested_references() {
3208 let schema_a = r#"{
3210 "type": "record",
3211 "name": "Address",
3212 "namespace": "com.example",
3213 "fields": [
3214 {"name": "street", "type": "string"},
3215 {"name": "city", "type": "string"}
3216 ]
3217 }"#;
3218
3219 let schema_b = r#"{
3221 "type": "record",
3222 "name": "User",
3223 "namespace": "com.example",
3224 "fields": [
3225 {"name": "id", "type": "int"},
3226 {"name": "address", "type": "com.example.Address"}
3227 ]
3228 }"#;
3229
3230 let schema_c = r#"{
3232 "type": "record",
3233 "name": "Event",
3234 "namespace": "com.example",
3235 "fields": [
3236 {"name": "user", "type": "com.example.User"},
3237 {"name": "timestamp", "type": "long"}
3238 ]
3239 }"#;
3240
3241 let ref_schema_a = Schema::from_str(schema_a).unwrap();
3243
3244 let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3246 let ref_schema_b =
3247 Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3248 .unwrap();
3249
3250 let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3252 let final_schema =
3253 Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3254
3255 for name in ["Address", "User", "Event"] {
3257 let full_name = FullName {
3258 name: name.to_string(),
3259 namespace: "com.example".to_string(),
3260 };
3261 assert!(
3262 final_schema.indices.contains_key(&full_name),
3263 "{} type should be in schema indices",
3264 name
3265 );
3266 }
3267 }
3268
3269 #[mz_ore::test]
3270 fn test_parse_with_multiple_types_in_reference() {
3271 let ref_schema_json = r#"{
3273 "type": "record",
3274 "name": "ContactInfo",
3275 "namespace": "com.example",
3276 "fields": [
3277 {
3278 "name": "address",
3279 "type": {
3280 "type": "record",
3281 "name": "Address",
3282 "fields": [{"name": "street", "type": "string"}]
3283 }
3284 },
3285 {
3286 "name": "phone",
3287 "type": {
3288 "type": "record",
3289 "name": "PhoneNumber",
3290 "fields": [{"name": "number", "type": "string"}]
3291 }
3292 }
3293 ]
3294 }"#;
3295
3296 let primary_json = r#"{
3298 "type": "record",
3299 "name": "User",
3300 "namespace": "com.example",
3301 "fields": [
3302 {"name": "id", "type": "int"},
3303 {"name": "home", "type": "com.example.Address"},
3304 {"name": "mobile", "type": "com.example.PhoneNumber"}
3305 ]
3306 }"#;
3307
3308 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3309 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3310
3311 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3312
3313 for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3315 let full_name = FullName {
3316 name: name.to_string(),
3317 namespace: "com.example".to_string(),
3318 };
3319 assert!(
3320 schema.indices.contains_key(&full_name),
3321 "{} type should be in schema indices",
3322 name
3323 );
3324 }
3325 }
3326
3327 #[mz_ore::test]
3328 fn test_parse_with_no_references() {
3329 let schema_json = r#"{
3331 "type": "record",
3332 "name": "Simple",
3333 "fields": [{"name": "id", "type": "int"}]
3334 }"#;
3335
3336 let value: Value = serde_json::from_str(schema_json).unwrap();
3337
3338 let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3339 let schema_normal = Schema::parse(&value).unwrap();
3340
3341 assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3343 assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3344 }
3345
3346 #[mz_ore::test]
3347 fn test_parse_with_reference_in_array() {
3348 let ref_schema_json = r#"{
3350 "type": "record",
3351 "name": "Item",
3352 "namespace": "com.example",
3353 "fields": [{"name": "name", "type": "string"}]
3354 }"#;
3355
3356 let primary_json = r#"{
3358 "type": "record",
3359 "name": "Order",
3360 "namespace": "com.example",
3361 "fields": [
3362 {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3363 ]
3364 }"#;
3365
3366 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3367 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3368
3369 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3370
3371 let item_name = FullName {
3373 name: "Item".to_string(),
3374 namespace: "com.example".to_string(),
3375 };
3376 assert!(schema.indices.contains_key(&item_name));
3377
3378 if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3380 let order_piece = &schema.named[*order_idx].piece;
3381 if let SchemaPiece::Record { fields, .. } = order_piece {
3382 if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3383 assert!(
3384 matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3385 "Array items should be a Named reference to Item"
3386 );
3387 } else {
3388 panic!("Expected items field to be an array");
3389 }
3390 } else {
3391 panic!("Expected Order to be a record");
3392 }
3393 } else {
3394 panic!("Expected top to be Named");
3395 }
3396 }
3397}