1use std::collections::BTreeMap;
27use std::str::{FromStr, from_utf8};
28
29use serde_json::from_slice;
30use sha2::Sha256;
31
32use crate::decode::{AvroRead, decode};
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35 FullName, NamedSchemaPiece, ParseSchemaError, RecordField, ResolvedDefaultValueField,
36 ResolvedRecordField, Schema, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
37 SchemaPieceRefOrNamed, resolve_schemas,
38};
39use crate::types::Value;
40use crate::{Codec, SchemaResolutionError, util};
41
/// The parsed header of an Avro object container file.
#[derive(Debug, Clone)]
pub(crate) struct Header {
    /// Schema parsed from the `avro.schema` metadata entry.
    writer_schema: Schema,
    /// The 16 bytes read immediately after the metadata map.
    marker: [u8; 16],
    /// Codec named by the `avro.codec` metadata entry; `Codec::Null` when the entry is absent.
    codec: Codec,
}
48
49impl Header {
50 pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
51 let meta_schema = Schema {
52 named: vec![],
53 indices: Default::default(),
54 top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
55 };
56
57 let mut buf = [0u8; 4];
58 reader.read_exact(&mut buf)?;
59
60 if buf != [b'O', b'b', b'j', 1u8] {
61 return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
62 }
63
64 if let Value::Map(meta) = decode(meta_schema.top_node(), reader)? {
65 let json = meta
67 .get("avro.schema")
68 .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
69 .and_then(|bytes| {
70 if let Value::Bytes(ref bytes) = *bytes {
71 from_slice(bytes.as_ref()).map_err(|e| {
72 AvroError::ParseSchema(ParseSchemaError::new(format!(
73 "unable to decode schema bytes: {}",
74 e
75 )))
76 })
77 } else {
78 unreachable!()
79 }
80 })?;
81 let writer_schema = Schema::parse(&json).map_err(|e| {
82 ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
83 })?;
84
85 let codec = meta
86 .get("avro.codec")
87 .map(|val| match val {
88 Value::Bytes(bytes) => from_utf8(bytes.as_ref())
89 .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
90 .and_then(|codec| {
91 Codec::from_str(codec).map_err(|_| {
92 AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
93 })
94 }),
95 _ => unreachable!(),
96 })
97 .unwrap_or(Ok(Codec::Null))?;
98
99 let mut marker = [0u8; 16];
100 reader.read_exact(&mut marker)?;
101
102 Ok(Header {
103 writer_schema,
104 marker,
105 codec,
106 })
107 } else {
108 unreachable!()
109 }
110 }
111
112 pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
113 (self.writer_schema, self.marker, self.codec)
114 }
115}
116
/// Reads the values stored in an Avro object container file, one block at a time.
pub struct Reader<R> {
    /// Header parsed from the start of `inner`.
    header: Header,
    /// The underlying byte source.
    inner: R,
    /// Set once the `Iterator` implementation has yielded an error; iteration then stops.
    errored: bool,
    /// Result of resolving the writer schema against a caller-supplied reader schema, if the
    /// two schemas had different fingerprints.
    resolved_schema: Option<Schema>,
    /// Number of values of the current block that have not been decoded yet.
    messages_remaining: usize,
    /// Bytes of the current block.
    buf: Vec<u8>,
    /// Offset into `buf` of the next value to decode.
    buf_idx: usize,
}
127
/// An iterator over whole blocks of a container file, without decoding the values inside them.
pub struct BlockIter<R> {
    /// Reader used to parse the header and fetch each block.
    inner: Reader<R>,
}
132
/// One block of a container file, as produced by `BlockIter`.
#[derive(Debug, Clone)]
pub struct Block {
    /// The block's bytes, after the file's codec has been applied to decompress them.
    pub bytes: Vec<u8>,
    /// The number of values encoded in `bytes`.
    pub len: usize,
}
141
142impl<R: AvroRead> BlockIter<R> {
143 pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
144 Ok(Self {
145 inner: Reader::with_schema(reader_schema, inner)?,
146 })
147 }
148}
149
150impl<R: AvroRead> Iterator for BlockIter<R> {
151 type Item = Result<Block, AvroError>;
152
153 fn next(&mut self) -> Option<Self::Item> {
154 assert!(self.inner.is_empty());
155
156 match self.inner.read_block_next() {
157 Ok(()) => {
158 if self.inner.is_empty() {
159 None
160 } else {
161 let bytes = std::mem::take(&mut self.inner.buf);
162 let len = std::mem::take(&mut self.inner.messages_remaining);
163 Some(Ok(Block { bytes, len }))
164 }
165 }
166 Err(e) => Some(Err(e)),
167 }
168 }
169}
170
171impl<R: AvroRead> Reader<R> {
172 pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
177 let header = Header::from_reader(&mut inner)?;
178 let reader = Reader {
179 header,
180 inner,
181 errored: false,
182 resolved_schema: None,
183 messages_remaining: 0,
184 buf: vec![],
185 buf_idx: 0,
186 };
187 Ok(reader)
188 }
189
190 pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
195 let header = Header::from_reader(&mut inner)?;
196
197 let writer_schema = &header.writer_schema;
198 let resolved_schema = if reader_schema.fingerprint::<Sha256>().bytes
199 != writer_schema.fingerprint::<Sha256>().bytes
200 {
201 Some(resolve_schemas(writer_schema, reader_schema)?)
202 } else {
203 None
204 };
205
206 Ok(Reader {
207 header,
208 errored: false,
209 resolved_schema,
210 inner,
211 messages_remaining: 0,
212 buf: vec![],
213 buf_idx: 0,
214 })
215 }
216
217 pub fn writer_schema(&self) -> &Schema {
219 &self.header.writer_schema
220 }
221
222 pub fn schema(&self) -> &Schema {
226 match &self.resolved_schema {
227 Some(schema) => schema,
228 None => self.writer_schema(),
229 }
230 }
231
232 #[inline]
233 pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
235 if self.is_empty() {
236 self.read_block_next()?;
237 if self.is_empty() {
238 return Ok(None);
239 }
240 }
241
242 let mut block_bytes = &self.buf[self.buf_idx..];
243 let b_original = block_bytes.len();
244 let schema = self.schema();
245 let item = from_avro_datum(schema, &mut block_bytes)?;
246 self.buf_idx += b_original - block_bytes.len();
247 self.messages_remaining -= 1;
248 Ok(Some(item))
249 }
250
251 fn is_empty(&self) -> bool {
252 self.messages_remaining == 0
253 }
254
255 fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
256 if n >= self.buf.len() {
258 self.buf.resize(n, 0);
259 }
260
261 self.inner.read_exact(&mut self.buf[..n])?;
262 self.buf_idx = 0;
263 Ok(())
264 }
265
266 fn read_block_next(&mut self) -> Result<(), AvroError> {
267 assert!(self.is_empty(), "Expected self to be empty!");
268 match util::read_long(&mut self.inner) {
269 Ok(block_len) => {
270 self.messages_remaining = block_len as usize;
271 let block_bytes = util::read_long(&mut self.inner)?;
272 self.fill_buf(block_bytes as usize)?;
273 let mut marker = [0u8; 16];
274 self.inner.read_exact(&mut marker)?;
275
276 if marker != self.header.marker {
277 return Err(DecodeError::MismatchedBlockHeader {
278 expected: self.header.marker,
279 actual: marker,
280 }
281 .into());
282 }
283
284 self.header.codec.decompress(&mut self.buf)?;
291
292 Ok(())
293 }
294 Err(e) => {
295 if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
296 Ok(())
298 } else {
299 Err(e)
300 }
301 }
302 }
303 }
304}
305
306impl<R: AvroRead> Iterator for Reader<R> {
307 type Item = Result<Value, AvroError>;
308
309 fn next(&mut self) -> Option<Self::Item> {
310 if self.errored {
312 return None;
313 };
314 match self.read_next() {
315 Ok(opt) => opt.map(Ok),
316 Err(e) => {
317 self.errored = true;
318 Some(Err(e))
319 }
320 }
321 }
322}
323
/// Working state used while resolving a writer schema against a reader schema.
pub struct SchemaResolver<'a> {
    /// Named pieces of the resolved schema. A slot is `None` while the corresponding named
    /// type is still being resolved.
    pub named: Vec<Option<NamedSchemaPiece>>,
    /// Maps the full name of each resolved named type to its index in `named`.
    pub indices: BTreeMap<FullName, usize>,
    /// Stack of type and field names describing the node currently being resolved; used to
    /// build error messages.
    pub human_readable_field_path: Vec<String>,
    /// Index into `human_readable_field_path` from which error messages start printing.
    pub current_human_readable_path_start: usize,
    /// Maps indices of writer named types to the indices of the matching reader named types.
    pub writer_to_reader_names: BTreeMap<usize, usize>,
    /// Maps indices of reader named types to the indices of the matching writer named types.
    pub reader_to_writer_names: BTreeMap<usize, usize>,
    /// Maps indices of reader named types to their indices in `named`.
    pub reader_to_resolved_names: BTreeMap<usize, usize>,
    // NOTE(review): not read anywhere in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub reader_fullnames: BTreeMap<usize, &'a FullName>,
    /// The reader schema; consulted for the names of resolved named types.
    pub reader_schema: &'a Schema,
}
336
337impl<'a> SchemaResolver<'a> {
338 fn resolve_named(
339 &mut self,
340 writer: &Schema,
341 reader: &Schema,
342 writer_index: usize,
343 reader_index: usize,
344 ) -> Result<SchemaPiece, AvroError> {
345 let ws = writer.lookup(writer_index);
346 let rs = reader.lookup(reader_index);
347 let typ = match (&ws.piece, &rs.piece) {
348 (
349 SchemaPiece::Record {
350 fields: w_fields,
351 lookup: w_lookup,
352 ..
353 },
354 SchemaPiece::Record {
355 fields: r_fields,
356 lookup: _r_lookup,
357 ..
358 },
359 ) => {
360 let mut defaults = Vec::new();
361 let mut fields: Vec<Option<RecordField>> = Vec::new();
362 for (r_index, rf) in r_fields.iter().enumerate() {
363 match w_lookup.get(&rf.name) {
364 None => {
365 let default_field = match &rf.default {
366 Some(v) => ResolvedDefaultValueField {
367 name: rf.name.clone(),
368 doc: rf.doc.clone(),
369 default: reader
370 .top_node_or_named()
371 .step(&rf.schema)
372 .lookup()
373 .json_to_value(v)?,
374 order: rf.order.clone(),
375 position: r_index,
376 },
377 None => return Err(SchemaResolutionError::new(format!(
378 "Reader field `{}.{}` not found in writer, and has no default",
379 self.get_current_human_readable_path(),
380 rf.name
381 ))
382 .into()),
383 };
384 defaults.push(default_field);
385 }
386 Some(w_index) => {
387 if fields.len() > *w_index && fields[*w_index].is_some() {
388 return Err(SchemaResolutionError::new(format!(
389 "Duplicate field `{}.{}` in schema",
390 self.get_current_human_readable_path(),
391 rf.name
392 ))
393 .into());
394 }
395 let wf = &w_fields[*w_index];
396 let w_node = SchemaNodeOrNamed {
397 root: writer,
398 inner: wf.schema.as_ref(),
399 };
400 let r_node = SchemaNodeOrNamed {
401 root: reader,
402 inner: rf.schema.as_ref(),
403 };
404
405 self.human_readable_field_path.push(rf.name.clone());
406 let new_inner = self.resolve(w_node, r_node)?;
407 self.human_readable_field_path.pop();
408
409 let field = RecordField {
410 name: rf.name.clone(),
411 doc: rf.doc.clone(),
412 default: rf.default.clone(),
413 schema: new_inner,
414 order: rf.order.clone(),
415 position: r_index,
416 };
417 while fields.len() <= *w_index {
418 fields.push(None);
419 }
420 fields[*w_index] = Some(field)
421 }
422 }
423 }
424 while fields.len() < w_fields.len() {
425 fields.push(None);
426 }
427 let mut n_present = 0;
428 let fields = fields
429 .into_iter()
430 .enumerate()
431 .map(|(i, rf)| match rf {
432 Some(rf) => {
433 n_present += 1;
434 ResolvedRecordField::Present(rf)
435 }
436 None => {
437 let writer_schema_piece = SchemaNodeOrNamed {
449 root: writer,
450 inner: w_fields[i].schema.as_ref(),
451 }
452 .to_schema();
453 ResolvedRecordField::Absent(writer_schema_piece)
454 }
455 })
456 .collect();
457 let n_reader_fields = defaults.len() + n_present;
458 SchemaPiece::ResolveRecord {
459 defaults,
460 fields,
461 n_reader_fields,
462 }
463 }
464 (
465 SchemaPiece::Enum {
466 symbols: w_symbols, ..
467 },
468 SchemaPiece::Enum {
469 symbols: r_symbols,
470 doc,
471 default_idx,
472 },
473 ) => {
474 let r_map = r_symbols
475 .iter()
476 .enumerate()
477 .map(|(i, s)| (s, i))
478 .collect::<BTreeMap<_, _>>();
479 let symbols = w_symbols
480 .iter()
481 .map(|s| {
482 r_map
483 .get(s)
484 .map(|i| (*i, s.clone()))
485 .ok_or_else(|| s.clone())
486 })
487 .collect();
488 SchemaPiece::ResolveEnum {
489 doc: doc.clone(),
490 symbols,
491 default: default_idx.map(|i| (i, r_symbols[i].clone())),
492 }
493 }
494 (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
495 if *wsz == *rsz {
496 SchemaPiece::Fixed { size: *wsz }
497 } else {
498 return Err(SchemaResolutionError::new(format!(
499 "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
500 &rs.name,
501 wsz,
502 rsz,
503 self.get_current_human_readable_path(),
504 ))
505 .into());
506 }
507 }
508 (
509 SchemaPiece::Decimal {
510 precision: wp,
511 scale: wscale,
512 fixed_size: wsz,
513 },
514 SchemaPiece::Decimal {
515 precision: rp,
516 scale: rscale,
517 fixed_size: rsz,
518 },
519 ) => {
520 if wp != rp {
521 return Err(SchemaResolutionError::new(format!(
522 "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
523 &rs.name,
524 wp,
525 rp,
526 self.get_current_human_readable_path(),
527 ))
528 .into());
529 }
530 if wscale != rscale {
531 return Err(SchemaResolutionError::new(format!(
532 "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
533 &rs.name,
534 wscale,
535 rscale,
536 self.get_current_human_readable_path(),
537 ))
538 .into());
539 }
540 if wsz != rsz {
541 return Err(SchemaResolutionError::new(format!(
542 "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
543 &rs.name,
544 wsz,
545 rsz,
546 self.get_current_human_readable_path(),
547 ))
548 .into());
549 }
550 SchemaPiece::Decimal {
551 precision: *wp,
552 scale: *wscale,
553 fixed_size: *wsz,
554 }
555 }
556 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
557 if *fixed_size == Some(*size) =>
558 {
559 SchemaPiece::Fixed { size: *size }
560 }
561 (
562 SchemaPiece::Fixed { size },
563 SchemaPiece::Decimal {
564 precision,
565 scale,
566 fixed_size,
567 },
568 ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
569 precision: *precision,
570 scale: *scale,
571 fixed_size: *fixed_size,
572 },
573
574 (_, SchemaPiece::ResolveRecord { .. })
575 | (_, SchemaPiece::ResolveEnum { .. })
576 | (SchemaPiece::ResolveRecord { .. }, _)
577 | (SchemaPiece::ResolveEnum { .. }, _) => {
578 return Err(SchemaResolutionError::new(
579 "Attempted to resolve an already resolved schema".to_string(),
580 )
581 .into());
582 }
583
584 (_wt, _rt) => {
585 return Err(SchemaResolutionError::new(format!(
586 "Non-matching schemas: writer: {:?}, reader: {:?}",
587 ws.name, rs.name
588 ))
589 .into());
590 }
591 };
592 Ok(typ)
593 }
594
595 pub fn resolve(
596 &mut self,
597 writer: SchemaNodeOrNamed,
598 reader: SchemaNodeOrNamed,
599 ) -> Result<SchemaPieceOrNamed, AvroError> {
600 let previous_human_readable_path_start = self.current_human_readable_path_start;
601 let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
602 if let Some(full_name) = named_node {
603 self.current_human_readable_path_start = self.human_readable_field_path.len();
604 self.human_readable_field_path.push(full_name.human_name());
605 }
606
607 let inner = match (writer.inner, reader.inner) {
608 (
622 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
623 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
624 ) => {
625 let w2r = self.writer_to_reader_names.clone();
626 let permutation = w_inner
634 .variants()
635 .iter()
636 .map(|w_variant| {
637 let (r_idx, r_variant) =
638 r_inner.match_(w_variant, &w2r).ok_or_else(|| {
639 SchemaResolutionError::new(format!(
640 "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
641 w_variant.get_human_name(writer.root),
642 self.get_current_human_readable_path()
643 ))
644 })?;
645 let resolved =
646 self.resolve(writer.step(w_variant), reader.step(r_variant))?;
647 Ok((r_idx, resolved))
648 })
649 .collect();
650 let n_reader_variants = r_inner.variants().len();
651 let reader_null_variant = r_inner
652 .variants()
653 .iter()
654 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
655 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
656 permutation,
657 n_reader_variants,
658 reader_null_variant,
659 })
660 }
661 (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
663 let n_reader_variants = r_inner.variants().len();
664 let reader_null_variant = r_inner
665 .variants()
666 .iter()
667 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
668 let (index, r_inner) = r_inner
669 .match_ref(other, &self.writer_to_reader_names)
670 .ok_or_else(|| {
671 SchemaResolutionError::new(
672 format!("No matching schema in reader union for writer type `{}` for field `{}`",
673 other.get_human_name(writer.root),
674 self.get_current_human_readable_path()))
675 })?;
676 let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
677 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
678 index,
679 inner,
680 n_reader_variants,
681 reader_null_variant,
682 })
683 }
684 (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
686 let (index, w_inner) = w_inner
687 .match_ref(other, &self.reader_to_writer_names)
688 .ok_or_else(|| {
689 SchemaResolutionError::new(
690 format!("No matching schema in writer union for reader type `{}` for field `{}`",
691 other.get_human_name(writer.root),
692 self.get_current_human_readable_path()))
693 })?;
694 let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
695 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
696 }
697 (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
699 match (wp, rp) {
700 (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
707 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
708 }
709 (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
711 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
712 }
713 (SchemaPiece::Date, SchemaPiece::TimestampMilli)
714 | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
715 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
716 }
717 (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
718 SchemaPieceOrNamed::Piece(rp.clone()) }
720 (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
721 SchemaPieceOrNamed::Piece(rp.clone()) }
723 (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
724 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
725 }
726 (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
727 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
728 }
729 (SchemaPiece::Null, SchemaPiece::Null) => {
730 SchemaPieceOrNamed::Piece(SchemaPiece::Null)
731 }
732 (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
733 SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
734 }
735 (SchemaPiece::Int, SchemaPiece::Long) => {
736 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
737 }
738 (SchemaPiece::Int, SchemaPiece::Float) => {
739 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
740 }
741 (SchemaPiece::Int, SchemaPiece::Double) => {
742 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
743 }
744 (SchemaPiece::Long, SchemaPiece::Float) => {
745 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
746 }
747 (SchemaPiece::Long, SchemaPiece::Double) => {
748 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
749 }
750 (SchemaPiece::Float, SchemaPiece::Float) => {
751 SchemaPieceOrNamed::Piece(SchemaPiece::Float)
752 }
753 (SchemaPiece::Float, SchemaPiece::Double) => {
754 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
755 }
756 (SchemaPiece::Double, SchemaPiece::Double) => {
757 SchemaPieceOrNamed::Piece(SchemaPiece::Double)
758 }
759 (b, SchemaPiece::Bytes)
760 if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
761 {
762 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
763 }
764 (s, SchemaPiece::String)
765 if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
766 {
767 SchemaPieceOrNamed::Piece(SchemaPiece::String)
768 }
769 (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
770 let inner =
771 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
772 SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
773 }
774 (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
775 let inner =
776 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
777 SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
778 }
779 (
780 SchemaPiece::Decimal {
781 precision: wp,
782 scale: ws,
783 fixed_size: wf,
784 },
785 SchemaPiece::Decimal {
786 precision: rp,
787 scale: rs,
788 fixed_size: rf,
789 },
790 ) => {
791 if wp == rp && ws == rs && wf == rf {
792 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
793 precision: *wp,
794 scale: *ws,
795 fixed_size: *wf,
796 })
797 } else {
798 return Err(SchemaResolutionError::new(format!(
799 "Decimal types must match in precision, scale, and fixed size. \
800 Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
801 wp,
802 ws,
803 wf,
804 rp,
805 rs,
806 rf,
807 self.get_current_human_readable_path(),
808 ))
809 .into());
810 }
811 }
812 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
813 if *fixed_size == None =>
814 {
815 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
816 }
817 (SchemaPiece::Json, SchemaPiece::Json) => {
821 SchemaPieceOrNamed::Piece(SchemaPiece::Json)
822 }
823 (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
824 SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
825 }
826 (
827 SchemaPiece::Bytes,
828 SchemaPiece::Decimal {
829 precision,
830 scale,
831 fixed_size,
832 },
833 ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
834 precision: *precision,
835 scale: *scale,
836 fixed_size: *fixed_size,
837 }),
838 (ws, rs) => {
839 return Err(SchemaResolutionError::new(format!(
840 "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
841 ws,
842 rs,
843 self.get_current_human_readable_path(),
844 ))
845 .into());
846 }
847 }
848 }
849 (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
851 if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
852 let (w_name, r_name) = (
854 &writer.root.lookup(w_index).name,
855 &reader.root.lookup(r_index).name,
856 );
857 return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {:?} against reader schema node named {:?}", w_name, r_name)).into());
858 }
859 let idx = match self.reader_to_resolved_names.get(&r_index) {
862 Some(resolved) => *resolved,
863 None => {
864 let resolved_idx = self.named.len();
870 self.reader_to_resolved_names.insert(r_index, resolved_idx);
871 self.named.push(None);
872 let piece =
873 match self.resolve_named(writer.root, reader.root, w_index, r_index) {
874 Ok(piece) => piece,
875 Err(e) => {
876 self.named.pop();
878 self.reader_to_resolved_names.remove(&r_index);
879 return Err(e);
880 }
881 };
882 let name = &self.reader_schema.named[r_index].name;
883 let ns = NamedSchemaPiece {
884 name: name.clone(),
885 piece,
886 };
887 self.named[resolved_idx] = Some(ns);
888 self.indices.insert(name.clone(), resolved_idx);
889
890 resolved_idx
891 }
892 };
893 SchemaPieceOrNamed::Named(idx)
894 }
895 (ws, rs) => {
896 return Err(SchemaResolutionError::new(format!(
897 "Schemas don't match: {:?}, {:?} for field `{}`",
898 ws.get_piece_and_name(writer.root).0,
899 rs.get_piece_and_name(reader.root).0,
900 self.get_current_human_readable_path(),
901 ))
902 .into());
903 }
904 };
905 if named_node.is_some() {
906 self.human_readable_field_path.pop();
907 self.current_human_readable_path_start = previous_human_readable_path_start;
908 }
909 Ok(inner)
910 }
911
912 fn get_current_human_readable_path(&self) -> String {
913 self.human_readable_field_path[self.current_human_readable_path_start..].join(".")
914 }
915}
916
917pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
926 let value = decode(schema.top_node(), reader)?;
927 Ok(value)
928}
929
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use mz_ore::assert_err;

    use crate::Reader;
    use crate::types::{Record, ToAvro};

    use super::*;

    /// Record schema shared by the reader tests.
    static SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {"name": "a", "type": "long", "default": 42},
                {"name": "b", "type": "string"}
            ]
        }
    "#;
    /// Nullable-long union schema.
    static UNION_SCHEMA: &str = r#"
        ["null", "long"]
    "#;
    /// A complete container file written with `SCHEMA` and the `null` codec, holding one
    /// block with the two records `{a: 27, b: "foo"}` and `{a: 42, b: "bar"}`.
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[mz_ore::test]
    fn test_from_avro_datum() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.avro();

        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
    }

    #[mz_ore::test]
    fn test_null_union() {
        let schema: Schema = UNION_SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded).unwrap(),
            Value::Union {
                index: 1,
                inner: Box::new(Value::Long(0)),
                n_variants: 2,
                null_variant: Some(0)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_reader_stream() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(schema.top_node()).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(schema.top_node()).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = [record1.avro(), record2.avro()];

        // Compare the whole sequence, so that a reader yielding too few (or no) records
        // fails the test instead of passing vacuously.
        let actual = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(actual, expected);
    }

    #[mz_ore::test]
    fn test_reader_invalid_header() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Dropping the first byte corrupts the magic.
        let invalid = ENCODED.iter().skip(1).copied().collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_reader_invalid_block() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Drop the last 19 bytes, which truncates the only data block.
        let invalid = ENCODED
            .iter()
            .rev()
            .skip(19)
            .copied()
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
        for value in reader {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[mz_ore::test]
    fn test_reader_only_header() {
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..]).unwrap();
        for value in reader {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_resolution_nested_types_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        // The path is reported relative to the innermost named type.
        assert_eq!(
            &err_str,
            "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`"
        );
    }

    #[mz_ore::test]
    fn test_extra_fields_without_default_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
        );
    }

    #[mz_ore::test]
    fn test_duplicate_field_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Duplicate field `com.materialize.bar.f1` in schema"
        );
    }

    #[mz_ore::test]
    fn test_decimal_field_mismatch_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
        );
    }
}