1use std::borrow::Cow;
27use std::cell::RefCell;
28use std::collections::btree_map::Entry;
29use std::collections::{BTreeMap, BTreeSet};
30use std::fmt;
31use std::rc::Rc;
32use std::str::FromStr;
33use std::sync::LazyLock;
34
35use aws_lc_rs::digest;
36use itertools::Itertools;
37use mz_ore::assert_none;
38use regex::Regex;
39use serde::ser::{SerializeMap, SerializeSeq};
40use serde::{Serialize, Serializer};
41use serde_json::{self, Map, Value};
42use tracing::{debug, warn};
43
44use crate::decode::build_ts_value;
45use crate::error::Error as AvroError;
46use crate::reader::SchemaResolver;
47use crate::types::{self, DecimalValue, Value as AvroValue};
48use crate::util::{MapHelper, TsUnit};
49
50pub fn resolve_schemas(
51 writer_schema: &Schema,
52 reader_schema: &Schema,
53) -> Result<Schema, AvroError> {
54 let r_indices = reader_schema.indices.clone();
55 let (reader_to_writer_names, writer_to_reader_names): (BTreeMap<_, _>, BTreeMap<_, _>) =
56 writer_schema
57 .indices
58 .iter()
59 .flat_map(|(name, widx)| {
60 r_indices
61 .get(name)
62 .map(|ridx| ((*ridx, *widx), (*widx, *ridx)))
63 })
64 .unzip();
65 let reader_fullnames = reader_schema
66 .indices
67 .iter()
68 .map(|(f, i)| (*i, f))
69 .collect::<BTreeMap<_, _>>();
70 let mut resolver = SchemaResolver {
71 named: Default::default(),
72 indices: Default::default(),
73 human_readable_field_path: Vec::new(),
74 current_human_readable_path_start: 0,
75 writer_to_reader_names,
76 reader_to_writer_names,
77 reader_to_resolved_names: Default::default(),
78 reader_fullnames,
79 reader_schema,
80 };
81 let writer_node = writer_schema.top_node_or_named();
82 let reader_node = reader_schema.top_node_or_named();
83 let inner = resolver.resolve(writer_node, reader_node)?;
84 let sch = Schema {
85 named: resolver.named.into_iter().map(Option::unwrap).collect(),
86 indices: resolver.indices,
87 top: inner,
88 };
89 Ok(sch)
90}
91
/// Error raised while parsing a schema; carries a human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ParseSchemaError(String);

impl ParseSchemaError {
    /// Builds an error from anything convertible into a `String`.
    pub fn new<S>(msg: S) -> ParseSchemaError
    where
        S: Into<String>,
    {
        Self(msg.into())
    }
}

impl fmt::Display for ParseSchemaError {
    /// Displays the wrapped message, honoring the formatter's width and
    /// precision flags exactly as `String` does.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for ParseSchemaError {}
112
/// A schema fingerprint, stored as raw bytes.
#[derive(Debug)]
pub struct SchemaFingerprint {
    /// The fingerprint bytes, in display order.
    pub bytes: Vec<u8>,
}

impl fmt::Display for SchemaFingerprint {
    /// Renders the fingerprint as lowercase hexadecimal, two digits per byte.
    ///
    /// Each byte is written directly to the formatter, so no intermediate
    /// `String` or `Vec` is allocated.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.bytes
            .iter()
            .try_for_each(|byte| write!(f, "{:02x}", byte))
    }
}
134
135#[derive(Clone, Debug, PartialEq)]
136pub enum SchemaPieceOrNamed {
137 Piece(SchemaPiece),
138 Named(usize),
139}
140impl SchemaPieceOrNamed {
141 pub fn get_human_name(&self, root: &Schema) -> String {
142 match self {
143 Self::Piece(piece) => format!("{:?}", piece),
144 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
145 }
146 }
147 #[inline(always)]
148 pub fn get_piece_and_name<'a>(
149 &'a self,
150 root: &'a Schema,
151 ) -> (&'a SchemaPiece, Option<&'a FullName>) {
152 self.as_ref().get_piece_and_name(root)
153 }
154
155 #[inline(always)]
156 pub fn as_ref(&self) -> SchemaPieceRefOrNamed<'_> {
157 match self {
158 SchemaPieceOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
159 SchemaPieceOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(*index),
160 }
161 }
162}
163
164impl From<SchemaPiece> for SchemaPieceOrNamed {
165 #[inline(always)]
166 fn from(piece: SchemaPiece) -> Self {
167 Self::Piece(piece)
168 }
169}
170
/// One node of a schema.
///
/// The plain variants describe schema types as parsed. The `Resolve*`
/// variants are presumably produced by schema resolution (writer type →
/// reader type) — confirm against `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum SchemaPiece {
    /// The `null` type.
    Null,
    /// The `boolean` type.
    Boolean,
    /// The `int` type.
    Int,
    /// The `long` type.
    Long,
    /// The `float` type.
    Float,
    /// The `double` type.
    Double,
    /// A date; its underlying kind is `int`.
    Date,
    /// A millisecond-unit timestamp; its underlying kind is `long`.
    TimestampMilli,
    /// A microsecond-unit timestamp; its underlying kind is `long`.
    TimestampMicro,
    /// A decimal number.
    Decimal {
        /// Declared precision.
        precision: usize,
        /// Declared scale (defaults to 0 when parsed).
        scale: usize,
        /// `Some(size)` when backed by a `fixed` of that size, `None` when
        /// backed by `bytes`.
        fixed_size: Option<usize>,
    },
    /// The `bytes` type.
    Bytes,
    /// The `string` type.
    String,
    /// A string tagged with `connect.name` = `io.debezium.data.Json`.
    Json,
    /// A string tagged with `logicalType` = `uuid`.
    Uuid,
    /// An array of the boxed item schema.
    Array(Box<SchemaPieceOrNamed>),
    /// A map whose values have the boxed schema.
    Map(Box<SchemaPieceOrNamed>),
    /// A union of several variants.
    Union(UnionSchema),
    /// NOTE(review): presumably writer `int` read as millisecond timestamp — confirm.
    ResolveIntTsMilli,
    /// NOTE(review): presumably writer `int` read as microsecond timestamp — confirm.
    ResolveIntTsMicro,
    /// NOTE(review): presumably writer date read as timestamp — confirm.
    ResolveDateTimestamp,
    /// NOTE(review): presumably writer `int` read as `long` — confirm.
    ResolveIntLong,
    /// NOTE(review): presumably writer `int` read as `float` — confirm.
    ResolveIntFloat,
    /// NOTE(review): presumably writer `int` read as `double` — confirm.
    ResolveIntDouble,
    /// NOTE(review): presumably writer `long` read as `float` — confirm.
    ResolveLongFloat,
    /// NOTE(review): presumably writer `long` read as `double` — confirm.
    ResolveLongDouble,
    /// NOTE(review): presumably writer `float` read as `double` — confirm.
    ResolveFloatDouble,
    /// NOTE(review): presumably a non-union writer matched to one variant of
    /// a reader union — confirm against the resolver.
    ResolveConcreteUnion {
        /// Index of the selected variant (presumably in the reader union).
        index: usize,
        /// Schema used for the selected variant.
        inner: Box<SchemaPieceOrNamed>,
        /// Number of variants in the reader union.
        n_reader_variants: usize,
        /// Index of the reader's null variant, if it has one.
        reader_null_variant: Option<usize>,
    },
    /// NOTE(review): presumably a writer union matched against a reader
    /// union — confirm against the resolver.
    ResolveUnionUnion {
        /// One entry per (presumably writer) variant: the matched index and
        /// schema, or the error explaining why no match exists.
        permutation: Vec<Result<(usize, SchemaPieceOrNamed), AvroError>>,
        /// Number of variants in the reader union.
        n_reader_variants: usize,
        /// Index of the reader's null variant, if it has one.
        reader_null_variant: Option<usize>,
    },
    /// NOTE(review): presumably a writer union read as a non-union reader
    /// type — confirm against the resolver.
    ResolveUnionConcrete {
        /// Index of the selected union variant.
        index: usize,
        /// Schema used for the selected variant.
        inner: Box<SchemaPieceOrNamed>,
    },
    /// A record type.
    Record {
        /// Optional documentation string.
        doc: Documentation,
        /// The record's fields.
        fields: Vec<RecordField>,
        /// Maps each field name to that field's `position`.
        lookup: BTreeMap<String, usize>,
    },
    /// An enum type.
    Enum {
        /// Optional documentation string.
        doc: Documentation,
        /// The enum's symbols.
        symbols: Vec<String>,
        /// Index into `symbols` of the default symbol, if one was declared.
        default_idx: Option<usize>,
    },
    /// A fixed-size byte sequence of `size` bytes.
    Fixed { size: usize },
    /// NOTE(review): presumably a record resolved between writer and reader —
    /// confirm field semantics against the resolver.
    ResolveRecord {
        /// Fields filled from defaults.
        defaults: Vec<ResolvedDefaultValueField>,
        /// Fields in resolved form (present or absent).
        fields: Vec<ResolvedRecordField>,
        /// Number of fields in the reader record.
        n_reader_fields: usize,
    },
    /// NOTE(review): presumably an enum resolved between writer and reader —
    /// confirm field semantics against the resolver.
    ResolveEnum {
        /// Optional documentation string.
        doc: Documentation,
        /// Per symbol: `Ok((index, symbol))` when resolvable, else the
        /// unresolved symbol as `Err`.
        symbols: Vec<Result<(usize, String), String>>,
        /// Default `(index, symbol)`, if any.
        default: Option<(usize, String)>,
    },
}
318
319impl SchemaPiece {
320 pub fn is_underlying_int(&self) -> bool {
322 self.try_make_int_value(0).is_some()
323 }
324 pub fn is_underlying_long(&self) -> bool {
326 self.try_make_long_value(0).is_some()
327 }
328 pub fn try_make_int_value(&self, int: i32) -> Option<Result<AvroValue, AvroError>> {
331 match self {
332 SchemaPiece::Int => Some(Ok(AvroValue::Int(int))),
333 SchemaPiece::Date => Some(Ok(AvroValue::Date(int))),
336 _ => None,
337 }
338 }
339 pub fn try_make_long_value(&self, long: i64) -> Option<Result<AvroValue, AvroError>> {
342 match self {
343 SchemaPiece::Long => Some(Ok(AvroValue::Long(long))),
344 SchemaPiece::TimestampMilli => Some(build_ts_value(long, TsUnit::Millis)),
345 SchemaPiece::TimestampMicro => Some(build_ts_value(long, TsUnit::Micros)),
346 _ => None,
347 }
348 }
349}
350
/// A complete schema: a top-level piece plus the table of named pieces it
/// may reference by index.
#[derive(Clone, PartialEq)]
pub struct Schema {
    /// Named pieces; `SchemaPieceOrNamed::Named(idx)` indexes into this list.
    pub(crate) named: Vec<NamedSchemaPiece>,
    /// Maps each full name to its index in `named`.
    pub(crate) indices: BTreeMap<FullName, usize>,
    /// The top-level piece (inline or a reference into `named`).
    pub top: SchemaPieceOrNamed,
}
360
361impl ToString for Schema {
362 fn to_string(&self) -> String {
363 let json = serde_json::to_value(self).unwrap();
364 json.to_string()
365 }
366}
367
368impl std::fmt::Debug for Schema {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 if f.alternate() {
371 f.write_str(
372 &serde_json::to_string_pretty(self)
373 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
374 )
375 } else {
376 f.write_str(
377 &serde_json::to_string(self)
378 .unwrap_or_else(|e| format!("failed to serialize: {}", e)),
379 )
380 }
381 }
382}
383
384impl Schema {
385 pub fn top_node(&self) -> SchemaNode<'_> {
386 let (inner, name) = self.top.get_piece_and_name(self);
387 SchemaNode {
388 root: self,
389 inner,
390 name,
391 }
392 }
393 pub fn top_node_or_named(&self) -> SchemaNodeOrNamed<'_> {
394 SchemaNodeOrNamed {
395 root: self,
396 inner: self.top.as_ref(),
397 }
398 }
399 pub fn lookup(&self, idx: usize) -> &NamedSchemaPiece {
400 &self.named[idx]
401 }
402 pub fn try_lookup_name(&self, name: &FullName) -> Option<&NamedSchemaPiece> {
403 self.indices.get(name).map(|&idx| &self.named[idx])
404 }
405}
406
/// The coarse kind of a schema piece, without any of its parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SchemaKind {
    Null,
    Boolean,
    Int,
    Long,
    Float,
    Double,
    Bytes,
    String,
    Array,
    Map,
    Union,
    Record,
    Enum,
    Fixed,
    /// Used when no single kind applies.
    Unknown,
}

impl SchemaKind {
    /// Returns the lowercase name of this kind.
    pub fn name(self) -> &'static str {
        match self {
            Self::Null => "null",
            Self::Boolean => "boolean",
            Self::Int => "int",
            Self::Long => "long",
            Self::Float => "float",
            Self::Double => "double",
            Self::Bytes => "bytes",
            Self::String => "string",
            Self::Array => "array",
            Self::Map => "map",
            Self::Union => "union",
            Self::Record => "record",
            Self::Enum => "enum",
            Self::Fixed => "fixed",
            Self::Unknown => "unknown",
        }
    }
}
458
459impl<'a> From<&'a SchemaPiece> for SchemaKind {
460 #[inline(always)]
461 fn from(piece: &'a SchemaPiece) -> SchemaKind {
462 match piece {
463 SchemaPiece::Null => SchemaKind::Null,
464 SchemaPiece::Boolean => SchemaKind::Boolean,
465 SchemaPiece::Int => SchemaKind::Int,
466 SchemaPiece::Long => SchemaKind::Long,
467 SchemaPiece::Float => SchemaKind::Float,
468 SchemaPiece::Double => SchemaKind::Double,
469 SchemaPiece::Date => SchemaKind::Int,
470 SchemaPiece::TimestampMilli
471 | SchemaPiece::TimestampMicro
472 | SchemaPiece::ResolveIntTsMilli
473 | SchemaPiece::ResolveDateTimestamp
474 | SchemaPiece::ResolveIntTsMicro => SchemaKind::Long,
475 SchemaPiece::Decimal {
476 fixed_size: None, ..
477 } => SchemaKind::Bytes,
478 SchemaPiece::Decimal {
479 fixed_size: Some(_),
480 ..
481 } => SchemaKind::Fixed,
482 SchemaPiece::Bytes => SchemaKind::Bytes,
483 SchemaPiece::String => SchemaKind::String,
484 SchemaPiece::Array(_) => SchemaKind::Array,
485 SchemaPiece::Map(_) => SchemaKind::Map,
486 SchemaPiece::Union(_) => SchemaKind::Union,
487 SchemaPiece::ResolveUnionUnion { .. } => SchemaKind::Union,
488 SchemaPiece::ResolveIntLong => SchemaKind::Long,
489 SchemaPiece::ResolveIntFloat => SchemaKind::Float,
490 SchemaPiece::ResolveIntDouble => SchemaKind::Double,
491 SchemaPiece::ResolveLongFloat => SchemaKind::Float,
492 SchemaPiece::ResolveLongDouble => SchemaKind::Double,
493 SchemaPiece::ResolveFloatDouble => SchemaKind::Double,
494 SchemaPiece::ResolveConcreteUnion { .. } => SchemaKind::Union,
495 SchemaPiece::ResolveUnionConcrete { inner: _, .. } => SchemaKind::Unknown,
496 SchemaPiece::Record { .. } => SchemaKind::Record,
497 SchemaPiece::Enum { .. } => SchemaKind::Enum,
498 SchemaPiece::Fixed { .. } => SchemaKind::Fixed,
499 SchemaPiece::ResolveRecord { .. } => SchemaKind::Record,
500 SchemaPiece::ResolveEnum { .. } => SchemaKind::Enum,
501 SchemaPiece::Json => SchemaKind::String,
502 SchemaPiece::Uuid => SchemaKind::String,
503 }
504 }
505}
506
507impl<'a> From<SchemaNode<'a>> for SchemaKind {
508 #[inline(always)]
509 fn from(schema: SchemaNode<'a>) -> SchemaKind {
510 SchemaKind::from(schema.inner)
511 }
512}
513
514impl<'a> From<&'a Schema> for SchemaKind {
515 #[inline(always)]
516 fn from(schema: &'a Schema) -> SchemaKind {
517 Self::from(schema.top_node())
518 }
519}
520
/// The name of a named schema type as it appears in a schema definition.
#[derive(Clone, Debug, PartialEq)]
pub struct Name {
    /// The base name. After [`Name::parse`] it contains no dots: any dotted
    /// prefix is moved into `namespace`.
    pub name: String,
    /// The namespace, if one was given or derived from a dotted name.
    pub namespace: Option<String>,
    /// Alternative names, when the definition has an `aliases` array made up
    /// entirely of strings.
    pub aliases: Option<Vec<String>>,
}
537
/// A fully-resolved name: a base name together with its namespace (which may
/// be the empty string).
#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct FullName {
    name: String,
    namespace: String,
}

impl FullName {
    /// Builds a full name.
    ///
    /// With an explicit `namespace`, `name` is stored unchanged. Otherwise
    /// `name` is split at its last `.`: the tail becomes the base name and
    /// the head the namespace, with `default_namespace` used when there is
    /// no dot.
    pub fn from_parts(name: &str, namespace: Option<&str>, default_namespace: &str) -> FullName {
        let (namespace, name) = match namespace {
            Some(explicit) => (explicit, name),
            None => name
                .rsplit_once('.')
                .unwrap_or((default_namespace, name)),
        };
        FullName {
            name: name.to_owned(),
            namespace: namespace.to_owned(),
        }
    }
    /// The base name, without the namespace.
    pub fn base_name(&self) -> &str {
        self.name.as_str()
    }
    /// `namespace.name`, or just `name` when the namespace is empty.
    pub fn human_name(&self) -> String {
        match self.namespace.as_str() {
            "" => self.name.clone(),
            ns => format!("{}.{}", ns, self.name),
        }
    }
    /// The shortest name usable from inside `enclosing_ns`: the borrowed
    /// base name when the namespaces match, otherwise an owned
    /// `namespace.name`.
    pub fn short_name(&self, enclosing_ns: &str) -> Cow<'_, str> {
        if self.namespace.as_str() != enclosing_ns {
            return Cow::Owned(format!("{}.{}", self.namespace, self.name));
        }
        Cow::Borrowed(self.name.as_str())
    }
    /// The namespace (possibly empty).
    pub fn namespace(&self) -> &str {
        self.namespace.as_str()
    }
}

impl fmt::Debug for FullName {
    /// Always prints `namespace.name`, even when the namespace is empty.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, namespace } = self;
        write!(f, "{}.{}", namespace, name)
    }
}
594
/// An optional documentation string attached to a schema element.
pub type Documentation = Option<String>;
597
598impl Name {
599 pub fn is_valid(name: &str) -> bool {
603 static MATCHER: LazyLock<Regex> =
604 LazyLock::new(|| Regex::new(r"(^[A-Za-z_][A-Za-z0-9_]*)$").unwrap());
605 MATCHER.is_match(name)
606 }
607
608 pub fn validate(name: &str) -> Result<(), AvroError> {
610 match Self::is_valid(name) {
611 true => Ok(()),
612 false => {
613 Err(ParseSchemaError::new(format!(
614 "Invalid name. Must start with [A-Za-z_] and subsequently only contain [A-Za-z0-9_]. Found: {}",
615 name
616 )).into())
617 }
618 }
619 }
620
621 pub fn make_valid(name: &str) -> String {
629 let mut out = String::new();
630 let mut chars = name.chars();
631 match chars.next() {
632 Some(ch @ ('a'..='z' | 'A'..='Z')) => out.push(ch),
633 Some(ch @ '0'..='9') => {
634 out.push('_');
635 out.push(ch);
636 }
637 _ => out.push('_'),
638 }
639 for ch in chars {
640 match ch {
641 '0'..='9' | 'a'..='z' | 'A'..='Z' => out.push(ch),
642 _ => out.push('_'),
643 }
644 }
645 debug_assert!(
646 Name::is_valid(&out),
647 "make_valid({name}) produced invalid name: {out}"
648 );
649 out
650 }
651
652 pub fn parse(complex: &Map<String, Value>) -> Result<Self, AvroError> {
654 let name = complex
655 .name()
656 .ok_or_else(|| ParseSchemaError::new("No `name` field"))?;
657 if name.is_empty() {
658 return Err(ParseSchemaError::new(format!(
659 "Name cannot be the empty string: {:?}",
660 complex
661 ))
662 .into());
663 }
664
665 let (namespace, name) = if let Some(index) = name.rfind('.') {
666 let computed_namespace = name[..index].to_owned();
667 let computed_name = name[index + 1..].to_owned();
668 if let Some(provided_namespace) = complex.string("namespace") {
669 if provided_namespace != computed_namespace {
670 warn!(
671 "Found dots in name {}, updating to namespace {} and name {}",
672 name, computed_namespace, computed_name
673 );
674 }
675 }
676 (Some(computed_namespace), computed_name)
677 } else {
678 (complex.string("namespace"), name)
679 };
680
681 Self::validate(&name)?;
682
683 let aliases: Option<Vec<String>> = complex
684 .get("aliases")
685 .and_then(|aliases| aliases.as_array())
686 .and_then(|aliases| {
687 aliases
688 .iter()
689 .map(|alias| alias.as_str())
690 .map(|alias| alias.map(|a| a.to_string()))
691 .collect::<Option<_>>()
692 });
693
694 Ok(Name {
695 name,
696 namespace,
697 aliases,
698 })
699 }
700
701 pub fn parse_simple(name: &str) -> Result<Self, AvroError> {
703 let mut map = serde_json::map::Map::new();
704 map.insert("name".into(), serde_json::Value::String(name.into()));
705 Self::parse(&map)
706 }
707
708 pub fn fullname(&self, default_namespace: &str) -> FullName {
713 FullName::from_parts(&self.name, self.namespace.as_deref(), default_namespace)
714 }
715}
716
/// A record field whose value is supplied by a default.
///
/// NOTE(review): presumably a reader field missing from the writer schema,
/// filled in during resolution — confirm against `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub struct ResolvedDefaultValueField {
    /// Field name.
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The default value, already converted to a typed value.
    pub default: types::Value,
    /// Declared sort order of the field.
    pub order: RecordFieldOrder,
    /// Position of the field (presumably within the reader record — confirm).
    pub position: usize,
}
725
/// A field of a resolved record.
///
/// NOTE(review): presumably `Absent` is a writer field with no reader
/// counterpart (carrying the schema needed to skip it) and `Present` is a
/// field both sides share — confirm against `SchemaResolver`.
#[derive(Clone, Debug, PartialEq)]
pub enum ResolvedRecordField {
    Absent(Schema),
    Present(RecordField),
}
731
/// One field of a record schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordField {
    /// Field name.
    pub name: String,
    /// Optional documentation string.
    pub doc: Documentation,
    /// The raw JSON `default` attribute, if the field declares one.
    pub default: Option<Value>,
    /// Schema of the field's value.
    pub schema: SchemaPieceOrNamed,
    /// Sort order; the parser uses `Ascending` when none (or an unknown
    /// one) is given.
    pub order: RecordFieldOrder,
    /// Zero-based position of the field within its record.
    pub position: usize,
}
752
/// Sort order of a record field, parsed from the `order` attribute values
/// `"ascending"`, `"descending"` and `"ignore"`.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum RecordFieldOrder {
    Ascending,
    Descending,
    Ignore,
}
760
// `RecordField` currently has no inherent methods.
impl RecordField {}
762
/// A union schema: an ordered list of variants plus lookup indexes.
#[derive(Debug, Clone)]
pub struct UnionSchema {
    /// The variants, in declaration order.
    schemas: Vec<SchemaPieceOrNamed>,

    /// For inline (unnamed) variants: kind → position in `schemas`.
    anon_variant_index: BTreeMap<SchemaKind, usize>,

    /// For named variants: named-type index → position in `schemas`.
    named_variant_index: BTreeMap<usize, usize>,
}
774
775impl UnionSchema {
776 pub(crate) fn new(schemas: Vec<SchemaPieceOrNamed>) -> Result<Self, AvroError> {
777 let mut avindex = BTreeMap::new();
778 let mut nvindex = BTreeMap::new();
779 for (i, schema) in schemas.iter().enumerate() {
780 match schema {
781 SchemaPieceOrNamed::Piece(sp) => {
782 if let SchemaPiece::Union(_) = sp {
783 return Err(ParseSchemaError::new(
784 "Unions may not directly contain a union",
785 )
786 .into());
787 }
788 let kind = SchemaKind::from(sp);
789 if avindex.insert(kind, i).is_some() {
790 return Err(
791 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
792 );
793 }
794 }
795 SchemaPieceOrNamed::Named(idx) => {
796 if nvindex.insert(*idx, i).is_some() {
797 return Err(
798 ParseSchemaError::new("Unions cannot contain duplicate types").into(),
799 );
800 }
801 }
802 }
803 }
804 Ok(UnionSchema {
805 schemas,
806 anon_variant_index: avindex,
807 named_variant_index: nvindex,
808 })
809 }
810
811 pub fn variants(&self) -> &[SchemaPieceOrNamed] {
813 &self.schemas
814 }
815
816 pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
818 &mut self.schemas
819 }
820
821 pub fn is_nullable(&self) -> bool {
823 !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
824 }
825
826 pub fn match_piece(&self, sp: &SchemaPiece) -> Option<(usize, &SchemaPieceOrNamed)> {
827 self.anon_variant_index
828 .get(&SchemaKind::from(sp))
829 .map(|idx| (*idx, &self.schemas[*idx]))
830 }
831
832 pub fn match_ref(
833 &self,
834 other: SchemaPieceRefOrNamed,
835 names_map: &BTreeMap<usize, usize>,
836 ) -> Option<(usize, &SchemaPieceOrNamed)> {
837 match other {
838 SchemaPieceRefOrNamed::Piece(sp) => self.match_piece(sp),
839 SchemaPieceRefOrNamed::Named(idx) => names_map
840 .get(&idx)
841 .and_then(|idx| self.named_variant_index.get(idx))
842 .map(|idx| (*idx, &self.schemas[*idx])),
843 }
844 }
845
846 #[inline(always)]
847 pub fn match_(
848 &self,
849 other: &SchemaPieceOrNamed,
850 names_map: &BTreeMap<usize, usize>,
851 ) -> Option<(usize, &SchemaPieceOrNamed)> {
852 self.match_ref(other.as_ref(), names_map)
853 }
854
855 pub fn match_promote_writer(
859 &self,
860 writer: &SchemaPieceOrNamed,
861 names_map: &BTreeMap<usize, usize>,
862 ) -> Option<(usize, &SchemaPieceOrNamed)> {
863 self.match_ref_promote_writer(writer.as_ref(), names_map)
864 }
865
866 pub fn match_ref_promote_writer(
873 &self,
874 writer: SchemaPieceRefOrNamed,
875 names_map: &BTreeMap<usize, usize>,
876 ) -> Option<(usize, &SchemaPieceOrNamed)> {
877 if let Some(hit) = self.match_ref(writer, names_map) {
878 return Some(hit);
879 }
880 let SchemaPieceRefOrNamed::Piece(wp) = writer else {
881 return None;
882 };
883 self.schemas
884 .iter()
885 .enumerate()
886 .find_map(|(idx, variant)| match variant {
887 SchemaPieceOrNamed::Piece(rp) if can_promote(wp, rp) => Some((idx, variant)),
888 _ => None,
889 })
890 }
891
892 pub fn match_ref_promote_reader(
898 &self,
899 reader: SchemaPieceRefOrNamed,
900 names_map: &BTreeMap<usize, usize>,
901 ) -> Option<(usize, &SchemaPieceOrNamed)> {
902 if let Some(hit) = self.match_ref(reader, names_map) {
903 return Some(hit);
904 }
905 let SchemaPieceRefOrNamed::Piece(rp) = reader else {
906 return None;
907 };
908 self.schemas
909 .iter()
910 .enumerate()
911 .find_map(|(idx, variant)| match variant {
912 SchemaPieceOrNamed::Piece(wp) if can_promote(wp, rp) => Some((idx, variant)),
913 _ => None,
914 })
915 }
916}
917
918fn can_promote(writer: &SchemaPiece, reader: &SchemaPiece) -> bool {
927 use SchemaPiece::*;
928 matches!(
929 (writer, reader),
930 (Int, Long)
931 | (Int, Float)
932 | (Int, Double)
933 | (Long, Float)
934 | (Long, Double)
935 | (Float, Double)
936 )
937}
938
939impl PartialEq for UnionSchema {
941 fn eq(&self, other: &UnionSchema) -> bool {
942 self.schemas.eq(&other.schemas)
943 }
944}
945
/// Working state used while parsing a JSON schema definition.
#[derive(Default)]
struct SchemaParser {
    /// One slot per allocated named type; `None` until the type's definition
    /// has been fully parsed and inserted.
    named: Vec<Option<NamedSchemaPiece>>,
    /// Maps each allocated full name to its slot in `named`.
    indices: BTreeMap<FullName, usize>,
    /// Current nesting depth of `parse_inner` calls.
    depth: usize,
}
952
/// Maximum nesting depth `SchemaParser::parse_inner` accepts before
/// rejecting the schema with an error.
const MAX_SCHEMA_DEPTH: usize = 128;
958
959impl SchemaParser {
960 fn parse(mut self, value: &Value) -> Result<Schema, AvroError> {
961 let top = self.parse_inner("", value)?;
962 let SchemaParser { named, indices, .. } = self;
963 Ok(Schema {
964 named: named.into_iter().map(|o| o.unwrap()).collect(),
965 indices,
966 top,
967 })
968 }
969
970 fn parse_inner(
971 &mut self,
972 default_namespace: &str,
973 value: &Value,
974 ) -> Result<SchemaPieceOrNamed, AvroError> {
975 self.depth += 1;
976 if self.depth > MAX_SCHEMA_DEPTH {
977 self.depth -= 1;
978 return Err(ParseSchemaError::new(format!(
979 "schema nesting depth exceeds limit {MAX_SCHEMA_DEPTH}"
980 ))
981 .into());
982 }
983 let result = self.parse_inner_impl(default_namespace, value);
984 self.depth -= 1;
985 result
986 }
987
988 fn parse_inner_impl(
989 &mut self,
990 default_namespace: &str,
991 value: &Value,
992 ) -> Result<SchemaPieceOrNamed, AvroError> {
993 match *value {
994 Value::String(ref t) => {
995 let name = FullName::from_parts(t.as_str(), None, default_namespace);
996 if let Some(idx) = self.indices.get(&name) {
997 Ok(SchemaPieceOrNamed::Named(*idx))
998 } else {
999 Ok(SchemaPieceOrNamed::Piece(Schema::parse_primitive(
1000 t.as_str(),
1001 )?))
1002 }
1003 }
1004 Value::Object(ref data) => self.parse_complex(default_namespace, data),
1005 Value::Array(ref data) => Ok(SchemaPieceOrNamed::Piece(
1006 self.parse_union(default_namespace, data)?,
1007 )),
1008 _ => Err(ParseSchemaError::new("Must be a JSON string, object or array").into()),
1009 }
1010 }
1011
1012 fn alloc_name(&mut self, fullname: FullName) -> Result<usize, AvroError> {
1013 let idx = match self.indices.entry(fullname) {
1014 Entry::Vacant(ve) => *ve.insert(self.named.len()),
1015 Entry::Occupied(oe) => {
1016 return Err(ParseSchemaError::new(format!(
1017 "Sub-schema with name {:?} encountered multiple times",
1018 oe.key()
1019 ))
1020 .into());
1021 }
1022 };
1023 self.named.push(None);
1024 Ok(idx)
1025 }
1026
    /// Fills the slot at `index`, previously reserved by `alloc_name`, with
    /// the parsed named piece.
    ///
    /// Asserts (via `assert_none!`) that the slot has not been filled yet,
    /// and panics if `index` is out of bounds.
    fn insert(&mut self, index: usize, schema: NamedSchemaPiece) {
        assert_none!(self.named[index]);
        self.named[index] = Some(schema);
    }
1031
1032 fn parse_named_type(
1033 &mut self,
1034 type_name: &str,
1035 default_namespace: &str,
1036 complex: &Map<String, Value>,
1037 ) -> Result<usize, AvroError> {
1038 let name = Name::parse(complex)?;
1039 match name.name.as_str() {
1040 "null" | "boolean" | "int" | "long" | "float" | "double" | "bytes" | "string" => {
1041 return Err(ParseSchemaError::new(format!(
1042 "{} may not be used as a custom type name",
1043 name.name
1044 ))
1045 .into());
1046 }
1047 _ => {}
1048 };
1049 let fullname = name.fullname(default_namespace);
1050 let default_namespace = fullname.namespace.clone();
1051 let idx = self.alloc_name(fullname.clone())?;
1052 let piece = match type_name {
1053 "record" => self.parse_record(&default_namespace, complex),
1054 "enum" => self.parse_enum(complex),
1055 "fixed" => self.parse_fixed(&default_namespace, complex),
1056 _ => unreachable!("Unknown named type kind: {}", type_name),
1057 }?;
1058
1059 self.insert(
1060 idx,
1061 NamedSchemaPiece {
1062 name: fullname,
1063 piece,
1064 },
1065 );
1066
1067 Ok(idx)
1068 }
1069
1070 fn parse_complex(
1076 &mut self,
1077 default_namespace: &str,
1078 complex: &Map<String, Value>,
1079 ) -> Result<SchemaPieceOrNamed, AvroError> {
1080 match complex.get("type") {
1081 Some(&Value::String(ref t)) => Ok(match t.as_str() {
1082 "record" | "enum" | "fixed" => SchemaPieceOrNamed::Named(self.parse_named_type(
1083 t,
1084 default_namespace,
1085 complex,
1086 )?),
1087 "array" => SchemaPieceOrNamed::Piece(self.parse_array(default_namespace, complex)?),
1088 "map" => SchemaPieceOrNamed::Piece(self.parse_map(default_namespace, complex)?),
1089 "bytes" => SchemaPieceOrNamed::Piece(Self::parse_bytes(complex)?),
1090 "int" => SchemaPieceOrNamed::Piece(Self::parse_int(complex)?),
1091 "long" => SchemaPieceOrNamed::Piece(Self::parse_long(complex)?),
1092 "string" => SchemaPieceOrNamed::Piece(Self::from_string(complex)),
1093 other => {
1094 let name = FullName {
1095 name: other.into(),
1096 namespace: default_namespace.into(),
1097 };
1098 if let Some(idx) = self.indices.get(&name) {
1099 SchemaPieceOrNamed::Named(*idx)
1100 } else {
1101 SchemaPieceOrNamed::Piece(Schema::parse_primitive(t.as_str())?)
1102 }
1103 }
1104 }),
1105 Some(&Value::Object(ref data)) => match data.get("type") {
1106 Some(value) => self.parse_inner(default_namespace, value),
1107 None => Err(
1108 ParseSchemaError::new(format!("Unknown complex type: {:?}", complex)).into(),
1109 ),
1110 },
1111 _ => Err(ParseSchemaError::new("No `type` in complex type").into()),
1112 }
1113 }
1114
1115 fn parse_record(
1118 &mut self,
1119 default_namespace: &str,
1120 complex: &Map<String, Value>,
1121 ) -> Result<SchemaPiece, AvroError> {
1122 let mut lookup = BTreeMap::new();
1123
1124 let fields: Vec<RecordField> = complex
1125 .get("fields")
1126 .and_then(|fields| fields.as_array())
1127 .ok_or_else(|| ParseSchemaError::new("No `fields` in record").into())
1128 .and_then(|fields| {
1129 fields
1130 .iter()
1131 .filter_map(|field| field.as_object())
1132 .enumerate()
1133 .map(|(position, field)| {
1134 self.parse_record_field(default_namespace, field, position)
1135 })
1136 .collect::<Result<_, _>>()
1137 })?;
1138
1139 for field in &fields {
1140 lookup.insert(field.name.clone(), field.position);
1141 }
1142
1143 Ok(SchemaPiece::Record {
1144 doc: complex.doc(),
1145 fields,
1146 lookup,
1147 })
1148 }
1149
1150 fn parse_record_field(
1152 &mut self,
1153 default_namespace: &str,
1154 field: &Map<String, Value>,
1155 position: usize,
1156 ) -> Result<RecordField, AvroError> {
1157 let name = field
1158 .name()
1159 .ok_or_else(|| ParseSchemaError::new("No `name` in record field"))?;
1160
1161 Name::validate(&name)?;
1162
1163 let schema = field
1164 .get("type")
1165 .ok_or_else(|| ParseSchemaError::new("No `type` in record field").into())
1166 .and_then(|type_| self.parse_inner(default_namespace, type_))?;
1167
1168 let default = field.get("default").cloned();
1169
1170 let order = field
1171 .get("order")
1172 .and_then(|order| order.as_str())
1173 .and_then(|order| match order {
1174 "ascending" => Some(RecordFieldOrder::Ascending),
1175 "descending" => Some(RecordFieldOrder::Descending),
1176 "ignore" => Some(RecordFieldOrder::Ignore),
1177 _ => None,
1178 })
1179 .unwrap_or(RecordFieldOrder::Ascending);
1180
1181 Ok(RecordField {
1182 name,
1183 doc: field.doc(),
1184 default,
1185 schema,
1186 order,
1187 position,
1188 })
1189 }
1190
1191 fn parse_enum(&self, complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1194 let symbols: Vec<String> = complex
1195 .get("symbols")
1196 .and_then(|v| v.as_array())
1197 .ok_or_else(|| ParseSchemaError::new("No `symbols` field in enum"))
1198 .and_then(|symbols| {
1199 symbols
1200 .iter()
1201 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1202 .collect::<Option<_>>()
1203 .ok_or_else(|| ParseSchemaError::new("Unable to parse `symbols` in enum"))
1204 })?;
1205
1206 let mut unique_symbols: BTreeSet<&String> = BTreeSet::new();
1207 for symbol in symbols.iter() {
1208 if unique_symbols.contains(symbol) {
1209 return Err(ParseSchemaError::new(format!(
1210 "Enum symbols must be unique, found multiple: {}",
1211 symbol
1212 ))
1213 .into());
1214 } else {
1215 unique_symbols.insert(symbol);
1216 }
1217 }
1218
1219 let default_idx = if let Some(default) = complex.get("default") {
1220 let default_str = default.as_str().ok_or_else(|| {
1221 ParseSchemaError::new(format!(
1222 "Enum default should be a string, got: {:?}",
1223 default
1224 ))
1225 })?;
1226 let default_idx = symbols
1227 .iter()
1228 .position(|x| x == default_str)
1229 .ok_or_else(|| {
1230 ParseSchemaError::new(format!(
1231 "Enum default not found in list of symbols: {}",
1232 default_str
1233 ))
1234 })?;
1235 Some(default_idx)
1236 } else {
1237 None
1238 };
1239
1240 Ok(SchemaPiece::Enum {
1241 doc: complex.doc(),
1242 symbols,
1243 default_idx,
1244 })
1245 }
1246
1247 fn parse_array(
1250 &mut self,
1251 default_namespace: &str,
1252 complex: &Map<String, Value>,
1253 ) -> Result<SchemaPiece, AvroError> {
1254 complex
1255 .get("items")
1256 .ok_or_else(|| ParseSchemaError::new("No `items` in array").into())
1257 .and_then(|items| self.parse_inner(default_namespace, items))
1258 .map(|schema| SchemaPiece::Array(Box::new(schema)))
1259 }
1260
1261 fn parse_map(
1264 &mut self,
1265 default_namespace: &str,
1266 complex: &Map<String, Value>,
1267 ) -> Result<SchemaPiece, AvroError> {
1268 complex
1269 .get("values")
1270 .ok_or_else(|| ParseSchemaError::new("No `values` in map").into())
1271 .and_then(|items| self.parse_inner(default_namespace, items))
1272 .map(|schema| SchemaPiece::Map(Box::new(schema)))
1273 }
1274
1275 fn parse_union(
1278 &mut self,
1279 default_namespace: &str,
1280 items: &[Value],
1281 ) -> Result<SchemaPiece, AvroError> {
1282 items
1283 .iter()
1284 .map(|value| self.parse_inner(default_namespace, value))
1285 .collect::<Result<Vec<_>, _>>()
1286 .and_then(|schemas| Ok(SchemaPiece::Union(UnionSchema::new(schemas)?)))
1287 }
1288
1289 fn parse_decimal(complex: &Map<String, Value>) -> Result<(usize, usize), AvroError> {
1292 let precision = complex
1293 .get("precision")
1294 .and_then(|v| v.as_i64())
1295 .ok_or_else(|| ParseSchemaError::new("No `precision` in decimal"))?;
1296
1297 let scale = complex.get("scale").and_then(|v| v.as_i64()).unwrap_or(0);
1298
1299 if scale < 0 {
1300 return Err(ParseSchemaError::new("Decimal scale must be greater than zero").into());
1301 }
1302
1303 if precision < 0 {
1304 return Err(
1305 ParseSchemaError::new("Decimal precision must be greater than zero").into(),
1306 );
1307 }
1308
1309 if scale > precision {
1310 return Err(ParseSchemaError::new("Decimal scale is greater than precision").into());
1311 }
1312
1313 Ok((precision as usize, scale as usize))
1314 }
1315
1316 fn parse_bytes(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1319 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1320
1321 if let Some("decimal") = logical_type {
1322 match Self::parse_decimal(complex) {
1323 Ok((precision, scale)) => {
1324 return Ok(SchemaPiece::Decimal {
1325 precision,
1326 scale,
1327 fixed_size: None,
1328 });
1329 }
1330 Err(e) => warn!(
1331 "parsing decimal as regular bytes due to parse error: {:?}, {:?}",
1332 complex, e
1333 ),
1334 }
1335 }
1336
1337 Ok(SchemaPiece::Bytes)
1338 }
1339
1340 fn parse_int(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1348 const AVRO_DATE: &str = "date";
1349 const DEBEZIUM_DATE: &str = "io.debezium.time.Date";
1350 const KAFKA_DATE: &str = "org.apache.kafka.connect.data.Date";
1351 if let Some(name) = complex.get("connect.name") {
1352 if name == DEBEZIUM_DATE || name == KAFKA_DATE {
1353 if name == KAFKA_DATE {
1354 warn!("using deprecated debezium date format");
1355 }
1356 return Ok(SchemaPiece::Date);
1357 }
1358 }
1359 if let Some(name) = complex.get("logicalType") {
1363 if name == AVRO_DATE {
1364 return Ok(SchemaPiece::Date);
1365 }
1366 }
1367 if !complex.is_empty() {
1368 debug!("parsing complex type as regular int: {:?}", complex);
1369 }
1370 Ok(SchemaPiece::Int)
1371 }
1372
1373 fn parse_long(complex: &Map<String, Value>) -> Result<SchemaPiece, AvroError> {
1381 const AVRO_MILLI_TS: &str = "timestamp-millis";
1382 const AVRO_MICRO_TS: &str = "timestamp-micros";
1383
1384 const CONNECT_MILLI_TS: &[&str] = &[
1385 "io.debezium.time.Timestamp",
1386 "org.apache.kafka.connect.data.Timestamp",
1387 ];
1388 const CONNECT_MICRO_TS: &str = "io.debezium.time.MicroTimestamp";
1389
1390 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1391 if CONNECT_MILLI_TS.contains(&&**name) {
1392 return Ok(SchemaPiece::TimestampMilli);
1393 }
1394 if name == CONNECT_MICRO_TS {
1395 return Ok(SchemaPiece::TimestampMicro);
1396 }
1397 }
1398 if let Some(name) = complex.get("logicalType") {
1399 if name == AVRO_MILLI_TS {
1400 return Ok(SchemaPiece::TimestampMilli);
1401 }
1402 if name == AVRO_MICRO_TS {
1403 return Ok(SchemaPiece::TimestampMicro);
1404 }
1405 }
1406 if !complex.is_empty() {
1407 debug!("parsing complex type as regular long: {:?}", complex);
1408 }
1409 Ok(SchemaPiece::Long)
1410 }
1411
1412 fn from_string(complex: &Map<String, Value>) -> SchemaPiece {
1413 const CONNECT_JSON: &str = "io.debezium.data.Json";
1414
1415 if let Some(serde_json::Value::String(name)) = complex.get("connect.name") {
1416 if CONNECT_JSON == name.as_str() {
1417 return SchemaPiece::Json;
1418 }
1419 }
1420 if let Some(name) = complex.get("logicalType") {
1421 if name == "uuid" {
1422 return SchemaPiece::Uuid;
1423 }
1424 }
1425 debug!("parsing complex type as regular string: {:?}", complex);
1426 SchemaPiece::String
1427 }
1428
1429 fn parse_fixed(
1432 &self,
1433 _default_namespace: &str,
1434 complex: &Map<String, Value>,
1435 ) -> Result<SchemaPiece, AvroError> {
1436 let _name = Name::parse(complex)?;
1437
1438 let size = complex
1439 .get("size")
1440 .and_then(|v| v.as_i64())
1441 .ok_or_else(|| ParseSchemaError::new("No `size` in fixed"))?;
1442 if size <= 0 {
1443 return Err(ParseSchemaError::new(format!(
1444 "Fixed values require a positive size attribute, found: {}",
1445 size
1446 ))
1447 .into());
1448 }
1449
1450 let logical_type = complex.get("logicalType").and_then(|v| v.as_str());
1451
1452 if let Some("decimal") = logical_type {
1453 match Self::parse_decimal(complex) {
1454 Ok((precision, scale)) => {
1455 let max = ((8 * size - 1) as f64 * 2_f64.log10()).floor() as usize;
1456 if precision > max {
1457 warn!(
1458 "Decimal precision {} requires more than {} bytes of space, parsing as fixed",
1459 precision, size
1460 );
1461 } else {
1462 return Ok(SchemaPiece::Decimal {
1463 precision,
1464 scale,
1465 fixed_size: Some(size as usize),
1466 });
1467 }
1468 }
1469 Err(e) => warn!(
1470 "parsing decimal as fixed due to parse error: {:?}, {:?}",
1471 complex, e
1472 ),
1473 }
1474 }
1475
1476 Ok(SchemaPiece::Fixed {
1477 size: size as usize,
1478 })
1479 }
1480}
1481
1482impl Schema {
    /// Parses a schema from an already-decoded JSON value.
    ///
    /// Equivalent to [`Schema::parse_with_references`] with an empty list of
    /// reference schemas.
    pub fn parse(value: &Value) -> Result<Self, AvroError> {
        Self::parse_with_references(value, &[])
    }
1488
1489 pub fn parse_with_references(
1500 primary: &Value,
1501 reference_schemas: &[Schema],
1502 ) -> Result<Self, AvroError> {
1503 let (named, indices) = Self::collect_named_types(reference_schemas);
1505
1506 let p = SchemaParser {
1508 named: named.into_iter().map(Some).collect(),
1509 indices,
1510 depth: 0,
1511 };
1512 p.parse(primary)
1513 }
1514
1515 fn collect_named_types(
1520 schemas: &[Schema],
1521 ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
1522 let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
1523 let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
1524
1525 for schema in schemas {
1526 let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
1530 let mut new_type_offset = combined_named.len();
1531
1532 for named_piece in &schema.named {
1533 if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
1534 index_map.push(existing_idx);
1536 } else {
1537 index_map.push(new_type_offset);
1539 new_type_offset += 1;
1540 }
1541 }
1542
1543 for named_piece in &schema.named {
1545 if combined_indices.contains_key(&named_piece.name) {
1546 continue;
1547 }
1548
1549 let mut remapped = named_piece.clone();
1550 Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
1551
1552 let new_idx = combined_named.len();
1553 combined_indices.insert(remapped.name.clone(), new_idx);
1554 combined_named.push(remapped);
1555 }
1556 }
1557
1558 (combined_named, combined_indices)
1559 }
1560
1561 fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
1563 match piece {
1564 SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map),
1565 SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map),
1566 SchemaPiece::Union(union) => {
1567 for variant in union.variants_mut() {
1568 Self::remap_indices_with_map(variant, index_map);
1569 }
1570 }
1571 SchemaPiece::Record { fields, .. } => {
1572 for field in fields {
1573 Self::remap_indices_with_map(&mut field.schema, index_map);
1574 }
1575 }
1576 _ => {}
1577 }
1578 }
1579
1580 fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
1582 match item {
1583 SchemaPieceOrNamed::Named(idx) => {
1584 if let Some(&new_idx) = index_map.get(*idx) {
1585 *idx = new_idx;
1586 }
1587 }
1588 SchemaPieceOrNamed::Piece(piece) => {
1589 Self::remap_indices_in_piece_with_map(piece, index_map)
1590 }
1591 }
1592 }
1593
1594 pub fn canonical_form(&self) -> String {
1599 let json = serde_json::to_value(self).unwrap();
1600 parsing_canonical_form(&json)
1601 }
1602
1603 pub fn fingerprint(&self, algorithm: &'static digest::Algorithm) -> SchemaFingerprint {
1608 let hash = digest::digest(algorithm, self.canonical_form().as_bytes());
1609 SchemaFingerprint {
1610 bytes: hash.as_ref().to_vec(),
1611 }
1612 }
1613
1614 fn parse_primitive(primitive: &str) -> Result<SchemaPiece, AvroError> {
1617 match primitive {
1618 "null" => Ok(SchemaPiece::Null),
1619 "boolean" => Ok(SchemaPiece::Boolean),
1620 "int" => Ok(SchemaPiece::Int),
1621 "long" => Ok(SchemaPiece::Long),
1622 "double" => Ok(SchemaPiece::Double),
1623 "float" => Ok(SchemaPiece::Float),
1624 "bytes" => Ok(SchemaPiece::Bytes),
1625 "string" => Ok(SchemaPiece::String),
1626 other => Err(ParseSchemaError::new(format!("Unknown type: {}", other)).into()),
1627 }
1628 }
1629}
1630
1631impl FromStr for Schema {
1632 type Err = AvroError;
1633
1634 fn from_str(input: &str) -> Result<Self, AvroError> {
1636 let value = serde_json::from_str(input)
1637 .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
1638 Self::parse(&value)
1639 }
1640}
1641
/// A schema piece together with the full name it is registered under.
#[derive(Clone, Debug, PartialEq)]
pub struct NamedSchemaPiece {
    /// Full name of the type.
    pub name: FullName,
    /// The definition of the type.
    pub piece: SchemaPiece,
}
1647
/// A borrowed view of one piece inside a schema, with any `Named` index
/// already looked up.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNode<'a> {
    /// The schema that `inner` belongs to; used to look up named pieces.
    pub root: &'a Schema,
    /// The piece this node points at.
    pub inner: &'a SchemaPiece,
    /// The full name of `inner` when it was reached through a named
    /// reference, `None` for an inline piece.
    pub name: Option<&'a FullName>,
}
1654
/// Either a borrowed inline schema piece or the index of a named piece.
#[derive(Copy, Clone, Debug)]
pub enum SchemaPieceRefOrNamed<'a> {
    /// An inline (anonymous) piece.
    Piece(&'a SchemaPiece),
    /// Index of a named piece, to be looked up in the owning schema.
    Named(usize),
}
1660
1661impl<'a> SchemaPieceRefOrNamed<'a> {
1662 pub fn get_human_name(&self, root: &Schema) -> String {
1663 match self {
1664 Self::Piece(piece) => format!("{:?}", piece),
1665 Self::Named(idx) => format!("{:?}", root.lookup(*idx).name),
1666 }
1667 }
1668
1669 #[inline(always)]
1670 pub fn get_piece_and_name(self, root: &'a Schema) -> (&'a SchemaPiece, Option<&'a FullName>) {
1671 match self {
1672 SchemaPieceRefOrNamed::Piece(sp) => (sp, None),
1673 SchemaPieceRefOrNamed::Named(index) => {
1674 let named_piece = root.lookup(index);
1675 (&named_piece.piece, Some(&named_piece.name))
1676 }
1677 }
1678 }
1679}
1680
/// A position inside a schema that may still be an unresolved reference to a
/// named piece.
#[derive(Copy, Clone, Debug)]
pub struct SchemaNodeOrNamed<'a> {
    /// The schema used to look up named references.
    pub root: &'a Schema,
    /// The inline piece or named index this node points at.
    pub inner: SchemaPieceRefOrNamed<'a>,
}
1686
1687impl<'a> SchemaNodeOrNamed<'a> {
1688 #[inline(always)]
1689 pub fn lookup(self) -> SchemaNode<'a> {
1690 let (inner, name) = self.inner.get_piece_and_name(self.root);
1691 SchemaNode {
1692 root: self.root,
1693 inner,
1694 name,
1695 }
1696 }
1697 #[inline(always)]
1698 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1699 self.step_ref(next.as_ref())
1700 }
1701 #[inline(always)]
1702 pub fn step_ref(self, next: SchemaPieceRefOrNamed<'a>) -> Self {
1703 Self {
1704 root: self.root,
1705 inner: match next {
1706 SchemaPieceRefOrNamed::Piece(piece) => SchemaPieceRefOrNamed::Piece(piece),
1707 SchemaPieceRefOrNamed::Named(index) => SchemaPieceRefOrNamed::Named(index),
1708 },
1709 }
1710 }
1711
1712 pub fn to_schema(self) -> Schema {
1713 let mut cloner = SchemaSubtreeDeepCloner {
1714 old_root: self.root,
1715 old_to_new_names: Default::default(),
1716 named: Default::default(),
1717 };
1718 let piece = cloner.clone_piece_or_named(self.inner);
1719 let named: Vec<NamedSchemaPiece> = cloner.named.into_iter().map(Option::unwrap).collect();
1720 let indices: BTreeMap<FullName, usize> = named
1721 .iter()
1722 .enumerate()
1723 .map(|(i, nsp)| (nsp.name.clone(), i))
1724 .collect();
1725 Schema {
1726 named,
1727 indices,
1728 top: piece,
1729 }
1730 }
1731
1732 pub fn namespace(self) -> Option<&'a str> {
1733 let SchemaNode { name, .. } = self.lookup();
1734 name.map(|FullName { namespace, .. }| namespace.as_str())
1735 }
1736}
1737
/// Working state for copying a subtree of a schema into a new, standalone
/// set of named pieces.
struct SchemaSubtreeDeepCloner<'a> {
    /// The schema being copied from.
    old_root: &'a Schema,
    /// Maps an index of a named piece in `old_root` to its index in `named`.
    old_to_new_names: BTreeMap<usize, usize>,
    /// Named pieces copied so far; a slot is `None` while its piece is still
    /// being cloned.
    named: Vec<Option<NamedSchemaPiece>>,
}
1743
1744impl<'a> SchemaSubtreeDeepCloner<'a> {
    /// Produces a deep copy of `piece`.
    ///
    /// Leaf pieces are rebuilt directly. Every nested `SchemaPieceOrNamed`
    /// goes through `clone_piece_or_named`, so references to named pieces
    /// are renumbered into this cloner's `named` list.
    fn clone_piece(&mut self, piece: &SchemaPiece) -> SchemaPiece {
        match piece {
            SchemaPiece::Null => SchemaPiece::Null,
            SchemaPiece::Boolean => SchemaPiece::Boolean,
            SchemaPiece::Int => SchemaPiece::Int,
            SchemaPiece::Long => SchemaPiece::Long,
            SchemaPiece::Float => SchemaPiece::Float,
            SchemaPiece::Double => SchemaPiece::Double,
            SchemaPiece::Date => SchemaPiece::Date,
            SchemaPiece::TimestampMilli => SchemaPiece::TimestampMilli,
            SchemaPiece::TimestampMicro => SchemaPiece::TimestampMicro,
            SchemaPiece::Json => SchemaPiece::Json,
            SchemaPiece::Decimal {
                scale,
                precision,
                fixed_size,
            } => SchemaPiece::Decimal {
                scale: *scale,
                precision: *precision,
                fixed_size: *fixed_size,
            },
            SchemaPiece::Bytes => SchemaPiece::Bytes,
            SchemaPiece::String => SchemaPiece::String,
            SchemaPiece::Uuid => SchemaPiece::Uuid,
            SchemaPiece::Array(inner) => {
                SchemaPiece::Array(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            SchemaPiece::Map(inner) => {
                SchemaPiece::Map(Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())))
            }
            // The variant lookup tables are copied verbatim; only the variant
            // schemas themselves are re-cloned.
            SchemaPiece::Union(us) => SchemaPiece::Union(UnionSchema {
                schemas: us
                    .schemas
                    .iter()
                    .map(|s| self.clone_piece_or_named(s.as_ref()))
                    .collect(),
                anon_variant_index: us.anon_variant_index.clone(),
                named_variant_index: us.named_variant_index.clone(),
            }),
            SchemaPiece::ResolveIntLong => SchemaPiece::ResolveIntLong,
            SchemaPiece::ResolveIntFloat => SchemaPiece::ResolveIntFloat,
            SchemaPiece::ResolveIntDouble => SchemaPiece::ResolveIntDouble,
            SchemaPiece::ResolveLongFloat => SchemaPiece::ResolveLongFloat,
            SchemaPiece::ResolveLongDouble => SchemaPiece::ResolveLongDouble,
            SchemaPiece::ResolveFloatDouble => SchemaPiece::ResolveFloatDouble,
            SchemaPiece::ResolveIntTsMilli => SchemaPiece::ResolveIntTsMilli,
            SchemaPiece::ResolveIntTsMicro => SchemaPiece::ResolveIntTsMicro,
            SchemaPiece::ResolveDateTimestamp => SchemaPiece::ResolveDateTimestamp,
            SchemaPiece::ResolveConcreteUnion {
                index,
                inner,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveConcreteUnion {
                index: *index,
                inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionUnion {
                permutation,
                n_reader_variants,
                reader_null_variant,
            } => SchemaPiece::ResolveUnionUnion {
                // Each present entry keeps its index; its schema is re-cloned
                // so named references point into the new list.
                permutation: permutation
                    .clone()
                    .into_iter()
                    .map(|o| o.map(|(idx, piece)| (idx, self.clone_piece_or_named(piece.as_ref()))))
                    .collect(),
                n_reader_variants: *n_reader_variants,
                reader_null_variant: *reader_null_variant,
            },
            SchemaPiece::ResolveUnionConcrete { index, inner } => {
                SchemaPiece::ResolveUnionConcrete {
                    index: *index,
                    inner: Box::new(self.clone_piece_or_named(inner.as_ref().as_ref())),
                }
            }
            SchemaPiece::Record {
                doc,
                fields,
                lookup,
            } => SchemaPiece::Record {
                doc: doc.clone(),
                fields: fields
                    .iter()
                    .map(|rf| RecordField {
                        name: rf.name.clone(),
                        doc: rf.doc.clone(),
                        default: rf.default.clone(),
                        schema: self.clone_piece_or_named(rf.schema.as_ref()),
                        order: rf.order,
                        position: rf.position,
                    })
                    .collect(),
                lookup: lookup.clone(),
            },
            SchemaPiece::Enum {
                doc,
                symbols,
                default_idx,
            } => SchemaPiece::Enum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default_idx: *default_idx,
            },
            SchemaPiece::Fixed { size } => SchemaPiece::Fixed { size: *size },
            SchemaPiece::ResolveRecord {
                defaults,
                fields,
                n_reader_fields,
            } => SchemaPiece::ResolveRecord {
                defaults: defaults.clone(),
                fields: fields
                    .iter()
                    .map(|rf| match rf {
                        ResolvedRecordField::Present(rf) => {
                            ResolvedRecordField::Present(RecordField {
                                name: rf.name.clone(),
                                doc: rf.doc.clone(),
                                default: rf.default.clone(),
                                schema: self.clone_piece_or_named(rf.schema.as_ref()),
                                order: rf.order,
                                position: rf.position,
                            })
                        }
                        // NOTE(review): the payload of an absent field is
                        // cloned as-is, without going through this cloner;
                        // confirm it carries no indices into `old_root`.
                        ResolvedRecordField::Absent(writer_schema) => {
                            ResolvedRecordField::Absent(writer_schema.clone())
                        }
                    })
                    .collect(),
                n_reader_fields: *n_reader_fields,
            },
            SchemaPiece::ResolveEnum {
                doc,
                symbols,
                default,
            } => SchemaPiece::ResolveEnum {
                doc: doc.clone(),
                symbols: symbols.clone(),
                default: default.clone(),
            },
        }
    }
1889 fn clone_piece_or_named(&mut self, piece: SchemaPieceRefOrNamed) -> SchemaPieceOrNamed {
1890 match piece {
1891 SchemaPieceRefOrNamed::Piece(piece) => self.clone_piece(piece).into(),
1892 SchemaPieceRefOrNamed::Named(index) => {
1893 let new_index = match self.old_to_new_names.entry(index) {
1894 Entry::Vacant(ve) => {
1895 let new_index = self.named.len();
1896 self.named.push(None);
1897 ve.insert(new_index);
1898 let old_named_piece = self.old_root.lookup(index);
1899 let new_named_piece = NamedSchemaPiece {
1900 name: old_named_piece.name.clone(),
1901 piece: self.clone_piece(&old_named_piece.piece),
1902 };
1903 self.named[new_index] = Some(new_named_piece);
1904 new_index
1905 }
1906 Entry::Occupied(oe) => *oe.get(),
1907 };
1908 SchemaPieceOrNamed::Named(new_index)
1909 }
1910 }
1911 }
1912}
1913
1914impl<'a> SchemaNode<'a> {
1915 #[inline(always)]
1916 pub fn step(self, next: &'a SchemaPieceOrNamed) -> Self {
1917 let (inner, name) = next.get_piece_and_name(self.root);
1918 Self {
1919 root: self.root,
1920 inner,
1921 name,
1922 }
1923 }
1924
    /// Converts a JSON value (such as a field default) into an [`AvroValue`]
    /// according to the schema piece this node points at.
    ///
    /// # Errors
    ///
    /// Returns a [`ParseSchemaError`] when the JSON value does not fit the
    /// schema: a number out of range, an unknown enum symbol, a record field
    /// missing from the JSON object, a union with no variants, or any other
    /// JSON/schema combination not listed below.
    pub fn json_to_value(self, json: &serde_json::Value) -> Result<AvroValue, ParseSchemaError> {
        use serde_json::Value::*;
        let val = match (json, self.inner) {
            // A value for a union is always interpreted against the union's
            // first variant.
            (json, SchemaPiece::Union(us)) => match us.schemas.first() {
                Some(variant) => AvroValue::Union {
                    index: 0,
                    inner: Box::new(self.step(variant).json_to_value(json)?),
                    n_variants: us.schemas.len(),
                    null_variant: us
                        .schemas
                        .iter()
                        .position(|s| s == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
                },
                None => return Err(ParseSchemaError("Union schema has no variants".to_owned())),
            },
            (Null, SchemaPiece::Null) => AvroValue::Null,
            (Bool(b), SchemaPiece::Boolean) => AvroValue::Boolean(*b),
            (Number(n), piece) => {
                match piece {
                    piece if piece.is_underlying_int() => {
                        let i =
                            n.as_i64()
                                .and_then(|i| i32::try_from(i).ok())
                                .ok_or_else(|| {
                                    ParseSchemaError(format!("{} is not a 32-bit integer", n))
                                })?;
                        // NOTE(review): the `unwrap` presumably relies on
                        // `try_make_int_value` returning `Some` whenever
                        // `is_underlying_int` holds; confirm.
                        piece.try_make_int_value(i).unwrap().map_err(|e| {
                            ParseSchemaError(format!("invalid default int {i}: {e}"))
                        })?
                    }
                    piece if piece.is_underlying_long() => {
                        let i = n.as_i64().ok_or_else(|| {
                            ParseSchemaError(format!("{} is not a 64-bit integer", n))
                        })?;
                        // NOTE(review): same assumption as the int case, for
                        // `try_make_long_value` and `is_underlying_long`.
                        piece.try_make_long_value(i).unwrap().map_err(|e| {
                            ParseSchemaError(format!("invalid default long {i}: {e}"))
                        })?
                    }
                    SchemaPiece::Float => {
                        let f = n.as_f64().ok_or_else(|| {
                            ParseSchemaError(format!("{} is not a 32-bit float", n))
                        })?;
                        // The JSON number is read as `f64` and narrowed.
                        AvroValue::Float(f as f32)
                    }
                    SchemaPiece::Double => {
                        let f = n.as_f64().ok_or_else(|| {
                            ParseSchemaError(format!("{} is not a 64-bit float", n))
                        })?;
                        AvroValue::Double(f)
                    }
                    _ => {
                        return Err(ParseSchemaError(format!(
                            "Unexpected number in default: {}",
                            n
                        )));
                    }
                }
            }
            // The strings "nan", "infinity" and "-infinity" (compared
            // ASCII-case-insensitively) stand for the non-finite float and
            // double values.
            (String(s), piece)
                if s.eq_ignore_ascii_case("nan")
                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
            {
                match piece {
                    SchemaPiece::Float => AvroValue::Float(f32::NAN),
                    SchemaPiece::Double => AvroValue::Double(f64::NAN),
                    _ => unreachable!(),
                }
            }
            (String(s), piece)
                if s.eq_ignore_ascii_case("infinity")
                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
            {
                match piece {
                    SchemaPiece::Float => AvroValue::Float(f32::INFINITY),
                    SchemaPiece::Double => AvroValue::Double(f64::INFINITY),
                    _ => unreachable!(),
                }
            }
            (String(s), piece)
                if s.eq_ignore_ascii_case("-infinity")
                    && (piece == &SchemaPiece::Float || piece == &SchemaPiece::Double) =>
            {
                match piece {
                    SchemaPiece::Float => AvroValue::Float(f32::NEG_INFINITY),
                    SchemaPiece::Double => AvroValue::Double(f64::NEG_INFINITY),
                    _ => unreachable!(),
                }
            }
            // The bytes are the UTF-8 encoding of the JSON string.
            // NOTE(review): the Avro spec describes bytes/fixed defaults as
            // code points 0-255 mapped to single bytes; for non-ASCII input
            // that differs from UTF-8. Confirm which is intended.
            (String(s), SchemaPiece::Bytes) => AvroValue::Bytes(s.clone().into_bytes()),
            (
                String(s),
                SchemaPiece::Decimal {
                    precision, scale, ..
                },
            ) => AvroValue::Decimal(DecimalValue {
                precision: *precision,
                scale: *scale,
                unscaled: s.clone().into_bytes(),
            }),
            (String(s), SchemaPiece::String) => AvroValue::String(s.clone()),
            // Every field of the record must be present in the JSON object;
            // keys that match no field are ignored.
            (Object(map), SchemaPiece::Record { fields, .. }) => {
                let field_values = fields
                    .iter()
                    .map(|rf| {
                        let jval = map.get(&rf.name).ok_or_else(|| {
                            ParseSchemaError(format!(
                                "Field not found in default value: {}",
                                rf.name
                            ))
                        })?;
                        let value = self.step(&rf.schema).json_to_value(jval)?;
                        Ok((rf.name.clone(), value))
                    })
                    .collect::<Result<Vec<(std::string::String, AvroValue)>, ParseSchemaError>>()?;
                AvroValue::Record(field_values)
            }
            (String(s), SchemaPiece::Enum { symbols, .. }) => {
                match symbols.iter().find_position(|sym| s == *sym) {
                    Some((index, sym)) => AvroValue::Enum(index, sym.clone()),
                    None => return Err(ParseSchemaError(format!("Enum variant not found: {}", s))),
                }
            }
            (Array(vals), SchemaPiece::Array(inner)) => {
                let node = self.step(&**inner);
                let vals = vals
                    .iter()
                    .map(|val| node.json_to_value(val))
                    .collect::<Result<Vec<_>, ParseSchemaError>>()?;
                AvroValue::Array(vals)
            }
            (Object(map), SchemaPiece::Map(inner)) => {
                let node = self.step(&**inner);
                let map = map
                    .iter()
                    .map(|(k, v)| node.json_to_value(v).map(|v| (k.clone(), v)))
                    .collect::<Result<BTreeMap<_, _>, ParseSchemaError>>()?;
                AvroValue::Map(map)
            }
            // Accepted only when the string's UTF-8 byte length equals the
            // declared size; otherwise it falls through to the error below.
            (String(s), SchemaPiece::Fixed { size }) if s.len() == *size => {
                AvroValue::Fixed(*size, s.clone().into_bytes())
            }
            _ => {
                return Err(ParseSchemaError(format!(
                    "Json default value {} does not match schema",
                    json
                )));
            }
        };
        Ok(val)
    }
2076}
2077
/// State carried while serializing a schema to JSON.
#[derive(Clone)]
struct SchemaSerContext<'a> {
    /// The position in the schema being serialized.
    node: SchemaNodeOrNamed<'a>,
    /// Named pieces that have already been written in full, keyed by their
    /// index; later occurrences are written as a name only. Shared between
    /// all contexts derived from the same root.
    seen_named: Rc<RefCell<BTreeMap<usize, FullName>>>,
    /// Namespace in effect at this position; used to decide whether a
    /// `namespace` attribute is written and how names are shortened.
    enclosing_ns: &'a str,
}
2089
/// Pairs a record field with the serialization context of the record that
/// contains it.
#[derive(Clone)]
struct RecordFieldSerContext<'a> {
    /// Context of the enclosing record.
    outer: &'a SchemaSerContext<'a>,
    /// The field being serialized.
    inner: &'a RecordField,
}
2095
2096impl<'a> SchemaSerContext<'a> {
2097 fn step(&'a self, next: SchemaPieceRefOrNamed<'a>) -> Self {
2098 let ns = self.node.namespace().unwrap_or(self.enclosing_ns);
2099 Self {
2100 node: self.node.step_ref(next),
2101 seen_named: Rc::clone(&self.seen_named),
2102 enclosing_ns: ns,
2103 }
2104 }
2105}
2106
2107impl<'a> Serialize for SchemaSerContext<'a> {
2108 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2109 where
2110 S: Serializer,
2111 {
2112 match self.node.inner {
2113 SchemaPieceRefOrNamed::Piece(piece) => match piece {
2114 SchemaPiece::Null => serializer.serialize_str("null"),
2115 SchemaPiece::Boolean => serializer.serialize_str("boolean"),
2116 SchemaPiece::Int => serializer.serialize_str("int"),
2117 SchemaPiece::Long => serializer.serialize_str("long"),
2118 SchemaPiece::Float => serializer.serialize_str("float"),
2119 SchemaPiece::Double => serializer.serialize_str("double"),
2120 SchemaPiece::Date => {
2121 let mut map = serializer.serialize_map(Some(2))?;
2122 map.serialize_entry("type", "int")?;
2123 map.serialize_entry("logicalType", "date")?;
2124 map.end()
2125 }
2126 SchemaPiece::TimestampMilli | SchemaPiece::TimestampMicro => {
2127 let mut map = serializer.serialize_map(Some(2))?;
2128 map.serialize_entry("type", "long")?;
2129 if piece == &SchemaPiece::TimestampMilli {
2130 map.serialize_entry("logicalType", "timestamp-millis")?;
2131 } else {
2132 map.serialize_entry("logicalType", "timestamp-micros")?;
2133 }
2134 map.end()
2135 }
2136 SchemaPiece::Decimal {
2137 precision,
2138 scale,
2139 fixed_size: None,
2140 } => {
2141 let mut map = serializer.serialize_map(Some(4))?;
2142 map.serialize_entry("type", "bytes")?;
2143 map.serialize_entry("precision", precision)?;
2144 map.serialize_entry("scale", scale)?;
2145 map.serialize_entry("logicalType", "decimal")?;
2146 map.end()
2147 }
2148 SchemaPiece::Bytes => serializer.serialize_str("bytes"),
2149 SchemaPiece::String => serializer.serialize_str("string"),
2150 SchemaPiece::Array(inner) => {
2151 let mut map = serializer.serialize_map(Some(2))?;
2152 map.serialize_entry("type", "array")?;
2153 map.serialize_entry("items", &self.step(inner.as_ref().as_ref()))?;
2154 map.end()
2155 }
2156 SchemaPiece::Map(inner) => {
2157 let mut map = serializer.serialize_map(Some(2))?;
2158 map.serialize_entry("type", "map")?;
2159 map.serialize_entry("values", &self.step(inner.as_ref().as_ref()))?;
2160 map.end()
2161 }
2162 SchemaPiece::Union(inner) => {
2163 let variants = inner.variants();
2164 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2165 for v in variants {
2166 seq.serialize_element(&self.step(v.as_ref()))?;
2167 }
2168 seq.end()
2169 }
2170 SchemaPiece::Json => {
2171 let mut map = serializer.serialize_map(Some(2))?;
2172 map.serialize_entry("type", "string")?;
2173 map.serialize_entry("connect.name", "io.debezium.data.Json")?;
2174 map.end()
2175 }
2176 SchemaPiece::Uuid => {
2177 let mut map = serializer.serialize_map(Some(4))?;
2178 map.serialize_entry("type", "string")?;
2179 map.serialize_entry("logicalType", "uuid")?;
2180 map.end()
2181 }
2182 SchemaPiece::Record { .. }
2183 | SchemaPiece::Decimal {
2184 fixed_size: Some(_),
2185 ..
2186 }
2187 | SchemaPiece::Enum { .. }
2188 | SchemaPiece::Fixed { .. } => {
2189 unreachable!("Unexpected named schema piece in anonymous schema position")
2190 }
2191 SchemaPiece::ResolveIntLong
2192 | SchemaPiece::ResolveDateTimestamp
2193 | SchemaPiece::ResolveIntFloat
2194 | SchemaPiece::ResolveIntDouble
2195 | SchemaPiece::ResolveLongFloat
2196 | SchemaPiece::ResolveLongDouble
2197 | SchemaPiece::ResolveFloatDouble
2198 | SchemaPiece::ResolveConcreteUnion { .. }
2199 | SchemaPiece::ResolveUnionUnion { .. }
2200 | SchemaPiece::ResolveUnionConcrete { .. }
2201 | SchemaPiece::ResolveRecord { .. }
2202 | SchemaPiece::ResolveIntTsMicro
2203 | SchemaPiece::ResolveIntTsMilli
2204 | SchemaPiece::ResolveEnum { .. } => {
2205 panic!("Attempted to serialize resolved schema")
2206 }
2207 },
2208 SchemaPieceRefOrNamed::Named(index) => {
2209 let mut map = self.seen_named.borrow_mut();
2210 let named_piece = match map.get(&index) {
2211 Some(name) => {
2212 return serializer.serialize_str(&*name.short_name(self.enclosing_ns));
2213 }
2214 None => self.node.root.lookup(index),
2215 };
2216 let name = &named_piece.name;
2217 map.insert(index, name.clone());
2218 std::mem::drop(map);
2219 match &named_piece.piece {
2220 SchemaPiece::Record { doc, fields, .. } => {
2221 let mut map = serializer.serialize_map(None)?;
2222 map.serialize_entry("type", "record")?;
2223 map.serialize_entry("name", &name.name)?;
2224 if self.enclosing_ns != &name.namespace {
2225 map.serialize_entry("namespace", &name.namespace)?;
2226 }
2227 if let Some(docstr) = doc {
2228 map.serialize_entry("doc", docstr)?;
2229 }
2230 map.serialize_entry(
2232 "fields",
2233 &fields
2234 .iter()
2235 .map(|f| RecordFieldSerContext {
2236 outer: self,
2237 inner: f,
2238 })
2239 .collect::<Vec<_>>(),
2240 )?;
2241 map.end()
2242 }
2243 SchemaPiece::Enum {
2244 symbols,
2245 default_idx,
2246 ..
2247 } => {
2248 let mut map = serializer.serialize_map(None)?;
2249 map.serialize_entry("type", "enum")?;
2250 map.serialize_entry("name", &name.name)?;
2251 if self.enclosing_ns != &name.namespace {
2252 map.serialize_entry("namespace", &name.namespace)?;
2253 }
2254 map.serialize_entry("symbols", symbols)?;
2255 if let Some(default_idx) = *default_idx {
2256 assert!(default_idx < symbols.len());
2257 map.serialize_entry("default", &symbols[default_idx])?;
2258 }
2259 map.end()
2260 }
2261 SchemaPiece::Fixed { size } => {
2262 let mut map = serializer.serialize_map(None)?;
2263 map.serialize_entry("type", "fixed")?;
2264 map.serialize_entry("name", &name.name)?;
2265 if self.enclosing_ns != &name.namespace {
2266 map.serialize_entry("namespace", &name.namespace)?;
2267 }
2268 map.serialize_entry("size", size)?;
2269 map.end()
2270 }
2271 SchemaPiece::Decimal {
2272 scale,
2273 precision,
2274 fixed_size: Some(size),
2275 } => {
2276 let mut map = serializer.serialize_map(Some(6))?;
2277 map.serialize_entry("type", "fixed")?;
2278 map.serialize_entry("logicalType", "decimal")?;
2279 map.serialize_entry("name", &name.name)?;
2280 if self.enclosing_ns != &name.namespace {
2281 map.serialize_entry("namespace", &name.namespace)?;
2282 }
2283 map.serialize_entry("size", size)?;
2284 map.serialize_entry("precision", precision)?;
2285 map.serialize_entry("scale", scale)?;
2286 map.end()
2287 }
2288 SchemaPiece::Null
2289 | SchemaPiece::Boolean
2290 | SchemaPiece::Int
2291 | SchemaPiece::Long
2292 | SchemaPiece::Float
2293 | SchemaPiece::Double
2294 | SchemaPiece::Date
2295 | SchemaPiece::TimestampMilli
2296 | SchemaPiece::TimestampMicro
2297 | SchemaPiece::Decimal {
2298 fixed_size: None, ..
2299 }
2300 | SchemaPiece::Bytes
2301 | SchemaPiece::String
2302 | SchemaPiece::Array(_)
2303 | SchemaPiece::Map(_)
2304 | SchemaPiece::Union(_)
2305 | SchemaPiece::Uuid
2306 | SchemaPiece::Json => {
2307 unreachable!("Unexpected anonymous schema piece in named schema position")
2308 }
2309 SchemaPiece::ResolveIntLong
2310 | SchemaPiece::ResolveDateTimestamp
2311 | SchemaPiece::ResolveIntFloat
2312 | SchemaPiece::ResolveIntDouble
2313 | SchemaPiece::ResolveLongFloat
2314 | SchemaPiece::ResolveLongDouble
2315 | SchemaPiece::ResolveFloatDouble
2316 | SchemaPiece::ResolveConcreteUnion { .. }
2317 | SchemaPiece::ResolveUnionUnion { .. }
2318 | SchemaPiece::ResolveUnionConcrete { .. }
2319 | SchemaPiece::ResolveRecord { .. }
2320 | SchemaPiece::ResolveIntTsMilli
2321 | SchemaPiece::ResolveIntTsMicro
2322 | SchemaPiece::ResolveEnum { .. } => {
2323 panic!("Attempted to serialize resolved schema")
2324 }
2325 }
2326 }
2327 }
2328 }
2329}
2330
2331impl Serialize for Schema {
2332 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2333 where
2334 S: Serializer,
2335 {
2336 let ctx = SchemaSerContext {
2337 node: SchemaNodeOrNamed {
2338 root: self,
2339 inner: self.top.as_ref(),
2340 },
2341 seen_named: Rc::new(RefCell::new(Default::default())),
2342 enclosing_ns: "",
2343 };
2344 ctx.serialize(serializer)
2345 }
2346}
2347
2348impl<'a> Serialize for RecordFieldSerContext<'a> {
2349 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2350 where
2351 S: Serializer,
2352 {
2353 let mut map = serializer.serialize_map(None)?;
2354 map.serialize_entry("name", &self.inner.name)?;
2355 map.serialize_entry("type", &self.outer.step(self.inner.schema.as_ref()))?;
2356 if let Some(default) = &self.inner.default {
2357 map.serialize_entry("default", default)?;
2358 }
2359 if let Some(doc) = &self.inner.doc {
2360 map.serialize_entry("doc", doc)?;
2361 }
2362 map.end()
2363 }
2364}
2365
/// Renders a schema, given as JSON, in canonical form.
///
/// Entry point for the `pcf*` helpers: starts with an empty enclosing
/// namespace and outside of any record `fields` list.
fn parsing_canonical_form(schema: &serde_json::Value) -> String {
    pcf(schema, "", false)
}
2371
2372fn pcf(schema: &serde_json::Value, enclosing_ns: &str, in_fields: bool) -> String {
2373 match schema {
2374 serde_json::Value::Object(map) => pcf_map(map, enclosing_ns, in_fields),
2375 serde_json::Value::String(s) => pcf_string(s),
2376 serde_json::Value::Array(v) => pcf_array(v, enclosing_ns, in_fields),
2377 serde_json::Value::Number(n) => n.to_string(),
2378 _ => unreachable!("{:?} cannot yet be printed in canonical form", schema),
2379 }
2380}
2381
2382fn pcf_map(schema: &Map<String, serde_json::Value>, enclosing_ns: &str, in_fields: bool) -> String {
2383 let default_ns = schema
2385 .get("namespace")
2386 .and_then(|v| v.as_str())
2387 .unwrap_or(enclosing_ns);
2388 let mut fields = Vec::new();
2389 let mut found_next_ns = None;
2390 let mut deferred_values = vec![];
2391 for (k, v) in schema {
2392 if schema.len() == 1 && k == "type" {
2394 if let serde_json::Value::String(s) = v {
2396 return pcf_string(s);
2397 }
2398 }
2399
2400 if field_ordering_position(k).is_none() {
2402 continue;
2403 }
2404
2405 if k == "name" {
2407 if in_fields {
2410 fields.push((
2411 k,
2412 format!("{}:{}", pcf_string(k), pcf_string(v.as_str().unwrap())),
2413 ));
2414 continue;
2415 }
2416 let name = v.as_str().unwrap();
2418 assert!(
2419 found_next_ns.is_none(),
2420 "`name` must not be specified multiple times"
2421 );
2422 let next_ns = match name.rsplit_once('.') {
2423 None => default_ns,
2424 Some((ns, _name)) => ns,
2425 };
2426 found_next_ns = Some(next_ns);
2427 let n = if next_ns.is_empty() {
2428 Cow::Borrowed(name)
2429 } else {
2430 Cow::Owned(format!("{next_ns}.{name}"))
2431 };
2432 fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&*n))));
2433 continue;
2434 }
2435
2436 if k == "size" {
2438 let i = match v.as_str() {
2439 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2440 None => v.as_i64().unwrap(),
2441 };
2442 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2443 continue;
2444 }
2445
2446 deferred_values.push((k, v));
2449 }
2450
2451 let next_ns = found_next_ns.unwrap_or(default_ns);
2452 for (k, v) in deferred_values {
2453 fields.push((
2454 k,
2455 format!("{}:{}", pcf_string(k), pcf(v, next_ns, &*k == "fields")),
2456 ));
2457 }
2458
2459 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2461 let inter = fields
2462 .into_iter()
2463 .map(|(_, v)| v)
2464 .collect::<Vec<_>>()
2465 .join(",");
2466 format!("{{{}}}", inter)
2467}
2468
2469fn pcf_array(arr: &[serde_json::Value], enclosing_ns: &str, in_fields: bool) -> String {
2470 let inter = arr
2471 .iter()
2472 .map(|s| pcf(s, enclosing_ns, in_fields))
2473 .collect::<Vec<String>>()
2474 .join(",");
2475 format!("[{}]", inter)
2476}
2477
/// Wraps `s` in double quotes. The content is copied as-is; no escaping is
/// applied.
fn pcf_string(s: &str) -> String {
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    quoted.push_str(s);
    quoted.push('"');
    quoted
}
2481
/// Returns the 1-based position of `field` in the canonical key order, or
/// `None` for a key that is not part of the canonical form.
fn field_ordering_position(field: &str) -> Option<usize> {
    // Listed in canonical output order.
    const ORDERED_KEYS: [&str; 7] = [
        "name", "type", "fields", "symbols", "items", "values", "size",
    ];

    ORDERED_KEYS
        .iter()
        .position(|key| *key == field)
        .map(|zero_based| zero_based + 1)
}
2497
2498#[cfg(test)]
2499mod tests {
2500 use mz_ore::{assert_err, assert_ok};
2501
2502 use crate::types::{Record, ToAvro};
2503
2504 use super::*;
2505
    /// Asserts that `schema` parses to `expected` at its top node, and that
    /// it still does after being serialized to JSON and parsed again.
    fn check_schema(schema: &str, expected: SchemaPiece) {
        let schema = Schema::from_str(schema).unwrap();
        assert_eq!(&expected, schema.top_node().inner);

        // Round trip: serialize back to JSON text and re-parse.
        let schema = serde_json::to_string(&schema).unwrap();
        let schema = Schema::from_str(&schema).unwrap();
        assert_eq!(&expected, schema.top_node().inner);
    }
2515
2516 #[mz_ore::test]
2517 fn test_primitive_schema() {
2518 check_schema("\"null\"", SchemaPiece::Null);
2519 check_schema("\"int\"", SchemaPiece::Int);
2520 check_schema("\"double\"", SchemaPiece::Double);
2521 }
2522
2523 #[mz_ore::test]
2524 fn test_array_schema() {
2525 check_schema(
2526 r#"{"type": "array", "items": "string"}"#,
2527 SchemaPiece::Array(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::String))),
2528 );
2529 }
2530
2531 #[mz_ore::test]
2532 fn test_map_schema() {
2533 check_schema(
2534 r#"{"type": "map", "values": "double"}"#,
2535 SchemaPiece::Map(Box::new(SchemaPieceOrNamed::Piece(SchemaPiece::Double))),
2536 );
2537 }
2538
2539 #[mz_ore::test]
2540 fn test_union_schema() {
2541 check_schema(
2542 r#"["null", "int"]"#,
2543 SchemaPiece::Union(
2544 UnionSchema::new(vec![
2545 SchemaPieceOrNamed::Piece(SchemaPiece::Null),
2546 SchemaPieceOrNamed::Piece(SchemaPiece::Int),
2547 ])
2548 .unwrap(),
2549 ),
2550 );
2551 }
2552
2553 #[mz_ore::test]
2554 fn test_multi_union_schema() {
2555 let schema = Schema::from_str(r#"["null", "int", "float", "string", "bytes"]"#);
2556 assert_ok!(schema);
2557 let schema = schema.unwrap();
2558 let node = schema.top_node();
2559 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2560 let union_schema = match node.inner {
2561 SchemaPiece::Union(u) => u,
2562 _ => unreachable!(),
2563 };
2564 assert_eq!(union_schema.variants().len(), 5);
2565 let mut variants = union_schema.variants().iter();
2566 assert_eq!(
2567 SchemaKind::from(node.step(variants.next().unwrap())),
2568 SchemaKind::Null
2569 );
2570 assert_eq!(
2571 SchemaKind::from(node.step(variants.next().unwrap())),
2572 SchemaKind::Int
2573 );
2574 assert_eq!(
2575 SchemaKind::from(node.step(variants.next().unwrap())),
2576 SchemaKind::Float
2577 );
2578 assert_eq!(
2579 SchemaKind::from(node.step(variants.next().unwrap())),
2580 SchemaKind::String
2581 );
2582 assert_eq!(
2583 SchemaKind::from(node.step(variants.next().unwrap())),
2584 SchemaKind::Bytes
2585 );
2586 assert_eq!(variants.next(), None);
2587 }
2588
2589 #[mz_ore::test]
2590 fn test_record_schema() {
2591 let schema = r#"
2592 {
2593 "type": "record",
2594 "name": "test",
2595 "doc": "record doc",
2596 "fields": [
2597 {"name": "a", "doc": "a doc", "type": "long", "default": 42},
2598 {"name": "b", "doc": "b doc", "type": "string"}
2599 ]
2600 }
2601 "#;
2602
2603 let mut lookup = BTreeMap::new();
2604 lookup.insert("a".to_owned(), 0);
2605 lookup.insert("b".to_owned(), 1);
2606
2607 let expected = SchemaPiece::Record {
2608 doc: Some("record doc".to_string()),
2609 fields: vec![
2610 RecordField {
2611 name: "a".to_string(),
2612 doc: Some("a doc".to_string()),
2613 default: Some(Value::Number(42i64.into())),
2614 schema: SchemaPiece::Long.into(),
2615 order: RecordFieldOrder::Ascending,
2616 position: 0,
2617 },
2618 RecordField {
2619 name: "b".to_string(),
2620 doc: Some("b doc".to_string()),
2621 default: None,
2622 schema: SchemaPiece::String.into(),
2623 order: RecordFieldOrder::Ascending,
2624 position: 1,
2625 },
2626 ],
2627 lookup,
2628 };
2629
2630 check_schema(schema, expected);
2631 }
2632
2633 #[mz_ore::test]
2634 fn test_enum_schema() {
2635 let schema = r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "jokers"}"#;
2636
2637 let expected = SchemaPiece::Enum {
2638 doc: None,
2639 symbols: vec![
2640 "diamonds".to_owned(),
2641 "spades".to_owned(),
2642 "jokers".to_owned(),
2643 "clubs".to_owned(),
2644 "hearts".to_owned(),
2645 ],
2646 default_idx: Some(2),
2647 };
2648
2649 check_schema(schema, expected);
2650
2651 let bad_schema = Schema::from_str(
2652 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "jokers", "clubs", "hearts"], "default": "blah"}"#,
2653 );
2654
2655 assert_err!(bad_schema);
2656 }
2657
2658 #[mz_ore::test]
2659 fn test_fixed_schema() {
2660 let schema = r#"{"type": "fixed", "name": "test", "size": 16}"#;
2661
2662 let expected = SchemaPiece::Fixed { size: 16usize };
2663
2664 check_schema(schema, expected);
2665 }
2666
2667 #[mz_ore::test]
2668 fn test_date_schema() {
2669 let kinds = &[
2670 r#"{
2671 "type": "int",
2672 "name": "datish",
2673 "logicalType": "date"
2674 }"#,
2675 r#"{
2676 "type": "int",
2677 "name": "datish",
2678 "connect.name": "io.debezium.time.Date"
2679 }"#,
2680 r#"{
2681 "type": "int",
2682 "name": "datish",
2683 "connect.name": "org.apache.kafka.connect.data.Date"
2684 }"#,
2685 ];
2686 for kind in kinds {
2687 check_schema(*kind, SchemaPiece::Date);
2688
2689 let schema = Schema::from_str(*kind).unwrap();
2690 assert_eq!(
2691 serde_json::to_string(&schema).unwrap(),
2692 r#"{"type":"int","logicalType":"date"}"#
2693 );
2694 }
2695 }
2696
2697 #[mz_ore::test]
2698 fn new_field_in_middle() {
2699 let reader = r#"{
2700 "type": "record",
2701 "name": "MyRecord",
2702 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2703 }"#;
2704 let writer = r#"{
2705 "type": "record",
2706 "name": "MyRecord",
2707 "fields": [{"name": "f1", "type": "int"}, {"name": "f_interposed", "type": "int"}, {"name": "f2", "type": "int"}]
2708 }"#;
2709 let reader = Schema::from_str(reader).unwrap();
2710 let writer = Schema::from_str(writer).unwrap();
2711
2712 let mut record = Record::new(writer.top_node()).unwrap();
2713 record.put("f1", 1);
2714 record.put("f2", 2);
2715 record.put("f_interposed", 42);
2716
2717 let value = record.avro();
2718
2719 let mut buf = vec![];
2720 crate::encode::encode(&value, &writer, &mut buf);
2721
2722 let resolved = resolve_schemas(&writer, &reader).unwrap();
2723
2724 let reader = &mut &buf[..];
2725 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2726 let expected = crate::types::Value::Record(vec![
2727 ("f1".to_string(), crate::types::Value::Int(1)),
2728 ("f2".to_string(), crate::types::Value::Int(2)),
2729 ]);
2730 assert_eq!(reader_value, expected);
2731 assert!(reader.is_empty()); }
2733
2734 #[mz_ore::test]
2735 fn new_field_at_end() {
2736 let reader = r#"{
2737 "type": "record",
2738 "name": "MyRecord",
2739 "fields": [{"name": "f1", "type": "int"}]
2740 }"#;
2741 let writer = r#"{
2742 "type": "record",
2743 "name": "MyRecord",
2744 "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
2745 }"#;
2746 let reader = Schema::from_str(reader).unwrap();
2747 let writer = Schema::from_str(writer).unwrap();
2748
2749 let mut record = Record::new(writer.top_node()).unwrap();
2750 record.put("f1", 1);
2751 record.put("f2", 2);
2752
2753 let value = record.avro();
2754
2755 let mut buf = vec![];
2756 crate::encode::encode(&value, &writer, &mut buf);
2757
2758 let resolved = resolve_schemas(&writer, &reader).unwrap();
2759
2760 let reader = &mut &buf[..];
2761 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2762 let expected =
2763 crate::types::Value::Record(vec![("f1".to_string(), crate::types::Value::Int(1))]);
2764 assert_eq!(reader_value, expected);
2765 assert!(reader.is_empty()); }
2767
2768 #[mz_ore::test]
2775 fn test_union_promotion_agrees_with_resolution() {
2776 fn resolves(writer: &str, reader: &str) -> bool {
2779 let writer = Schema::from_str(&format!("\"{writer}\"")).unwrap();
2780 let reader = Schema::from_str(&format!("\"{reader}\"")).unwrap();
2781 resolve_schemas(&writer, &reader).is_ok()
2782 }
2783 fn promotes(writer: &str, reader: &str) -> bool {
2784 can_promote(
2785 &Schema::parse_primitive(writer).unwrap(),
2786 &Schema::parse_primitive(reader).unwrap(),
2787 )
2788 }
2789
2790 let primitives = [
2793 "null", "boolean", "int", "long", "float", "double", "bytes", "string",
2794 ];
2795 for w in primitives {
2796 for r in primitives {
2797 if promotes(w, r) {
2798 assert!(
2799 resolves(w, r),
2800 "{w} -> {r} is accepted for matching but cannot be resolved",
2801 );
2802 }
2803 }
2804 }
2805
2806 let numeric = ["int", "long", "float", "double"];
2810 for w in numeric {
2811 for r in numeric {
2812 if w != r {
2813 assert_eq!(
2814 promotes(w, r),
2815 resolves(w, r),
2816 "matching and resolution disagree on {w} -> {r}",
2817 );
2818 }
2819 }
2820 }
2821 }
2822
2823 #[mz_ore::test]
2824 fn default_non_nums() {
2825 let reader = r#"{
2826 "type": "record",
2827 "name": "MyRecord",
2828 "fields": [
2829 {"name": "f1", "type": "double", "default": "NaN"},
2830 {"name": "f2", "type": "double", "default": "Infinity"},
2831 {"name": "f3", "type": "double", "default": "-Infinity"}
2832 ]
2833 }
2834 "#;
2835 let writer = r#"{"type": "record", "name": "MyRecord", "fields": []}"#;
2836
2837 let writer_schema = Schema::from_str(writer).unwrap();
2838 let reader_schema = Schema::from_str(reader).unwrap();
2839 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2840
2841 let record = Record::new(writer_schema.top_node()).unwrap();
2842
2843 let value = record.avro();
2844 let mut buf = vec![];
2845 crate::encode::encode(&value, &writer_schema, &mut buf);
2846
2847 let reader = &mut &buf[..];
2848 let reader_value = crate::decode::decode(resolved.top_node(), reader).unwrap();
2849 let expected = crate::types::Value::Record(vec![
2850 ("f1".to_string(), crate::types::Value::Double(f64::NAN)),
2851 ("f2".to_string(), crate::types::Value::Double(f64::INFINITY)),
2852 (
2853 "f3".to_string(),
2854 crate::types::Value::Double(f64::NEG_INFINITY),
2855 ),
2856 ]);
2857
2858 #[derive(Debug)]
2859 struct NanEq(crate::types::Value);
2860 impl std::cmp::PartialEq for NanEq {
2861 fn eq(&self, other: &Self) -> bool {
2862 match (self, other) {
2863 (
2864 NanEq(crate::types::Value::Double(x)),
2865 NanEq(crate::types::Value::Double(y)),
2866 ) if x.is_nan() && y.is_nan() => true,
2867 (
2868 NanEq(crate::types::Value::Float(x)),
2869 NanEq(crate::types::Value::Float(y)),
2870 ) if x.is_nan() && y.is_nan() => true,
2871 (
2872 NanEq(crate::types::Value::Record(xs)),
2873 NanEq(crate::types::Value::Record(ys)),
2874 ) => {
2875 let xs = xs
2876 .iter()
2877 .cloned()
2878 .map(|(k, v)| (k, NanEq(v)))
2879 .collect::<Vec<_>>();
2880 let ys = ys
2881 .iter()
2882 .cloned()
2883 .map(|(k, v)| (k, NanEq(v)))
2884 .collect::<Vec<_>>();
2885
2886 xs == ys
2887 }
2888 (NanEq(x), NanEq(y)) => x == y,
2889 }
2890 }
2891 }
2892
2893 assert_eq!(NanEq(reader_value), NanEq(expected));
2894 assert!(reader.is_empty());
2895 }
2896
2897 #[mz_ore::test]
2898 fn test_decimal_schemas() {
2899 let schema = r#"{
2900 "type": "fixed",
2901 "name": "dec",
2902 "size": 8,
2903 "logicalType": "decimal",
2904 "precision": 12,
2905 "scale": 5
2906 }"#;
2907 let expected = SchemaPiece::Decimal {
2908 precision: 12,
2909 scale: 5,
2910 fixed_size: Some(8),
2911 };
2912 check_schema(schema, expected);
2913
2914 let schema = r#"{
2915 "type": "bytes",
2916 "logicalType": "decimal",
2917 "precision": 12,
2918 "scale": 5
2919 }"#;
2920 let expected = SchemaPiece::Decimal {
2921 precision: 12,
2922 scale: 5,
2923 fixed_size: None,
2924 };
2925 check_schema(schema, expected);
2926
2927 let res = Schema::from_str(
2928 r#"["bytes", {
2929 "type": "bytes",
2930 "logicalType": "decimal",
2931 "precision": 12,
2932 "scale": 5
2933 }]"#,
2934 );
2935 assert_eq!(
2936 res.unwrap_err().to_string(),
2937 "Schema parse error: Unions cannot contain duplicate types"
2938 );
2939
2940 let writer_schema = Schema::from_str(
2941 r#"["null", {
2942 "type": "bytes"
2943 }]"#,
2944 )
2945 .unwrap();
2946 let reader_schema = Schema::from_str(
2947 r#"["null", {
2948 "type": "bytes",
2949 "logicalType": "decimal",
2950 "precision": 12,
2951 "scale": 5
2952 }]"#,
2953 )
2954 .unwrap();
2955 let resolved = resolve_schemas(&writer_schema, &reader_schema).unwrap();
2956
2957 let expected = SchemaPiece::ResolveUnionUnion {
2958 permutation: vec![
2959 Ok((0, SchemaPieceOrNamed::Piece(SchemaPiece::Null))),
2960 Ok((
2961 1,
2962 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
2963 precision: 12,
2964 scale: 5,
2965 fixed_size: None,
2966 }),
2967 )),
2968 ],
2969 n_reader_variants: 2,
2970 reader_null_variant: Some(0),
2971 };
2972 assert_eq!(resolved.top_node().inner, &expected);
2973 }
2974
2975 #[mz_ore::test]
2976 fn test_no_documentation() {
2977 let schema =
2978 Schema::from_str(r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#)
2979 .unwrap();
2980
2981 let doc = match schema.top_node().inner {
2982 SchemaPiece::Enum { doc, .. } => doc.clone(),
2983 _ => panic!(),
2984 };
2985
2986 assert_none!(doc);
2987 }
2988
2989 #[mz_ore::test]
2990 fn test_documentation() {
2991 let schema = Schema::from_str(
2992 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#
2993 ).unwrap();
2994
2995 let doc = match schema.top_node().inner {
2996 SchemaPiece::Enum { doc, .. } => doc.clone(),
2997 _ => None,
2998 };
2999
3000 assert_eq!("Some documentation".to_owned(), doc.unwrap());
3001 }
3002
3003 #[mz_ore::test]
3004 fn test_namespaces_and_names() {
3005 let schema = Schema::from_str(
3007 r#"{"type": "fixed", "namespace": "namespace", "name": "name", "size": 1}"#,
3008 )
3009 .unwrap();
3010 assert_eq!(schema.named.len(), 1);
3011 assert_eq!(
3012 schema.named[0].name,
3013 FullName {
3014 name: "name".into(),
3015 namespace: "namespace".into()
3016 }
3017 );
3018
3019 let schema =
3021 Schema::from_str(r#"{"type": "enum", "name": "name.has.dots", "symbols": ["A", "B"]}"#)
3022 .unwrap();
3023 assert_eq!(schema.named.len(), 1);
3024 assert_eq!(
3025 schema.named[0].name,
3026 FullName {
3027 name: "dots".into(),
3028 namespace: "name.has".into()
3029 }
3030 );
3031
3032 let schema = Schema::from_str(
3034 r#"{"type": "enum", "namespace": "namespace",
3035 "name": "name.has.dots", "symbols": ["A", "B"]}"#,
3036 )
3037 .unwrap();
3038 assert_eq!(schema.named.len(), 1);
3039 assert_eq!(
3040 schema.named[0].name,
3041 FullName {
3042 name: "dots".into(),
3043 namespace: "name.has".into()
3044 }
3045 );
3046
3047 let schema = Schema::from_str(
3050 r#"{"type": "record", "name": "TestDoc", "doc": "Doc string",
3051 "fields": [{"name": "name", "type": "string"}]}"#,
3052 )
3053 .unwrap();
3054 assert_eq!(schema.named.len(), 1);
3055 assert_eq!(
3056 schema.named[0].name,
3057 FullName {
3058 name: "TestDoc".into(),
3059 namespace: "".into()
3060 }
3061 );
3062
3063 let schema = Schema::from_str(
3065 r#"{"type": "record", "namespace": "", "name": "TestDoc", "doc": "Doc string",
3066 "fields": [{"name": "name", "type": "string"}]}"#,
3067 )
3068 .unwrap();
3069 assert_eq!(schema.named.len(), 1);
3070 assert_eq!(
3071 schema.named[0].name,
3072 FullName {
3073 name: "TestDoc".into(),
3074 namespace: "".into()
3075 }
3076 );
3077
3078 let first = Schema::from_str(
3080 r#"{"type": "fixed", "namespace": "namespace",
3081 "name": "name", "size": 1}"#,
3082 )
3083 .unwrap();
3084 let second = Schema::from_str(
3085 r#"{"type": "fixed", "name": "namespace.name",
3086 "size": 1}"#,
3087 )
3088 .unwrap();
3089 assert_eq!(first.named[0].name, second.named[0].name);
3090
3091 let first = Schema::from_str(
3092 r#"{"type": "fixed", "namespace": "namespace",
3093 "name": "name", "size": 1}"#,
3094 )
3095 .unwrap();
3096 let second = Schema::from_str(
3097 r#"{"type": "fixed", "name": "namespace.Name",
3098 "size": 1}"#,
3099 )
3100 .unwrap();
3101 assert_ne!(first.named[0].name, second.named[0].name);
3102
3103 let first = Schema::from_str(
3104 r#"{"type": "fixed", "namespace": "Namespace",
3105 "name": "name", "size": 1}"#,
3106 )
3107 .unwrap();
3108 let second = Schema::from_str(
3109 r#"{"type": "fixed", "namespace": "namespace",
3110 "name": "name", "size": 1}"#,
3111 )
3112 .unwrap();
3113 assert_ne!(first.named[0].name, second.named[0].name);
3114
3115 assert!(
3118 Schema::from_str(
3119 r#"{"type": "record", "name": "99 problems but a name aint one",
3120 "fields": [{"name": "name", "type": "string"}]}"#
3121 )
3122 .is_err()
3123 );
3124
3125 assert!(
3126 Schema::from_str(
3127 r#"{"type": "record", "name": "!!!",
3128 "fields": [{"name": "name", "type": "string"}]}"#
3129 )
3130 .is_err()
3131 );
3132
3133 assert!(
3134 Schema::from_str(
3135 r#"{"type": "record", "name": "_valid_until_©",
3136 "fields": [{"name": "name", "type": "string"}]}"#
3137 )
3138 .is_err()
3139 );
3140
3141 let schema = Schema::from_str(r#"{"type": "record", "name": "org.apache.avro.tests.Hello", "fields": [
3143 {"name": "f1", "type": {"type": "enum", "name": "MyEnum", "symbols": ["Foo", "Bar", "Baz"]}},
3144 {"name": "f2", "type": "org.apache.avro.tests.MyEnum"},
3145 {"name": "f3", "type": "MyEnum"},
3146 {"name": "f4", "type": {"type": "enum", "name": "other.namespace.OtherEnum", "symbols": ["one", "two", "three"]}},
3147 {"name": "f5", "type": "other.namespace.OtherEnum"},
3148 {"name": "f6", "type": {"type": "enum", "name": "ThirdEnum", "namespace": "some.other", "symbols": ["Alice", "Bob"]}},
3149 {"name": "f7", "type": "some.other.ThirdEnum"}
3150 ]}"#).unwrap();
3151 assert_eq!(schema.named.len(), 4);
3152
3153 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3154 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[2].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[3].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[4].schema, SchemaPieceOrNamed::Named(2)); assert_eq!(fields[5].schema, SchemaPieceOrNamed::Named(3)); assert_eq!(fields[6].schema, SchemaPieceOrNamed::Named(3)); } else {
3162 panic!("Expected SchemaPiece::Record, found something else");
3163 }
3164
3165 let schema = Schema::from_str(
3166 r#"{"type": "record", "name": "x.Y", "fields": [
3167 {"name": "e", "type":
3168 {"type": "record", "name": "Z", "fields": [
3169 {"name": "f", "type": "x.Y"},
3170 {"name": "g", "type": "x.Z"}
3171 ]}
3172 }
3173 ]}"#,
3174 )
3175 .unwrap();
3176 assert_eq!(schema.named.len(), 2);
3177
3178 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3179 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); } else {
3181 panic!("Expected SchemaPiece::Record, found something else");
3182 }
3183
3184 if let SchemaPiece::Record { fields, .. } = schema.named[1].clone().piece {
3185 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(0)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(1)); } else {
3188 panic!("Expected SchemaPiece::Record, found something else");
3189 }
3190
3191 let schema = Schema::from_str(
3192 r#"{"type": "record", "name": "R", "fields": [
3193 {"name": "s", "type": {"type": "record", "namespace": "x", "name": "Y", "fields": [
3194 {"name": "e", "type": {"type": "enum", "namespace": "", "name": "Z",
3195 "symbols": ["Foo", "Bar"]}
3196 }
3197 ]}},
3198 {"name": "t", "type": "Z"}
3199 ]}"#,
3200 )
3201 .unwrap();
3202 assert_eq!(schema.named.len(), 3);
3203
3204 if let SchemaPiece::Record { fields, .. } = schema.named[0].clone().piece {
3205 assert_eq!(fields[0].schema, SchemaPieceOrNamed::Named(1)); assert_eq!(fields[1].schema, SchemaPieceOrNamed::Named(2)); } else {
3208 panic!("Expected SchemaPiece::Record, found something else");
3209 }
3210 }
3211
3212 #[mz_ore::test]
3215 fn test_schema_is_send() {
3216 fn send<S: Send>(_s: S) {}
3217
3218 let schema = Schema {
3219 named: vec![],
3220 indices: Default::default(),
3221 top: SchemaPiece::Null.into(),
3222 };
3223 send(schema);
3224 }
3225
3226 #[mz_ore::test]
3227 fn test_schema_is_sync() {
3228 fn sync<S: Sync>(_s: S) {}
3229
3230 let schema = Schema {
3231 named: vec![],
3232 indices: Default::default(),
3233 top: SchemaPiece::Null.into(),
3234 };
3235 sync(&schema);
3236 sync(schema);
3237 }
3238
3239 #[mz_ore::test]
3240 #[cfg_attr(miri, ignore)] fn test_schema_fingerprint() {
3242 let raw_schema = r#"
3243 {
3244 "type": "record",
3245 "name": "test",
3246 "fields": [
3247 {"name": "a", "type": "long", "default": 42},
3248 {"name": "b", "type": "string"}
3249 ]
3250 }
3251 "#;
3252 let expected_canonical = r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}]}"#;
3253 let schema = Schema::from_str(raw_schema).unwrap();
3254 assert_eq!(&schema.canonical_form(), expected_canonical);
3255 let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3256 .as_ref()
3257 .iter()
3258 .map(|b| format!("{b:02x}"))
3259 .collect::<String>();
3260 assert_eq!(
3261 format!("{}", schema.fingerprint(&digest::SHA256)),
3262 expected_fingerprint
3263 );
3264
3265 let raw_schema = r#"
3266{
3267 "type": "record",
3268 "name": "ns.r1",
3269 "namespace": "ignored",
3270 "fields": [
3271 {
3272 "name": "f1",
3273 "type": {
3274 "type": "fixed",
3275 "name": "r2",
3276 "size": 1
3277 }
3278 }
3279 ]
3280}
3281"#;
3282 let expected_canonical = r#"{"name":"ns.r1","type":"record","fields":[{"name":"f1","type":{"name":"ns.r2","type":"fixed","size":1}}]}"#;
3283 let schema = Schema::from_str(raw_schema).unwrap();
3284 assert_eq!(&schema.canonical_form(), expected_canonical);
3285 let expected_fingerprint = digest::digest(&digest::SHA256, expected_canonical.as_bytes())
3286 .as_ref()
3287 .iter()
3288 .map(|b| format!("{b:02x}"))
3289 .collect::<String>();
3290 assert_eq!(
3291 format!("{}", schema.fingerprint(&digest::SHA256)),
3292 expected_fingerprint
3293 );
3294 }
3295
3296 #[mz_ore::test]
3297 fn test_make_valid() {
3298 for (input, expected) in [
3299 ("foo", "foo"),
3300 ("az99", "az99"),
3301 ("99az", "_99az"),
3302 ("is,bad", "is_bad"),
3303 ("@#$%", "____"),
3304 ("i-amMisBehaved!", "i_amMisBehaved_"),
3305 ("", "_"),
3306 ] {
3307 let actual = Name::make_valid(input);
3308 assert_eq!(expected, actual, "Name::make_valid({input})")
3309 }
3310 }
3311
3312 #[mz_ore::test]
3313 fn test_parse_with_simple_reference() {
3314 let ref_schema_json = r#"{
3316 "type": "record",
3317 "name": "User",
3318 "namespace": "com.example",
3319 "fields": [{"name": "id", "type": "int"}]
3320 }"#;
3321
3322 let primary_json = r#"{
3324 "type": "record",
3325 "name": "Event",
3326 "namespace": "com.example",
3327 "fields": [{"name": "user", "type": "com.example.User"}]
3328 }"#;
3329
3330 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3331 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3332
3333 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3334
3335 let user_name = FullName {
3337 name: "User".to_string(),
3338 namespace: "com.example".to_string(),
3339 };
3340 let event_name = FullName {
3341 name: "Event".to_string(),
3342 namespace: "com.example".to_string(),
3343 };
3344
3345 assert!(
3346 schema.indices.contains_key(&user_name),
3347 "User type should be in schema indices"
3348 );
3349 assert!(
3350 schema.indices.contains_key(&event_name),
3351 "Event type should be in schema indices"
3352 );
3353
3354 if let SchemaPieceOrNamed::Named(event_idx) = &schema.top {
3356 let event_piece = &schema.named[*event_idx].piece;
3357 if let SchemaPiece::Record { fields, .. } = event_piece {
3358 assert_eq!(fields.len(), 1);
3359 assert_eq!(fields[0].name, "user");
3360 assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_)));
3362 } else {
3363 panic!("Expected Event to be a record");
3364 }
3365 } else {
3366 panic!("Expected top to be Named");
3367 }
3368 }
3369
3370 #[mz_ore::test]
3371 fn test_parse_with_nested_references() {
3372 let schema_a = r#"{
3374 "type": "record",
3375 "name": "Address",
3376 "namespace": "com.example",
3377 "fields": [
3378 {"name": "street", "type": "string"},
3379 {"name": "city", "type": "string"}
3380 ]
3381 }"#;
3382
3383 let schema_b = r#"{
3385 "type": "record",
3386 "name": "User",
3387 "namespace": "com.example",
3388 "fields": [
3389 {"name": "id", "type": "int"},
3390 {"name": "address", "type": "com.example.Address"}
3391 ]
3392 }"#;
3393
3394 let schema_c = r#"{
3396 "type": "record",
3397 "name": "Event",
3398 "namespace": "com.example",
3399 "fields": [
3400 {"name": "user", "type": "com.example.User"},
3401 {"name": "timestamp", "type": "long"}
3402 ]
3403 }"#;
3404
3405 let ref_schema_a = Schema::from_str(schema_a).unwrap();
3407
3408 let schema_b_value: Value = serde_json::from_str(schema_b).unwrap();
3410 let ref_schema_b =
3411 Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a))
3412 .unwrap();
3413
3414 let schema_c_value: Value = serde_json::from_str(schema_c).unwrap();
3416 let final_schema =
3417 Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap();
3418
3419 for name in ["Address", "User", "Event"] {
3421 let full_name = FullName {
3422 name: name.to_string(),
3423 namespace: "com.example".to_string(),
3424 };
3425 assert!(
3426 final_schema.indices.contains_key(&full_name),
3427 "{} type should be in schema indices",
3428 name
3429 );
3430 }
3431 }
3432
3433 #[mz_ore::test]
3434 fn test_parse_with_multiple_types_in_reference() {
3435 let ref_schema_json = r#"{
3437 "type": "record",
3438 "name": "ContactInfo",
3439 "namespace": "com.example",
3440 "fields": [
3441 {
3442 "name": "address",
3443 "type": {
3444 "type": "record",
3445 "name": "Address",
3446 "fields": [{"name": "street", "type": "string"}]
3447 }
3448 },
3449 {
3450 "name": "phone",
3451 "type": {
3452 "type": "record",
3453 "name": "PhoneNumber",
3454 "fields": [{"name": "number", "type": "string"}]
3455 }
3456 }
3457 ]
3458 }"#;
3459
3460 let primary_json = r#"{
3462 "type": "record",
3463 "name": "User",
3464 "namespace": "com.example",
3465 "fields": [
3466 {"name": "id", "type": "int"},
3467 {"name": "home", "type": "com.example.Address"},
3468 {"name": "mobile", "type": "com.example.PhoneNumber"}
3469 ]
3470 }"#;
3471
3472 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3473 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3474
3475 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3476
3477 for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
3479 let full_name = FullName {
3480 name: name.to_string(),
3481 namespace: "com.example".to_string(),
3482 };
3483 assert!(
3484 schema.indices.contains_key(&full_name),
3485 "{} type should be in schema indices",
3486 name
3487 );
3488 }
3489 }
3490
3491 #[mz_ore::test]
3492 fn test_parse_with_no_references() {
3493 let schema_json = r#"{
3495 "type": "record",
3496 "name": "Simple",
3497 "fields": [{"name": "id", "type": "int"}]
3498 }"#;
3499
3500 let value: Value = serde_json::from_str(schema_json).unwrap();
3501
3502 let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
3503 let schema_normal = Schema::parse(&value).unwrap();
3504
3505 assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
3507 assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());
3508 }
3509
3510 #[mz_ore::test]
3511 fn test_parse_with_reference_in_array() {
3512 let ref_schema_json = r#"{
3514 "type": "record",
3515 "name": "Item",
3516 "namespace": "com.example",
3517 "fields": [{"name": "name", "type": "string"}]
3518 }"#;
3519
3520 let primary_json = r#"{
3522 "type": "record",
3523 "name": "Order",
3524 "namespace": "com.example",
3525 "fields": [
3526 {"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
3527 ]
3528 }"#;
3529
3530 let ref_schema = Schema::from_str(ref_schema_json).unwrap();
3531 let primary_value: Value = serde_json::from_str(primary_json).unwrap();
3532
3533 let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();
3534
3535 let item_name = FullName {
3537 name: "Item".to_string(),
3538 namespace: "com.example".to_string(),
3539 };
3540 assert!(schema.indices.contains_key(&item_name));
3541
3542 if let SchemaPieceOrNamed::Named(order_idx) = &schema.top {
3544 let order_piece = &schema.named[*order_idx].piece;
3545 if let SchemaPiece::Record { fields, .. } = order_piece {
3546 if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema {
3547 assert!(
3548 matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)),
3549 "Array items should be a Named reference to Item"
3550 );
3551 } else {
3552 panic!("Expected items field to be an array");
3553 }
3554 } else {
3555 panic!("Expected Order to be a record");
3556 }
3557 } else {
3558 panic!("Expected top to be Named");
3559 }
3560 }
3561}