1use std::cmp;
25use std::fmt::{self, Display};
26use std::fs::File;
27use std::io::{self, Cursor, Read, Seek, SeekFrom};
28
29use chrono::{DateTime, NaiveDate};
30use flate2::read::MultiGzDecoder;
31
32use crate::error::{DecodeError, Error as AvroError};
33use crate::schema::{
34 RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece,
35 SchemaPieceOrNamed,
36};
37use crate::types::{Scalar, Value};
38use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
39use crate::{TrivialDecoder, ValueDecoder};
40
41pub trait StatefulAvroDecodable: Sized {
42 type Decoder: AvroDecode<Out = Self>;
43 type State;
44 fn new_decoder(state: Self::State) -> Self::Decoder;
45}
46pub trait AvroDecodable: Sized {
47 type Decoder: AvroDecode<Out = Self>;
48
49 fn new_decoder() -> Self::Decoder;
50}
51impl<T> AvroDecodable for T
52where
53 T: StatefulAvroDecodable,
54 T::State: Default,
55{
56 type Decoder = <Self as StatefulAvroDecodable>::Decoder;
57
58 fn new_decoder() -> Self::Decoder {
59 <Self as StatefulAvroDecodable>::new_decoder(Default::default())
60 }
61}
62#[inline]
63fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
64 let u = match zag_i64(reader)? {
65 i if i >= 0 => i as u64,
66 i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
67 };
68 Ok(u)
69}
70
71fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
72 let u = match zag_i32(reader)? {
73 i if i >= 0 => i as u32,
74 i => {
75 return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
76 i as i64,
77 )));
78 }
79 };
80 Ok(u)
81}
82
83#[inline]
84fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
85 zag_i64(reader).and_then(|i| safe_len(i as usize))
86}
87
88#[inline]
89fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
90 let mut buf = [0u8; 4];
91 reader.read_exact(&mut buf[..])?;
92 Ok(f32::from_le_bytes(buf))
93}
94
95#[inline]
96fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
97 let mut buf = [0u8; 8];
98 reader.read_exact(&mut buf[..])?;
99 Ok(f64::from_le_bytes(buf))
100}
101
102impl Display for TsUnit {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 TsUnit::Millis => write!(f, "ms"),
106 TsUnit::Micros => write!(f, "us"),
107 }
108 }
109}
110
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    /// Builds the timestamp value expected for `secs` seconds and `nanos`
    /// nanoseconds relative to the Unix epoch.
    fn expected(secs: i64, nanos: u32) -> Value {
        Value::Timestamp(DateTime::from_timestamp(secs, nanos).unwrap().naive_utc())
    }

    /// Negative inputs must land on the instant *before* the epoch, with a
    /// non-negative sub-second part.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        let from_millis = |v: i64| build_ts_value(v, TsUnit::Millis).unwrap();
        let from_micros = |v: i64| build_ts_value(v, TsUnit::Micros).unwrap();

        assert_eq!(from_millis(-1), expected(-1, 999_000_000));
        assert_eq!(from_millis(-1000), expected(-1, 0));
        assert_eq!(from_micros(-1000), expected(-1, 999_000_000));
        assert_eq!(from_micros(-1), expected(-1, 999_999_000));
        assert_eq!(
            from_micros(-123_456_789_123),
            expected(-123_457, (1_000_000 - 789_123) * 1_000)
        );
    }
}
160
161pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
163 let result = match unit {
164 TsUnit::Millis => DateTime::from_timestamp_millis(value),
165 TsUnit::Micros => DateTime::from_timestamp_micros(value),
166 };
167 let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
168 Ok(Value::Timestamp(ndt.naive_utc()))
169}
170
171pub trait AvroRead: Read + Skip {}
176
177impl<T> AvroRead for T where T: Read + Skip {}
178
/// A reader that can advance past bytes without handing them to the caller.
pub trait Skip: Read {
    /// Advances the reader by up to `len` bytes.
    ///
    /// The default implementation reads into a 512-byte scratch buffer and
    /// discards the data. Reaching end of input before `len` bytes were
    /// consumed is not an error: the method stops and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by [`Read::read`], except
    /// [`io::ErrorKind::Interrupted`], which is retried (matching what
    /// `Read::read_exact` does).
    // NOTE(review): suppression carried over from the original code; the byte
    // count is consumed below, so it may no longer be required — confirm.
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        const BUF_SIZE: usize = 512;
        let mut scratch = [0u8; BUF_SIZE];

        while len > 0 {
            // Never ask for more than is left to skip.
            let want = cmp::min(len, BUF_SIZE);
            match self.read(&mut scratch[..want]) {
                // End of input: nothing more can be skipped.
                Ok(0) => break,
                Ok(n) => len -= n,
                // A transient interruption is not a failure; try again.
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
215
216impl Skip for File {
217 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
218 self.seek(SeekFrom::Current(len as i64))?;
219 Ok(())
220 }
221}
222
223impl Skip for &[u8] {
224 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
225 let len = cmp::min(len, self.len());
226 *self = &self[len..];
227 Ok(())
228 }
229}
230
231impl<S: Skip + ?Sized> Skip for Box<S> {
232 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
233 self.as_mut().skip(len)
234 }
235}
236
237impl<T: AsRef<[u8]>> Skip for Cursor<T> {
238 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
239 self.seek(SeekFrom::Current(len as i64))?;
240 Ok(())
241 }
242}
243
244impl<R: Read> Skip for MultiGzDecoder<R> {}
245
246pub enum ValueOrReader<'a, V, R: AvroRead> {
247 Value(V),
248 Reader { len: usize, r: &'a mut R },
249}
250
251enum SchemaOrDefault<'b, R: AvroRead> {
252 Schema(&'b mut R, SchemaNode<'b>),
253 Default(&'b Value),
254}
255pub struct AvroFieldAccess<'b, R: AvroRead> {
256 schema: SchemaOrDefault<'b, R>,
257}
258
259impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
260 pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
261 match self.schema {
262 SchemaOrDefault::Schema(r, schema) => {
263 let des = GeneralDeserializer { schema };
264 des.deserialize(r, d)
265 }
266 SchemaOrDefault::Default(value) => give_value(d, value),
267 }
268 }
269}
270
271pub trait AvroRecordAccess<R: AvroRead> {
272 fn next_field<'b>(
273 &'b mut self,
274 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
275}
276
277struct SimpleRecordAccess<'a, R: AvroRead> {
278 schema: SchemaNode<'a>,
279 r: &'a mut R,
280 fields: &'a [RecordField],
281 i: usize,
282}
283
284impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
285 fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
286 Self {
287 schema,
288 r,
289 fields,
290 i: 0,
291 }
292 }
293}
294
295impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
296 fn next_field<'b>(
297 &'b mut self,
298 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
299 assert!(self.i <= self.fields.len());
300 if self.i == self.fields.len() {
301 Ok(None)
302 } else {
303 let f = &self.fields[self.i];
304 self.i += 1;
305 Ok(Some((
306 f.name.as_str(),
307 f.position,
308 AvroFieldAccess {
309 schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
310 },
311 )))
312 }
313 }
314}
315
316struct ValueRecordAccess<'a> {
317 values: &'a [(String, Value)],
318 i: usize,
319}
320
321impl<'a> ValueRecordAccess<'a> {
322 fn new(values: &'a [(String, Value)]) -> Self {
323 Self { values, i: 0 }
324 }
325}
326
327impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
328 fn next_field<'b>(
329 &'b mut self,
330 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
331 assert!(self.i <= self.values.len());
332 if self.i == self.values.len() {
333 Ok(None)
334 } else {
335 let (name, val) = &self.values[self.i];
336 self.i += 1;
337 Ok(Some((
338 name.as_str(),
339 self.i - 1,
340 AvroFieldAccess {
341 schema: SchemaOrDefault::Default(val),
342 },
343 )))
344 }
345 }
346}
347
348struct ValueMapAccess<'a> {
349 values: &'a [(String, Value)],
350 i: usize,
351}
352
353impl<'a> ValueMapAccess<'a> {
354 fn new(values: &'a [(String, Value)]) -> Self {
355 Self { values, i: 0 }
356 }
357}
358
359impl<'a> AvroMapAccess for ValueMapAccess<'a> {
360 type R = &'a [u8];
361 fn next_entry<'b>(
362 &'b mut self,
363 ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
364 assert!(self.i <= self.values.len());
365 if self.i == self.values.len() {
366 Ok(None)
367 } else {
368 let (name, val) = &self.values[self.i];
369 self.i += 1;
370 Ok(Some((
371 name.clone(),
372 AvroFieldAccess {
373 schema: SchemaOrDefault::Default(val),
374 },
375 )))
376 }
377 }
378}
379
380struct ResolvedRecordAccess<'a, R: AvroRead> {
381 defaults: &'a [ResolvedDefaultValueField],
382 i_defaults: usize,
383 fields: &'a [ResolvedRecordField],
384 i_fields: usize,
385 r: &'a mut R,
386 schema: SchemaNode<'a>,
387}
388
389impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
390 fn new(
391 defaults: &'a [ResolvedDefaultValueField],
392 fields: &'a [ResolvedRecordField],
393 r: &'a mut R,
394 schema: SchemaNode<'a>,
395 ) -> Self {
396 Self {
397 defaults,
398 i_defaults: 0,
399 fields,
400 i_fields: 0,
401 r,
402 schema,
403 }
404 }
405}
406
407impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
408 fn next_field<'b>(
409 &'b mut self,
410 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
411 assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
412 if self.i_defaults < self.defaults.len() {
413 let default = &self.defaults[self.i_defaults];
414 self.i_defaults += 1;
415 Ok(Some((
416 default.name.as_str(),
417 default.position,
418 AvroFieldAccess {
419 schema: SchemaOrDefault::Default(&default.default),
420 },
421 )))
422 } else {
423 while self.i_fields < self.fields.len() {
424 let field = &self.fields[self.i_fields];
425 self.i_fields += 1;
426 match field {
427 ResolvedRecordField::Absent(absent_schema) => {
428 let d = GeneralDeserializer {
430 schema: absent_schema.top_node(),
431 };
432 d.deserialize(self.r, TrivialDecoder)?;
433 continue;
434 }
435 ResolvedRecordField::Present(field) => {
436 return Ok(Some((
437 field.name.as_str(),
438 field.position,
439 AvroFieldAccess {
440 schema: SchemaOrDefault::Schema(
441 self.r,
442 self.schema.step(&field.schema),
443 ),
444 },
445 )));
446 }
447 }
448 }
449 Ok(None)
450 }
451 }
452}
453
454pub trait AvroArrayAccess {
455 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
456}
457
458pub trait AvroMapAccess {
459 type R: AvroRead;
460 fn next_entry<'b>(
461 &'b mut self,
462 ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
463}
464
465pub struct SimpleMapAccess<'a, R: AvroRead> {
466 entry_schema: SchemaNode<'a>,
467 r: &'a mut R,
468 done: bool,
469 remaining: usize,
470}
471
472impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
473 fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
474 Self {
475 entry_schema,
476 r,
477 done: false,
478 remaining: 0,
479 }
480 }
481}
482
483impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
484 type R = R;
485 fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
486 if self.done {
487 return Ok(None);
488 }
489 if self.remaining == 0 {
490 let (len, _len_in_bytes) = match zag_i64(self.r)? {
492 len if len > 0 => (len as usize, None),
493 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
494 0 => {
495 self.done = true;
496 return Ok(None);
497 }
498 _ => unreachable!(),
499 };
500 self.remaining = len;
501 }
502 assert!(self.remaining > 0);
503 self.remaining -= 1;
504
505 let key_len = decode_len(self.r)?;
508 let mut key_buf = vec![];
509 key_buf.resize_with(key_len, Default::default);
510 self.r.read_exact(&mut key_buf)?;
511 let key = String::from_utf8(key_buf)
512 .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
513
514 let a = AvroFieldAccess {
515 schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
516 };
517 Ok(Some((key, a)))
518 }
519}
520
521struct SimpleArrayAccess<'a, R: AvroRead> {
522 r: &'a mut R,
523 schema: SchemaNode<'a>,
524 remaining: usize,
525 done: bool,
526}
527
528impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
529 fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
530 Self {
531 r,
532 schema,
533 remaining: 0,
534 done: false,
535 }
536 }
537}
538
539struct ValueArrayAccess<'a> {
540 values: &'a [Value],
541 i: usize,
542}
543
544impl<'a> ValueArrayAccess<'a> {
545 fn new(values: &'a [Value]) -> Self {
546 Self { values, i: 0 }
547 }
548}
549
550impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
551 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
552 assert!(self.i <= self.values.len());
553 if self.i == self.values.len() {
554 Ok(None)
555 } else {
556 let val = give_value(d, &self.values[self.i])?;
557 self.i += 1;
558 Ok(Some(val))
559 }
560 }
561}
562
563impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
564 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
565 if self.done {
566 return Ok(None);
567 }
568 if self.remaining == 0 {
569 let (len, _len_in_bytes) = match zag_i64(self.r)? {
571 len if len > 0 => (len as usize, None),
572 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
573 0 => {
574 self.done = true;
575 return Ok(None);
576 }
577 _ => unreachable!(),
578 };
579 self.remaining = len;
580 }
581 assert!(self.remaining > 0);
582 self.remaining -= 1;
583 let des = GeneralDeserializer {
584 schema: self.schema,
585 };
586 des.deserialize(self.r, d).map(Some)
587 }
588}
589
/// Generates `AvroDecode` methods that reject their input.
///
/// Invoke inside an `impl AvroDecode for ...` block with a comma-separated
/// list of method kinds (`record`, `union_branch`, `array`, `map`,
/// `enum_variant`, `scalar`, `decimal`, `bytes`, `string`, `json`, `uuid`,
/// `fixed`). Each listed method is implemented to return the matching
/// `DecodeError::Unexpected*` error.
///
/// Every crate-local name is spelled through `$crate`, so the macro expands
/// correctly even where `AvroRead` has not been imported. The `json` arm names
/// `serde_json::Value` directly; that path is resolved where the macro is
/// expanded.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedRecord,
            ))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedUnion,
            ))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedArray,
            ))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(
            self,
            _m: &mut M,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedMap,
            ))
        }
    };
    (enum_variant) => {
        fn enum_variant(
            self,
            _symbol: &str,
            _idx: usize,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedEnum,
            ))
        }
    };
    (scalar) => {
        fn scalar(
            self,
            _scalar: $crate::types::Scalar,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedScalar,
            ))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedDecimal,
            ))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedBytes,
            ))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedString,
            ))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<
                'avro_macro_lifetime,
                &'avro_macro_lifetime serde_json::Value,
                R,
            >,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedJson,
            ))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedUuid,
            ))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedFixed,
            ))
        }
    };
    // A list of kinds expands to one method per kind.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
706
707pub trait AvroDecode: Sized {
708 type Out;
709 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
710 self,
711 _a: &mut A,
712 ) -> Result<Self::Out, AvroError>;
713
714 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
715 self,
716 _idx: usize,
717 _n_variants: usize,
718 _null_variant: Option<usize>,
719 _deserializer: D,
720 _reader: &'a mut R,
721 ) -> Result<Self::Out, AvroError>;
722
723 fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;
724
725 fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;
726
727 fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;
728
729 fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;
730
731 fn decimal<'a, R: AvroRead>(
732 self,
733 _precision: usize,
734 _scale: usize,
735 _r: ValueOrReader<'a, &'a [u8], R>,
736 ) -> Result<Self::Out, AvroError>;
737
738 fn bytes<'a, R: AvroRead>(
739 self,
740 _r: ValueOrReader<'a, &'a [u8], R>,
741 ) -> Result<Self::Out, AvroError>;
742 fn string<'a, R: AvroRead>(
743 self,
744 _r: ValueOrReader<'a, &'a str, R>,
745 ) -> Result<Self::Out, AvroError>;
746 fn json<'a, R: AvroRead>(
747 self,
748 _r: ValueOrReader<'a, &'a serde_json::Value, R>,
749 ) -> Result<Self::Out, AvroError>;
750 fn uuid<'a, R: AvroRead>(
751 self,
752 _r: ValueOrReader<'a, &'a [u8], R>,
753 ) -> Result<Self::Out, AvroError>;
754 fn fixed<'a, R: AvroRead>(
755 self,
756 _r: ValueOrReader<'a, &'a [u8], R>,
757 ) -> Result<Self::Out, AvroError>;
758 fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
759 self,
760 f: F,
761 ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
762 public_decoders::MappingDecoder::new(self, f)
763 }
764}
765
766pub mod public_decoders {
767
768 use std::collections::BTreeMap;
769
770 use crate::error::{DecodeError, Error as AvroError};
771 use crate::types::{DecimalValue, Scalar, Value};
772 use crate::{
773 AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
774 };
775
776 use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
777
778 macro_rules! define_simple_decoder {
779 ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
780 pub struct $name;
781 impl AvroDecode for $name {
782 type Out = $out;
783 fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
784 let out = match scalar {
785 $(
786 Scalar::$scalar_branch(inner) => {inner.try_into()?}
787 ),*
788 other => return Err(AvroError::Decode(
789 DecodeError::UnexpectedScalarKind(other.into()),
790 ))
791 };
792 Ok(out)
793 }
794 define_unexpected! {
795 array, record, union_branch, map,
796 enum_variant, decimal, bytes, string,
797 json, uuid, fixed
798 }
799 }
800
801 impl StatefulAvroDecodable for $out {
802 type Decoder = $name;
803 type State = ();
804 fn new_decoder(_state: ()) -> $name {
805 $name
806 }
807 }
808 }
809 }
810
811 define_simple_decoder!(I32Decoder, i32, Int;Long);
812 define_simple_decoder!(I64Decoder, i64, Int;Long);
813 define_simple_decoder!(U64Decoder, u64, Int;Long);
814 define_simple_decoder!(UsizeDecoder, usize, Int;Long);
815 define_simple_decoder!(IsizeDecoder, isize, Int;Long);
816
817 pub struct MappingDecoder<
818 T,
819 InnerOut,
820 Inner: AvroDecode<Out = InnerOut>,
821 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
822 > {
823 inner: Inner,
824 conv: Conv,
825 }
826
827 impl<
828 T,
829 InnerOut,
830 Inner: AvroDecode<Out = InnerOut>,
831 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
832 > MappingDecoder<T, InnerOut, Inner, Conv>
833 {
834 pub fn new(inner: Inner, conv: Conv) -> Self {
835 Self { inner, conv }
836 }
837 }
838
839 impl<
840 T,
841 InnerOut,
842 Inner: AvroDecode<Out = InnerOut>,
843 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
844 > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
845 {
846 type Out = T;
847
848 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
849 mut self,
850 a: &mut A,
851 ) -> Result<Self::Out, AvroError> {
852 (self.conv)(self.inner.record(a)?)
853 }
854
855 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
856 mut self,
857 idx: usize,
858 n_variants: usize,
859 null_variant: Option<usize>,
860 deserializer: D,
861 reader: &'a mut R,
862 ) -> Result<Self::Out, AvroError> {
863 (self.conv)(self.inner.union_branch(
864 idx,
865 n_variants,
866 null_variant,
867 deserializer,
868 reader,
869 )?)
870 }
871
872 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
873 (self.conv)(self.inner.array(a)?)
874 }
875
876 fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
877 (self.conv)(self.inner.map(m)?)
878 }
879
880 fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
881 (self.conv)(self.inner.enum_variant(symbol, idx)?)
882 }
883
884 fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
885 (self.conv)(self.inner.scalar(scalar)?)
886 }
887
888 fn decimal<'a, R: AvroRead>(
889 mut self,
890 precision: usize,
891 scale: usize,
892 r: ValueOrReader<'a, &'a [u8], R>,
893 ) -> Result<Self::Out, AvroError> {
894 (self.conv)(self.inner.decimal(precision, scale, r)?)
895 }
896
897 fn bytes<'a, R: AvroRead>(
898 mut self,
899 r: ValueOrReader<'a, &'a [u8], R>,
900 ) -> Result<Self::Out, AvroError> {
901 (self.conv)(self.inner.bytes(r)?)
902 }
903
904 fn string<'a, R: AvroRead>(
905 mut self,
906 r: ValueOrReader<'a, &'a str, R>,
907 ) -> Result<Self::Out, AvroError> {
908 (self.conv)(self.inner.string(r)?)
909 }
910
911 fn json<'a, R: AvroRead>(
912 mut self,
913 r: ValueOrReader<'a, &'a serde_json::Value, R>,
914 ) -> Result<Self::Out, AvroError> {
915 (self.conv)(self.inner.json(r)?)
916 }
917
918 fn uuid<'a, R: AvroRead>(
919 mut self,
920 r: ValueOrReader<'a, &'a [u8], R>,
921 ) -> Result<Self::Out, AvroError> {
922 (self.conv)(self.inner.uuid(r)?)
923 }
924
925 fn fixed<'a, R: AvroRead>(
926 mut self,
927 r: ValueOrReader<'a, &'a [u8], R>,
928 ) -> Result<Self::Out, AvroError> {
929 (self.conv)(self.inner.fixed(r)?)
930 }
931 }
932 pub struct ArrayAsVecDecoder<
933 InnerOut,
934 Inner: AvroDecode<Out = InnerOut>,
935 Ctor: FnMut() -> Inner,
936 > {
937 ctor: Ctor,
938 buf: Vec<InnerOut>,
939 }
940
941 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
942 ArrayAsVecDecoder<InnerOut, Inner, Ctor>
943 {
944 pub fn new(ctor: Ctor) -> Self {
945 Self { ctor, buf: vec![] }
946 }
947 }
948 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
949 for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
950 {
951 type Out = Vec<InnerOut>;
952 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
953 while let Some(next) = a.decode_next((self.ctor)())? {
954 self.buf.push(next);
955 }
956 Ok(self.buf)
957 }
958 define_unexpected! {
959 record, union_branch, map, enum_variant,
960 scalar, decimal, bytes, string, json, uuid,
961 fixed
962 }
963 }
964
965 pub struct DefaultArrayAsVecDecoder<T> {
966 buf: Vec<T>,
967 }
968 impl<T> Default for DefaultArrayAsVecDecoder<T> {
969 fn default() -> Self {
970 Self { buf: vec![] }
971 }
972 }
973 impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
974 type Out = Vec<T>;
975 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
976 while let Some(next) = {
977 let inner = T::new_decoder();
978 a.decode_next(inner)?
979 } {
980 self.buf.push(next);
981 }
982 Ok(self.buf)
983 }
984 define_unexpected! {
985 record, union_branch, map, enum_variant,
986 scalar, decimal, bytes, string, json, uuid,
987 fixed
988 }
989 }
990 impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
991 type Decoder = DefaultArrayAsVecDecoder<T>;
992 type State = ();
993
994 fn new_decoder(_state: Self::State) -> Self::Decoder {
995 DefaultArrayAsVecDecoder::<T>::default()
996 }
997 }
998 pub struct TrivialDecoder;
999
1000 impl TrivialDecoder {
1001 fn maybe_skip<'a, V, R: AvroRead>(
1002 self,
1003 r: ValueOrReader<'a, V, R>,
1004 ) -> Result<(), AvroError> {
1005 if let ValueOrReader::Reader { len, r } = r {
1006 Ok(r.skip(len)?)
1007 } else {
1008 Ok(())
1009 }
1010 }
1011 }
1012
1013 impl AvroDecode for TrivialDecoder {
1014 type Out = ();
1015 fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
1016 while let Some((_, _, f)) = a.next_field()? {
1017 f.decode_field(TrivialDecoder)?;
1018 }
1019 Ok(())
1020 }
1021 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1022 self,
1023 _idx: usize,
1024 _n_variants: usize,
1025 _null_variant: Option<usize>,
1026 deserializer: D,
1027 reader: &'a mut R,
1028 ) -> Result<(), AvroError> {
1029 deserializer.deserialize(reader, self)
1030 }
1031
1032 fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1033 Ok(())
1034 }
1035 fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1036 Ok(())
1037 }
1038 fn decimal<'a, R: AvroRead>(
1039 self,
1040 _precision: usize,
1041 _scale: usize,
1042 r: ValueOrReader<'a, &'a [u8], R>,
1043 ) -> Result<(), AvroError> {
1044 self.maybe_skip(r)
1045 }
1046 fn bytes<'a, R: AvroRead>(
1047 self,
1048 r: ValueOrReader<'a, &'a [u8], R>,
1049 ) -> Result<(), AvroError> {
1050 self.maybe_skip(r)
1051 }
1052 fn string<'a, R: AvroRead>(
1053 self,
1054 r: ValueOrReader<'a, &'a str, R>,
1055 ) -> Result<(), AvroError> {
1056 self.maybe_skip(r)
1057 }
1058 fn json<'a, R: AvroRead>(
1059 self,
1060 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1061 ) -> Result<(), AvroError> {
1062 self.maybe_skip(r)
1063 }
1064 fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1065 self.maybe_skip(r)
1066 }
1067 fn fixed<'a, R: AvroRead>(
1068 self,
1069 r: ValueOrReader<'a, &'a [u8], R>,
1070 ) -> Result<(), AvroError> {
1071 self.maybe_skip(r)
1072 }
1073 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1074 while a.decode_next(TrivialDecoder)?.is_some() {}
1075 Ok(())
1076 }
1077
1078 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1079 while let Some((_n, entry)) = m.next_entry()? {
1080 entry.decode_field(TrivialDecoder)?
1081 }
1082 Ok(())
1083 }
1084 }
1085 pub struct ValueDecoder;
1086 impl AvroDecode for ValueDecoder {
1087 type Out = Value;
1088 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1089 self,
1090 a: &mut A,
1091 ) -> Result<Value, AvroError> {
1092 let mut fields = vec![];
1093 while let Some((name, idx, f)) = a.next_field()? {
1094 let next = ValueDecoder;
1095 let val = f.decode_field(next)?;
1096 fields.push((idx, (name.to_string(), val)));
1097 }
1098 fields.sort_by_key(|(idx, _)| *idx);
1099
1100 Ok(Value::Record(
1101 fields
1102 .into_iter()
1103 .map(|(_idx, (name, val))| (name, val))
1104 .collect(),
1105 ))
1106 }
1107 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1108 self,
1109 index: usize,
1110 n_variants: usize,
1111 null_variant: Option<usize>,
1112 deserializer: D,
1113 reader: &'a mut R,
1114 ) -> Result<Value, AvroError> {
1115 let next = ValueDecoder;
1116 let inner = Box::new(deserializer.deserialize(reader, next)?);
1117 Ok(Value::Union {
1118 index,
1119 inner,
1120 n_variants,
1121 null_variant,
1122 })
1123 }
1124 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1125 let mut items = vec![];
1126 loop {
1127 let next = ValueDecoder;
1128
1129 if let Some(value) = a.decode_next(next)? {
1130 items.push(value)
1131 } else {
1132 break;
1133 }
1134 }
1135 Ok(Value::Array(items))
1136 }
1137 fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1138 Ok(Value::Enum(idx, symbol.to_string()))
1139 }
1140 fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1141 Ok(scalar.into())
1142 }
1143 fn decimal<'a, R: AvroRead>(
1144 self,
1145 precision: usize,
1146 scale: usize,
1147 r: ValueOrReader<'a, &'a [u8], R>,
1148 ) -> Result<Value, AvroError> {
1149 let unscaled = match r {
1150 ValueOrReader::Value(buf) => buf.to_vec(),
1151 ValueOrReader::Reader { len, r } => {
1152 let mut buf = vec![];
1153 buf.resize_with(len, Default::default);
1154 r.read_exact(&mut buf)?;
1155 buf
1156 }
1157 };
1158 Ok(Value::Decimal(DecimalValue {
1159 unscaled,
1160 precision,
1161 scale,
1162 }))
1163 }
1164 fn bytes<'a, R: AvroRead>(
1165 self,
1166 r: ValueOrReader<'a, &'a [u8], R>,
1167 ) -> Result<Value, AvroError> {
1168 let buf = match r {
1169 ValueOrReader::Value(buf) => buf.to_vec(),
1170 ValueOrReader::Reader { len, r } => {
1171 let mut buf = vec![];
1172 buf.resize_with(len, Default::default);
1173 r.read_exact(&mut buf)?;
1174 buf
1175 }
1176 };
1177 Ok(Value::Bytes(buf))
1178 }
1179 fn string<'a, R: AvroRead>(
1180 self,
1181 r: ValueOrReader<'a, &'a str, R>,
1182 ) -> Result<Value, AvroError> {
1183 let s = match r {
1184 ValueOrReader::Value(s) => s.to_string(),
1185 ValueOrReader::Reader { len, r } => {
1186 let mut buf = vec![];
1187 buf.resize_with(len, Default::default);
1188 r.read_exact(&mut buf)?;
1189 String::from_utf8(buf)
1190 .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1191 }
1192 };
1193 Ok(Value::String(s))
1194 }
1195 fn json<'a, R: AvroRead>(
1196 self,
1197 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1198 ) -> Result<Value, AvroError> {
1199 let val = match r {
1200 ValueOrReader::Value(val) => val.clone(),
1201 ValueOrReader::Reader { len, r } => {
1202 let mut buf = vec![];
1203 buf.resize_with(len, Default::default);
1204 r.read_exact(&mut buf)?;
1205 serde_json::from_slice(&buf).map_err(|e| {
1206 AvroError::Decode(DecodeError::BadJson {
1207 category: e.classify(),
1208 bytes: buf.to_owned(),
1209 })
1210 })?
1211 }
1212 };
1213 Ok(Value::Json(val))
1214 }
1215 fn uuid<'a, R: AvroRead>(
1216 self,
1217 r: ValueOrReader<'a, &'a [u8], R>,
1218 ) -> Result<Value, AvroError> {
1219 let buf = match r {
1220 ValueOrReader::Value(val) => val.to_vec(),
1221 ValueOrReader::Reader { len, r } => {
1222 let mut buf = vec![];
1223 buf.resize_with(len, Default::default);
1224 r.read_exact(&mut buf)?;
1225 buf
1226 }
1227 };
1228 let s = std::str::from_utf8(&buf)
1229 .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1230 let val =
1231 uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1232 Ok(Value::Uuid(val))
1233 }
1234 fn fixed<'a, R: AvroRead>(
1235 self,
1236 r: ValueOrReader<'a, &'a [u8], R>,
1237 ) -> Result<Value, AvroError> {
1238 let buf = match r {
1239 ValueOrReader::Value(buf) => buf.to_vec(),
1240 ValueOrReader::Reader { len, r } => {
1241 let mut buf = vec![];
1242 buf.resize_with(len, Default::default);
1243 r.read_exact(&mut buf)?;
1244 buf
1245 }
1246 };
1247 Ok(Value::Fixed(buf.len(), buf))
1248 }
1249 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1250 let mut entries = BTreeMap::new();
1251 while let Some((name, a)) = m.next_entry()? {
1252 let d = ValueDecoder;
1253 let val = a.decode_field(d)?;
1254 entries.insert(name, val);
1255 }
1256 Ok(Value::Map(entries))
1257 }
1258 }
1259}
1260
1261impl<'a> AvroDeserializer for &'a Value {
1262 fn deserialize<R: AvroRead, D: AvroDecode>(
1263 self,
1264 _r: &mut R,
1265 d: D,
1266 ) -> Result<D::Out, AvroError> {
1267 give_value(d, self)
1268 }
1269}
1270
1271pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1272 use ValueOrReader::Value as V;
1273 match v {
1274 Value::Null => d.scalar(Scalar::Null),
1275 Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1276 Value::Int(val) => d.scalar(Scalar::Int(*val)),
1277 Value::Long(val) => d.scalar(Scalar::Long(*val)),
1278 Value::Float(val) => d.scalar(Scalar::Float(*val)),
1279 Value::Double(val) => d.scalar(Scalar::Double(*val)),
1280 Value::Date(val) => d.scalar(Scalar::Date(*val)),
1281 Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1282 Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1285 Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1286 Value::String(val) => d.string::<&[u8]>(V(val)),
1287 Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1288 Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1289 Value::Union {
1290 index,
1291 inner,
1292 n_variants,
1293 null_variant,
1294 } => {
1295 let mut empty_reader: &[u8] = &[];
1296 d.union_branch(
1297 *index,
1298 *n_variants,
1299 *null_variant,
1300 &**inner,
1301 &mut empty_reader,
1302 )
1303 }
1304 Value::Array(val) => {
1305 let mut a = ValueArrayAccess::new(val);
1306 d.array(&mut a)
1307 }
1308 Value::Map(val) => {
1309 let vals: Vec<_> = val.clone().into_iter().collect();
1310 let mut m = ValueMapAccess::new(vals.as_slice());
1311 d.map(&mut m)
1312 }
1313 Value::Record(val) => {
1314 let mut a = ValueRecordAccess::new(val);
1315 d.record(&mut a)
1316 }
1317 Value::Json(val) => d.json::<&[u8]>(V(val)),
1318 Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1319 }
1320}
1321
1322pub trait AvroDeserializer {
1323 fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
1324}
1325
1326#[derive(Clone, Copy)]
1327pub struct GeneralDeserializer<'a> {
1328 pub schema: SchemaNode<'a>,
1329}
1330
1331impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1332 fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1333 use ValueOrReader::Reader;
1334 match self.schema.inner {
1335 SchemaPiece::Null => d.scalar(Scalar::Null),
1336 SchemaPiece::Boolean => {
1337 let mut buf = [0u8; 1];
1338 r.read_exact(&mut buf[..])?;
1339 let val = match buf[0] {
1340 0u8 => false,
1341 1u8 => true,
1342 other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
1343 };
1344 d.scalar(Scalar::Boolean(val))
1345 }
1346 SchemaPiece::Int => {
1347 let val = zag_i32(r)?;
1348 d.scalar(Scalar::Int(val))
1349 }
1350 SchemaPiece::Long => {
1351 let val = zag_i64(r)?;
1352 d.scalar(Scalar::Long(val))
1353 }
1354 SchemaPiece::Float => {
1355 let val = decode_float(r)?;
1356 d.scalar(Scalar::Float(val))
1357 }
1358 SchemaPiece::Double => {
1359 let val = decode_double(r)?;
1360 d.scalar(Scalar::Double(val))
1361 }
1362 SchemaPiece::Date => {
1363 let days = zag_i32(r)?;
1364 d.scalar(Scalar::Date(days))
1365 }
1366 SchemaPiece::TimestampMilli => {
1367 let total_millis = zag_i64(r)?;
1368 let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
1369 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1370 _ => unreachable!(),
1371 };
1372 d.scalar(scalar)
1373 }
1374 SchemaPiece::TimestampMicro => {
1375 let total_micros = zag_i64(r)?;
1376 let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
1377 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1378 _ => unreachable!(),
1379 };
1380 d.scalar(scalar)
1381 }
1382 SchemaPiece::Decimal {
1383 precision,
1384 scale,
1385 fixed_size,
1386 } => {
1387 let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
1388 d.decimal(*precision, *scale, Reader { len, r })
1389 }
1390 SchemaPiece::Bytes => {
1391 let len = decode_len(r)?;
1392 d.bytes(Reader { len, r })
1393 }
1394 SchemaPiece::String => {
1395 let len = decode_len(r)?;
1396 d.string(Reader { len, r })
1397 }
1398 SchemaPiece::Json => {
1399 let len = decode_len(r)?;
1400 d.json(Reader { len, r })
1401 }
1402 SchemaPiece::Uuid => {
1403 let len = decode_len(r)?;
1404 d.uuid(Reader { len, r })
1405 }
1406 SchemaPiece::Array(inner) => {
1407 let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
1412 d.array(&mut a)
1413 }
1414 SchemaPiece::Map(inner) => {
1415 let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
1417 d.map(&mut m)
1418 }
1419 SchemaPiece::Union(inner) => {
1420 let index = decode_long_nonneg(r)? as usize;
1421 let variants = inner.variants();
1422 match variants.get(index) {
1423 Some(variant) => {
1424 let n_variants = variants.len();
1425 let null_variant = variants
1426 .iter()
1427 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
1428 let dsr = GeneralDeserializer {
1429 schema: self.schema.step(variant),
1430 };
1431 d.union_branch(index, n_variants, null_variant, dsr, r)
1432 }
1433 None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
1434 index,
1435 len: variants.len(),
1436 })),
1437 }
1438 }
1439 SchemaPiece::ResolveIntLong => {
1440 let val = zag_i32(r)? as i64;
1441 d.scalar(Scalar::Long(val))
1442 }
1443 SchemaPiece::ResolveIntFloat => {
1444 let val = zag_i32(r)? as f32;
1445 d.scalar(Scalar::Float(val))
1446 }
1447 SchemaPiece::ResolveIntDouble => {
1448 let val = zag_i32(r)? as f64;
1449 d.scalar(Scalar::Double(val))
1450 }
1451 SchemaPiece::ResolveLongFloat => {
1452 let val = zag_i64(r)? as f32;
1453 d.scalar(Scalar::Float(val))
1454 }
1455 SchemaPiece::ResolveLongDouble => {
1456 let val = zag_i64(r)? as f64;
1457 d.scalar(Scalar::Double(val))
1458 }
1459 SchemaPiece::ResolveFloatDouble => {
1460 let val = decode_float(r)? as f64;
1461 d.scalar(Scalar::Double(val))
1462 }
1463 SchemaPiece::ResolveConcreteUnion {
1464 index,
1465 inner,
1466 n_reader_variants,
1467 reader_null_variant,
1468 } => {
1469 let dsr = GeneralDeserializer {
1470 schema: self.schema.step(&**inner),
1471 };
1472 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1473 }
1474 SchemaPiece::ResolveUnionUnion {
1475 permutation,
1476 n_reader_variants,
1477 reader_null_variant,
1478 } => {
1479 let index = decode_long_nonneg(r)? as usize;
1480 if index >= permutation.len() {
1481 return Err(AvroError::Decode(DecodeError::BadUnionIndex {
1482 index,
1483 len: permutation.len(),
1484 }));
1485 }
1486 match &permutation[index] {
1487 Err(e) => Err(e.clone()),
1488 Ok((index, variant)) => {
1489 let dsr = GeneralDeserializer {
1490 schema: self.schema.step(variant),
1491 };
1492 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1493 }
1494 }
1495 }
1496 SchemaPiece::ResolveUnionConcrete { index, inner } => {
1497 let found_index = decode_long_nonneg(r)? as usize;
1498 if *index != found_index {
1499 Err(AvroError::Decode(DecodeError::WrongUnionIndex {
1500 expected: *index,
1501 actual: found_index,
1502 }))
1503 } else {
1504 let dsr = GeneralDeserializer {
1505 schema: self.schema.step(inner.as_ref()),
1506 };
1507 dsr.deserialize(r, d)
1509 }
1510 }
1511 SchemaPiece::Record {
1512 doc: _,
1513 fields,
1514 lookup: _,
1515 } => {
1516 let mut a = SimpleRecordAccess::new(self.schema, r, fields);
1517 d.record(&mut a)
1518 }
1519 SchemaPiece::Enum {
1520 symbols,
1521 doc: _,
1522 default_idx: _,
1523 } => {
1524 let index = decode_int_nonneg(r)? as usize;
1525 match symbols.get(index) {
1526 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1527 index,
1528 len: symbols.len(),
1529 })),
1530 Some(symbol) => d.enum_variant(symbol, index),
1531 }
1532 }
1533 SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
1534 SchemaPiece::ResolveRecord {
1540 defaults,
1541 fields,
1542 n_reader_fields: _,
1543 } => {
1544 let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
1545 d.record(&mut a)
1546 }
1547 SchemaPiece::ResolveEnum {
1548 doc: _,
1549 symbols,
1550 default,
1551 } => {
1552 let index = decode_int_nonneg(r)? as usize;
1553 match symbols.get(index) {
1554 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1555 index,
1556 len: symbols.len(),
1557 })),
1558 Some(op) => match op {
1559 Err(missing) => {
1560 if let Some((reader_index, symbol)) = default.clone() {
1561 d.enum_variant(&symbol, reader_index)
1562 } else {
1563 Err(AvroError::Decode(DecodeError::MissingEnumIndex {
1564 index,
1565 symbol: missing.clone(),
1566 }))
1567 }
1568 }
1569 Ok((index, name)) => d.enum_variant(name, *index),
1570 },
1571 }
1572 }
1573 SchemaPiece::ResolveIntTsMilli => {
1574 let total_millis = zag_i32(r)?;
1575 let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
1576 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1577 _ => unreachable!(),
1578 };
1579 d.scalar(scalar)
1580 }
1581 SchemaPiece::ResolveIntTsMicro => {
1582 let total_micros = zag_i32(r)?;
1583 let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
1584 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1585 _ => unreachable!(),
1586 };
1587 d.scalar(scalar)
1588 }
1589 SchemaPiece::ResolveDateTimestamp => {
1590 let days = zag_i32(r)?;
1591
1592 let date = NaiveDate::from_ymd_opt(1970, 1, 1)
1593 .expect("naive date known valid")
1594 .checked_add_signed(
1595 chrono::Duration::try_days(days.into())
1596 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
1597 )
1598 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
1599 let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
1600 d.scalar(Scalar::Timestamp(dt))
1601 }
1602 }
1603 }
1604}
1605pub fn decode<'a, R: AvroRead>(
1607 schema: SchemaNode<'a>,
1608 reader: &'a mut R,
1609) -> Result<Value, AvroError> {
1610 let d = ValueDecoder;
1611 let dsr = GeneralDeserializer { schema };
1612 let val = dsr.deserialize(reader, d)?;
1613 Ok(val)
1614}