1use std::collections::BTreeMap;
27use std::str::{FromStr, from_utf8};
28
29use aws_lc_rs::digest;
30use serde_json::from_slice;
31
32use crate::decode::{AvroRead, decode};
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35 FullName, NamedSchemaPiece, ParseSchemaError, RecordField, ResolvedDefaultValueField,
36 ResolvedRecordField, Schema, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
37 SchemaPieceRefOrNamed, resolve_schemas,
38};
39use crate::types::Value;
40use crate::{Codec, SchemaResolutionError, util};
41
/// The parsed header of an Avro object container file.
#[derive(Debug, Clone)]
pub(crate) struct Header {
    /// Schema parsed from the `avro.schema` metadata entry.
    writer_schema: Schema,
    /// Sync marker that must follow every data block of the file.
    marker: [u8; 16],
    /// Codec named by the `avro.codec` metadata entry (`Codec::Null` when absent).
    codec: Codec,
}
48
49impl Header {
50 pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
51 let meta_schema = Schema {
52 named: vec![],
53 indices: Default::default(),
54 top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
55 };
56
57 let mut buf = [0u8; 4];
58 reader.read_exact(&mut buf)?;
59
60 if buf != [b'O', b'b', b'j', 1u8] {
61 return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
62 }
63
64 if let Value::Map(meta) = decode(meta_schema.top_node(), reader)? {
65 let json = meta
67 .get("avro.schema")
68 .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
69 .and_then(|bytes| {
70 if let Value::Bytes(ref bytes) = *bytes {
71 from_slice(bytes.as_ref()).map_err(|e| {
72 AvroError::ParseSchema(ParseSchemaError::new(format!(
73 "unable to decode schema bytes: {}",
74 e
75 )))
76 })
77 } else {
78 unreachable!()
79 }
80 })?;
81 let writer_schema = Schema::parse(&json).map_err(|e| {
82 ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
83 })?;
84
85 let codec = meta
86 .get("avro.codec")
87 .map(|val| match val {
88 Value::Bytes(bytes) => from_utf8(bytes.as_ref())
89 .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
90 .and_then(|codec| {
91 Codec::from_str(codec).map_err(|_| {
92 AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
93 })
94 }),
95 _ => unreachable!(),
96 })
97 .unwrap_or(Ok(Codec::Null))?;
98
99 let mut marker = [0u8; 16];
100 reader.read_exact(&mut marker)?;
101
102 Ok(Header {
103 writer_schema,
104 marker,
105 codec,
106 })
107 } else {
108 unreachable!()
109 }
110 }
111
112 pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
113 (self.writer_schema, self.marker, self.codec)
114 }
115}
116
/// Reader over the values stored in an Avro object container file.
pub struct Reader<R> {
    /// Header read when the reader was constructed.
    header: Header,
    /// Underlying input stream.
    inner: R,
    /// Set once the `Iterator` impl has yielded an error; iteration stops afterwards.
    errored: bool,
    /// Writer schema resolved against the caller's reader schema, when the two differ.
    resolved_schema: Option<Schema>,
    /// Number of values in the current block that have not been decoded yet.
    messages_remaining: usize,
    /// Contents of the current block, after decompression with the file's codec.
    buf: Vec<u8>,
    /// Offset in `buf` of the next undecoded value.
    buf_idx: usize,
}
127
/// Iterator over the data blocks of an Avro object container file, yielding each
/// block's decompressed bytes without decoding the values inside.
pub struct BlockIter<R> {
    /// Reader used only for its block-reading logic.
    inner: Reader<R>,
}
132
/// A single data block of an Avro object container file.
#[derive(Debug, Clone)]
pub struct Block {
    /// The block's contents, already decompressed with the file's codec.
    pub bytes: Vec<u8>,
    /// Number of encoded values contained in `bytes`.
    pub len: usize,
}
141
142impl<R: AvroRead> BlockIter<R> {
143 pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
144 Ok(Self {
145 inner: Reader::with_schema(reader_schema, inner)?,
146 })
147 }
148}
149
150impl<R: AvroRead> Iterator for BlockIter<R> {
151 type Item = Result<Block, AvroError>;
152
153 fn next(&mut self) -> Option<Self::Item> {
154 assert!(self.inner.is_empty());
155
156 match self.inner.read_block_next() {
157 Ok(()) => {
158 if self.inner.is_empty() {
159 None
160 } else {
161 let bytes = std::mem::take(&mut self.inner.buf);
162 let len = std::mem::take(&mut self.inner.messages_remaining);
163 Some(Ok(Block { bytes, len }))
164 }
165 }
166 Err(e) => Some(Err(e)),
167 }
168 }
169}
170
171impl<R: AvroRead> Reader<R> {
172 pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
177 let header = Header::from_reader(&mut inner)?;
178 let reader = Reader {
179 header,
180 inner,
181 errored: false,
182 resolved_schema: None,
183 messages_remaining: 0,
184 buf: vec![],
185 buf_idx: 0,
186 };
187 Ok(reader)
188 }
189
190 pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
195 let header = Header::from_reader(&mut inner)?;
196
197 let writer_schema = &header.writer_schema;
198 let resolved_schema = if reader_schema.fingerprint(&digest::SHA256).bytes
199 != writer_schema.fingerprint(&digest::SHA256).bytes
200 {
201 Some(resolve_schemas(writer_schema, reader_schema)?)
202 } else {
203 None
204 };
205
206 Ok(Reader {
207 header,
208 errored: false,
209 resolved_schema,
210 inner,
211 messages_remaining: 0,
212 buf: vec![],
213 buf_idx: 0,
214 })
215 }
216
217 pub fn writer_schema(&self) -> &Schema {
219 &self.header.writer_schema
220 }
221
222 pub fn schema(&self) -> &Schema {
226 match &self.resolved_schema {
227 Some(schema) => schema,
228 None => self.writer_schema(),
229 }
230 }
231
232 #[inline]
233 pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
235 if self.is_empty() {
236 self.read_block_next()?;
237 if self.is_empty() {
238 return Ok(None);
239 }
240 }
241
242 let mut block_bytes = &self.buf[self.buf_idx..];
243 let b_original = block_bytes.len();
244 let schema = self.schema();
245 let item = from_avro_datum(schema, &mut block_bytes)?;
246 self.buf_idx += b_original - block_bytes.len();
247 self.messages_remaining -= 1;
248 Ok(Some(item))
249 }
250
251 fn is_empty(&self) -> bool {
252 self.messages_remaining == 0
253 }
254
255 fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
256 if n >= self.buf.len() {
258 self.buf.resize(n, 0);
259 }
260
261 self.inner.read_exact(&mut self.buf[..n])?;
262 self.buf_idx = 0;
263 Ok(())
264 }
265
266 fn read_block_next(&mut self) -> Result<(), AvroError> {
267 assert!(self.is_empty(), "Expected self to be empty!");
268 match util::read_long(&mut self.inner) {
269 Ok(block_len) => {
270 self.messages_remaining = util::safe_len(block_len as usize)?;
277 let block_bytes = util::safe_len(util::read_long(&mut self.inner)? as usize)?;
278 self.fill_buf(block_bytes)?;
279 let mut marker = [0u8; 16];
280 self.inner.read_exact(&mut marker)?;
281
282 if marker != self.header.marker {
283 return Err(DecodeError::MismatchedBlockHeader {
284 expected: self.header.marker,
285 actual: marker,
286 }
287 .into());
288 }
289
290 self.header.codec.decompress(&mut self.buf)?;
297
298 Ok(())
299 }
300 Err(e) => {
301 if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
302 Ok(())
304 } else {
305 Err(e)
306 }
307 }
308 }
309 }
310}
311
312impl<R: AvroRead> Iterator for Reader<R> {
313 type Item = Result<Value, AvroError>;
314
315 fn next(&mut self) -> Option<Self::Item> {
316 if self.errored {
318 return None;
319 };
320 match self.read_next() {
321 Ok(opt) => opt.map(Ok),
322 Err(e) => {
323 self.errored = true;
324 Some(Err(e))
325 }
326 }
327 }
328}
329
/// State threaded through resolving a writer schema against a reader schema.
///
/// NOTE(review): no code in this file constructs this struct (presumably
/// `resolve_schemas` in `crate::schema` does). The field descriptions below
/// reflect how the methods in this file read and write them.
pub struct SchemaResolver<'a> {
    /// Named pieces of the resolved schema. An entry is `None` while its named
    /// type is still being resolved.
    pub named: Vec<Option<NamedSchemaPiece>>,
    /// Full name of each resolved named type -> its index in `named`.
    pub indices: BTreeMap<FullName, usize>,
    /// Path segments (type names and field names) reported in error messages.
    pub human_readable_field_path: Vec<String>,
    /// Index into `human_readable_field_path` where the reported path begins,
    /// i.e. the innermost named type being resolved.
    pub current_human_readable_path_start: usize,
    /// Writer named-type index -> the reader named-type index it may resolve against.
    pub writer_to_reader_names: BTreeMap<usize, usize>,
    /// Reader named-type index -> writer named-type index; used when matching a
    /// reader type against the variants of a writer union.
    pub reader_to_writer_names: BTreeMap<usize, usize>,
    /// Reader named-type index -> index in `named` of its resolved piece.
    pub reader_to_resolved_names: BTreeMap<usize, usize>,
    // Not read by any code in this file, hence the lint allowance; presumably kept
    // for other users of the struct -- TODO confirm it is still needed.
    #[allow(dead_code)]
    pub reader_fullnames: BTreeMap<usize, &'a FullName>,
    /// The reader schema; resolved named pieces take their names from it.
    pub reader_schema: &'a Schema,
}
342
343impl<'a> SchemaResolver<'a> {
344 fn resolve_named(
345 &mut self,
346 writer: &Schema,
347 reader: &Schema,
348 writer_index: usize,
349 reader_index: usize,
350 ) -> Result<SchemaPiece, AvroError> {
351 let ws = writer.lookup(writer_index);
352 let rs = reader.lookup(reader_index);
353 let typ = match (&ws.piece, &rs.piece) {
354 (
355 SchemaPiece::Record {
356 fields: w_fields,
357 lookup: w_lookup,
358 ..
359 },
360 SchemaPiece::Record {
361 fields: r_fields,
362 lookup: _r_lookup,
363 ..
364 },
365 ) => {
366 let mut defaults = Vec::new();
367 let mut fields: Vec<Option<RecordField>> = Vec::new();
368 for (r_index, rf) in r_fields.iter().enumerate() {
369 match w_lookup.get(&rf.name) {
370 None => {
371 let default_field = match &rf.default {
372 Some(v) => ResolvedDefaultValueField {
373 name: rf.name.clone(),
374 doc: rf.doc.clone(),
375 default: reader
376 .top_node_or_named()
377 .step(&rf.schema)
378 .lookup()
379 .json_to_value(v)?,
380 order: rf.order.clone(),
381 position: r_index,
382 },
383 None => return Err(SchemaResolutionError::new(format!(
384 "Reader field `{}.{}` not found in writer, and has no default",
385 self.get_current_human_readable_path(),
386 rf.name
387 ))
388 .into()),
389 };
390 defaults.push(default_field);
391 }
392 Some(w_index) => {
393 if fields.len() > *w_index && fields[*w_index].is_some() {
394 return Err(SchemaResolutionError::new(format!(
395 "Duplicate field `{}.{}` in schema",
396 self.get_current_human_readable_path(),
397 rf.name
398 ))
399 .into());
400 }
401 let wf = &w_fields[*w_index];
402 let w_node = SchemaNodeOrNamed {
403 root: writer,
404 inner: wf.schema.as_ref(),
405 };
406 let r_node = SchemaNodeOrNamed {
407 root: reader,
408 inner: rf.schema.as_ref(),
409 };
410
411 self.human_readable_field_path.push(rf.name.clone());
412 let new_inner = self.resolve(w_node, r_node)?;
413 self.human_readable_field_path.pop();
414
415 let field = RecordField {
416 name: rf.name.clone(),
417 doc: rf.doc.clone(),
418 default: rf.default.clone(),
419 schema: new_inner,
420 order: rf.order.clone(),
421 position: r_index,
422 };
423 while fields.len() <= *w_index {
424 fields.push(None);
425 }
426 fields[*w_index] = Some(field)
427 }
428 }
429 }
430 while fields.len() < w_fields.len() {
431 fields.push(None);
432 }
433 let mut n_present = 0;
434 let fields = fields
435 .into_iter()
436 .enumerate()
437 .map(|(i, rf)| match rf {
438 Some(rf) => {
439 n_present += 1;
440 ResolvedRecordField::Present(rf)
441 }
442 None => {
443 let writer_schema_piece = SchemaNodeOrNamed {
455 root: writer,
456 inner: w_fields[i].schema.as_ref(),
457 }
458 .to_schema();
459 ResolvedRecordField::Absent(writer_schema_piece)
460 }
461 })
462 .collect();
463 let n_reader_fields = defaults.len() + n_present;
464 SchemaPiece::ResolveRecord {
465 defaults,
466 fields,
467 n_reader_fields,
468 }
469 }
470 (
471 SchemaPiece::Enum {
472 symbols: w_symbols, ..
473 },
474 SchemaPiece::Enum {
475 symbols: r_symbols,
476 doc,
477 default_idx,
478 },
479 ) => {
480 let r_map = r_symbols
481 .iter()
482 .enumerate()
483 .map(|(i, s)| (s, i))
484 .collect::<BTreeMap<_, _>>();
485 let symbols = w_symbols
486 .iter()
487 .map(|s| {
488 r_map
489 .get(s)
490 .map(|i| (*i, s.clone()))
491 .ok_or_else(|| s.clone())
492 })
493 .collect();
494 SchemaPiece::ResolveEnum {
495 doc: doc.clone(),
496 symbols,
497 default: default_idx.map(|i| (i, r_symbols[i].clone())),
498 }
499 }
500 (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
501 if *wsz == *rsz {
502 SchemaPiece::Fixed { size: *wsz }
503 } else {
504 return Err(SchemaResolutionError::new(format!(
505 "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
506 &rs.name,
507 wsz,
508 rsz,
509 self.get_current_human_readable_path(),
510 ))
511 .into());
512 }
513 }
514 (
515 SchemaPiece::Decimal {
516 precision: wp,
517 scale: wscale,
518 fixed_size: wsz,
519 },
520 SchemaPiece::Decimal {
521 precision: rp,
522 scale: rscale,
523 fixed_size: rsz,
524 },
525 ) => {
526 if wp != rp {
527 return Err(SchemaResolutionError::new(format!(
528 "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
529 &rs.name,
530 wp,
531 rp,
532 self.get_current_human_readable_path(),
533 ))
534 .into());
535 }
536 if wscale != rscale {
537 return Err(SchemaResolutionError::new(format!(
538 "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
539 &rs.name,
540 wscale,
541 rscale,
542 self.get_current_human_readable_path(),
543 ))
544 .into());
545 }
546 if wsz != rsz {
547 return Err(SchemaResolutionError::new(format!(
548 "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
549 &rs.name,
550 wsz,
551 rsz,
552 self.get_current_human_readable_path(),
553 ))
554 .into());
555 }
556 SchemaPiece::Decimal {
557 precision: *wp,
558 scale: *wscale,
559 fixed_size: *wsz,
560 }
561 }
562 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
563 if *fixed_size == Some(*size) =>
564 {
565 SchemaPiece::Fixed { size: *size }
566 }
567 (
568 SchemaPiece::Fixed { size },
569 SchemaPiece::Decimal {
570 precision,
571 scale,
572 fixed_size,
573 },
574 ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
575 precision: *precision,
576 scale: *scale,
577 fixed_size: *fixed_size,
578 },
579
580 (_, SchemaPiece::ResolveRecord { .. })
581 | (_, SchemaPiece::ResolveEnum { .. })
582 | (SchemaPiece::ResolveRecord { .. }, _)
583 | (SchemaPiece::ResolveEnum { .. }, _) => {
584 return Err(SchemaResolutionError::new(
585 "Attempted to resolve an already resolved schema".to_string(),
586 )
587 .into());
588 }
589
590 (_wt, _rt) => {
591 return Err(SchemaResolutionError::new(format!(
592 "Non-matching schemas: writer: {:?}, reader: {:?}",
593 ws.name, rs.name
594 ))
595 .into());
596 }
597 };
598 Ok(typ)
599 }
600
601 pub fn resolve(
602 &mut self,
603 writer: SchemaNodeOrNamed,
604 reader: SchemaNodeOrNamed,
605 ) -> Result<SchemaPieceOrNamed, AvroError> {
606 let previous_human_readable_path_start = self.current_human_readable_path_start;
607 let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
608 if let Some(full_name) = named_node {
609 self.current_human_readable_path_start = self.human_readable_field_path.len();
610 self.human_readable_field_path.push(full_name.human_name());
611 }
612
613 let inner = match (writer.inner, reader.inner) {
614 (
628 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
629 SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
630 ) => {
631 let w2r = self.writer_to_reader_names.clone();
632 let permutation = w_inner
640 .variants()
641 .iter()
642 .map(|w_variant| {
643 let (r_idx, r_variant) =
644 r_inner.match_promote_writer(w_variant, &w2r).ok_or_else(|| {
645 SchemaResolutionError::new(format!(
646 "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
647 w_variant.get_human_name(writer.root),
648 self.get_current_human_readable_path()
649 ))
650 })?;
651 let resolved =
652 self.resolve(writer.step(w_variant), reader.step(r_variant))?;
653 Ok((r_idx, resolved))
654 })
655 .collect();
656 let n_reader_variants = r_inner.variants().len();
657 let reader_null_variant = r_inner
658 .variants()
659 .iter()
660 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
661 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
662 permutation,
663 n_reader_variants,
664 reader_null_variant,
665 })
666 }
667 (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
669 let n_reader_variants = r_inner.variants().len();
670 let reader_null_variant = r_inner
671 .variants()
672 .iter()
673 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
674 let (index, r_inner) = r_inner
675 .match_ref_promote_writer(other, &self.writer_to_reader_names)
676 .ok_or_else(|| {
677 SchemaResolutionError::new(
678 format!("No matching schema in reader union for writer type `{}` for field `{}`",
679 other.get_human_name(writer.root),
680 self.get_current_human_readable_path()))
681 })?;
682 let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
683 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
684 index,
685 inner,
686 n_reader_variants,
687 reader_null_variant,
688 })
689 }
690 (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
692 let (index, w_inner) = w_inner
693 .match_ref_promote_reader(other, &self.reader_to_writer_names)
694 .ok_or_else(|| {
695 SchemaResolutionError::new(
700 format!("No matching schema in writer union for reader type `{}` for field `{}`",
701 other.get_human_name(reader.root),
702 self.get_current_human_readable_path()))
703 })?;
704 let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
705 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
706 }
707 (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
709 match (wp, rp) {
710 (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
717 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
718 }
719 (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
721 SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
722 }
723 (SchemaPiece::Date, SchemaPiece::TimestampMilli)
724 | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
725 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
726 }
727 (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
728 SchemaPieceOrNamed::Piece(rp.clone()) }
730 (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
731 SchemaPieceOrNamed::Piece(rp.clone()) }
733 (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
734 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
735 }
736 (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
737 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
738 }
739 (SchemaPiece::Null, SchemaPiece::Null) => {
740 SchemaPieceOrNamed::Piece(SchemaPiece::Null)
741 }
742 (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
743 SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
744 }
745 (SchemaPiece::Int, SchemaPiece::Long) => {
746 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
747 }
748 (SchemaPiece::Int, SchemaPiece::Float) => {
749 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
750 }
751 (SchemaPiece::Int, SchemaPiece::Double) => {
752 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
753 }
754 (SchemaPiece::Long, SchemaPiece::Float) => {
755 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
756 }
757 (SchemaPiece::Long, SchemaPiece::Double) => {
758 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
759 }
760 (SchemaPiece::Float, SchemaPiece::Float) => {
761 SchemaPieceOrNamed::Piece(SchemaPiece::Float)
762 }
763 (SchemaPiece::Float, SchemaPiece::Double) => {
764 SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
765 }
766 (SchemaPiece::Double, SchemaPiece::Double) => {
767 SchemaPieceOrNamed::Piece(SchemaPiece::Double)
768 }
769 (b, SchemaPiece::Bytes)
770 if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
771 {
772 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
773 }
774 (s, SchemaPiece::String)
775 if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
776 {
777 SchemaPieceOrNamed::Piece(SchemaPiece::String)
778 }
779 (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
780 let inner =
781 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
782 SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
783 }
784 (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
785 let inner =
786 self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
787 SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
788 }
789 (
790 SchemaPiece::Decimal {
791 precision: wp,
792 scale: ws,
793 fixed_size: wf,
794 },
795 SchemaPiece::Decimal {
796 precision: rp,
797 scale: rs,
798 fixed_size: rf,
799 },
800 ) => {
801 if wp == rp && ws == rs && wf == rf {
802 SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
803 precision: *wp,
804 scale: *ws,
805 fixed_size: *wf,
806 })
807 } else {
808 return Err(SchemaResolutionError::new(format!(
809 "Decimal types must match in precision, scale, and fixed size. \
810 Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
811 wp,
812 ws,
813 wf,
814 rp,
815 rs,
816 rf,
817 self.get_current_human_readable_path(),
818 ))
819 .into());
820 }
821 }
822 (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
823 if *fixed_size == None =>
824 {
825 SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
826 }
827 (SchemaPiece::Json, SchemaPiece::Json) => {
831 SchemaPieceOrNamed::Piece(SchemaPiece::Json)
832 }
833 (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
834 SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
835 }
836 (
837 SchemaPiece::Bytes,
838 SchemaPiece::Decimal {
839 precision,
840 scale,
841 fixed_size,
842 },
843 ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
844 precision: *precision,
845 scale: *scale,
846 fixed_size: *fixed_size,
847 }),
848 (ws, rs) => {
849 return Err(SchemaResolutionError::new(format!(
850 "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
851 ws,
852 rs,
853 self.get_current_human_readable_path(),
854 ))
855 .into());
856 }
857 }
858 }
859 (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
861 if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
862 let (w_name, r_name) = (
864 &writer.root.lookup(w_index).name,
865 &reader.root.lookup(r_index).name,
866 );
867 return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {:?} against reader schema node named {:?}", w_name, r_name)).into());
868 }
869 let idx = match self.reader_to_resolved_names.get(&r_index) {
872 Some(resolved) => *resolved,
873 None => {
874 let resolved_idx = self.named.len();
880 self.reader_to_resolved_names.insert(r_index, resolved_idx);
881 self.named.push(None);
882 let piece =
883 match self.resolve_named(writer.root, reader.root, w_index, r_index) {
884 Ok(piece) => piece,
885 Err(e) => {
886 self.named.truncate(resolved_idx);
895 self.reader_to_resolved_names
896 .retain(|_, v| *v < resolved_idx);
897 self.indices.retain(|_, v| *v < resolved_idx);
898 return Err(e);
899 }
900 };
901 let name = &self.reader_schema.named[r_index].name;
902 let ns = NamedSchemaPiece {
903 name: name.clone(),
904 piece,
905 };
906 self.named[resolved_idx] = Some(ns);
907 self.indices.insert(name.clone(), resolved_idx);
908
909 resolved_idx
910 }
911 };
912 SchemaPieceOrNamed::Named(idx)
913 }
914 (ws, rs) => {
915 return Err(SchemaResolutionError::new(format!(
916 "Schemas don't match: {:?}, {:?} for field `{}`",
917 ws.get_piece_and_name(writer.root).0,
918 rs.get_piece_and_name(reader.root).0,
919 self.get_current_human_readable_path(),
920 ))
921 .into());
922 }
923 };
924 if named_node.is_some() {
925 self.human_readable_field_path.pop();
926 self.current_human_readable_path_start = previous_human_readable_path_start;
927 }
928 Ok(inner)
929 }
930
931 fn get_current_human_readable_path(&self) -> String {
932 self.human_readable_field_path[self.current_human_readable_path_start..].join(".")
933 }
934}
935
936pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
945 let value = decode(schema.top_node(), reader)?;
946 Ok(value)
947}
948
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use mz_ore::assert_err;

    use crate::Reader;
    use crate::types::{Record, ToAvro};

    use super::*;

    /// A block declaring an absurd object count must produce an error rather than
    /// being trusted.
    #[mz_ore::test]
    fn reader_rejects_huge_block_object_count() {
        let bytes: &[u8] = &[
            0x4f, 0x62, 0x6a, 0x01, 0x04, 0x16, 0x61, 0x76, 0x72, 0x6f, 0x2e, 0x73, 0x63, 0x68,
            0x65, 0x6d, 0x61, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x22,
            0x6e, 0x75, 0x6c, 0x6c, 0x22, 0x20, 0x00, 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64,
            0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0xbf, 0x00, 0x26,
            0x35, 0x33, 0x39, 0x33, 0x34, 0x38, 0x33, 0xcd, 0x45, 0x38, 0x56, 0xb1, 0x00, 0x00,
            0x64, 0x64, 0x7a, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
            0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
        ];
        let reader = Reader::new(bytes).expect("OCF header parses");
        assert!(
            reader.into_iter().any(|item| item.is_err()),
            "expected a decode error from the oversized block count"
        );
    }

    static SCHEMA: &str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
    static UNION_SCHEMA: &str = r#"
            ["null", "long"]
        "#;
    /// A container file with the `SCHEMA` writer schema, the null codec, and a
    /// single block holding two records: (27, "foo") and (42, "bar").
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[mz_ore::test]
    fn test_from_avro_datum() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.avro();

        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
    }

    #[mz_ore::test]
    fn test_null_union() {
        let schema: Schema = UNION_SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded).unwrap(),
            Value::Union {
                index: 1,
                inner: Box::new(Value::Long(0)),
                n_variants: 2,
                null_variant: Some(0)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_reader_stream() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(schema.top_node()).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(schema.top_node()).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        // Collect everything so that a reader yielding too few (or zero) values
        // fails the test instead of passing vacuously.
        let values = reader
            .collect::<Result<Vec<_>, _>>()
            .expect("every record decodes");
        assert_eq!(values, vec![record1.avro(), record2.avro()]);
    }

    #[mz_ore::test]
    fn test_reader_invalid_header() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let invalid = ENCODED.iter().skip(1).copied().collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_reader_invalid_block() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Drop the last 19 bytes: the sync marker and the tail of the block body.
        let invalid = ENCODED
            .iter()
            .rev()
            .skip(19)
            .copied()
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the truncated block to yield an error"
        );
        for value in results {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[mz_ore::test]
    fn test_reader_only_header() {
        // The header plus the start of the block header; the block body is missing.
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..]).unwrap();
        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the missing block body to yield an error"
        );
        for value in results {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_resolution_nested_types_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`"
        );
    }

    #[mz_ore::test]
    fn test_extra_fields_without_default_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
        );
    }

    #[mz_ore::test]
    fn test_duplicate_field_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Duplicate field `com.materialize.bar.f1` in schema"
        );
    }

    #[mz_ore::test]
    fn test_decimal_field_mismatch_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
    ]
}
"#;
        let r: Schema = r.parse().unwrap();
        let w: Schema = w.parse().unwrap();
        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
            resolve_schemas(&w, &r)
        {
            s
        } else {
            panic!("Expected schema resolution failure");
        };
        assert_eq!(
            &err_str,
            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
        );
    }
}