1use std::cmp;
25use std::fmt::{self, Display};
26use std::fs::File;
27use std::io::{self, Cursor, Read, Seek, SeekFrom};
28
29use chrono::{DateTime, NaiveDate};
30use flate2::read::MultiGzDecoder;
31
32use crate::error::{DecodeError, Error as AvroError};
33use crate::schema::{
34 RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece,
35 SchemaPieceOrNamed,
36};
37use crate::types::{Scalar, Value};
38use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
39use crate::{TrivialDecoder, ValueDecoder};
40
/// A type that can be produced by an Avro decoder which is itself built from
/// some caller-provided state.
pub trait StatefulAvroDecodable: Sized {
    /// The decoder type; its output is `Self`.
    type Decoder: AvroDecode<Out = Self>;
    /// The state passed to [`StatefulAvroDecodable::new_decoder`].
    type State;
    /// Builds a decoder for `Self` from `state`.
    fn new_decoder(state: Self::State) -> Self::Decoder;
}
/// A type that can be produced by an Avro decoder which needs no arguments to
/// be constructed.
pub trait AvroDecodable: Sized {
    /// The decoder type; its output is `Self`.
    type Decoder: AvroDecode<Out = Self>;

    /// Builds a decoder for `Self`.
    fn new_decoder() -> Self::Decoder;
}
51impl<T> AvroDecodable for T
52where
53 T: StatefulAvroDecodable,
54 T::State: Default,
55{
56 type Decoder = <Self as StatefulAvroDecodable>::Decoder;
57
58 fn new_decoder() -> Self::Decoder {
59 <Self as StatefulAvroDecodable>::new_decoder(Default::default())
60 }
61}
62#[inline]
63fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
64 let u = match zag_i64(reader)? {
65 i if i >= 0 => i as u64,
66 i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
67 };
68 Ok(u)
69}
70
71fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
72 let u = match zag_i32(reader)? {
73 i if i >= 0 => i as u32,
74 i => {
75 return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
76 i as i64,
77 )));
78 }
79 };
80 Ok(u)
81}
82
83#[inline]
84fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
85 zag_i64(reader).and_then(|i| safe_len(i as usize))
86}
87
88#[inline]
89fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
90 let mut buf = [0u8; 4];
91 reader.read_exact(&mut buf[..])?;
92 Ok(f32::from_le_bytes(buf))
93}
94
95#[inline]
96fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
97 let mut buf = [0u8; 8];
98 reader.read_exact(&mut buf[..])?;
99 Ok(f64::from_le_bytes(buf))
100}
101
102impl Display for TsUnit {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 TsUnit::Millis => write!(f, "ms"),
106 TsUnit::Micros => write!(f, "us"),
107 }
108 }
109}
110
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    // Verifies that `build_ts_value` maps negative (pre-epoch) inputs to the
    // expected (seconds, nanoseconds) pair for both supported units.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        // -1 ms is second -1 plus 999,000,000 ns.
        assert_eq!(
            build_ts_value(-1, TsUnit::Millis).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        // -1000 ms is exactly second -1.
        assert_eq!(
            build_ts_value(-1000, TsUnit::Millis).unwrap(),
            Value::Timestamp(DateTime::from_timestamp(-1, 0).unwrap().naive_utc())
        );
        // -1000 us equals -1 ms, so the expected value matches the first case.
        assert_eq!(
            build_ts_value(-1000, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        // -1 us is second -1 plus 999,999,000 ns.
        assert_eq!(
            build_ts_value(-1, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_999_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        // A value with both whole-second and sub-second parts: the seconds
        // round towards negative infinity and the remainder is made positive.
        assert_eq!(
            build_ts_value(-123_456_789_123, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-123_457, (1_000_000 - 789_123) * 1_000)
                    .unwrap()
                    .naive_utc()
            )
        );
    }
}
160
161pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
163 let result = match unit {
164 TsUnit::Millis => DateTime::from_timestamp_millis(value),
165 TsUnit::Micros => DateTime::from_timestamp_micros(value),
166 };
167 let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
168 Ok(Value::Timestamp(ndt.naive_utc()))
169}
170
171pub trait AvroRead: Read + Skip {}
176
177impl<T> AvroRead for T where T: Read + Skip {}
178
/// A reader that can discard upcoming bytes without handing them to the caller.
pub trait Skip: Read {
    /// Advances the reader by up to `len` bytes, discarding them.
    ///
    /// The default implementation reads into a fixed-size scratch buffer.
    /// Reaching end of input before `len` bytes were consumed is not treated
    /// as an error: the method stops and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Read::read`], except
    /// [`io::ErrorKind::Interrupted`], which is retried as the `Read`
    /// contract recommends.
    // Lint suppression kept from the original; the count returned by `read`
    // is consumed by the loop below.
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        const BUF_SIZE: usize = 512;
        let mut buf = [0u8; BUF_SIZE];

        while len > 0 {
            // Never ask for more than is still to be skipped.
            let want = cmp::min(len, BUF_SIZE);
            match self.read(&mut buf[..want]) {
                // End of input: skipping past the end is permitted.
                Ok(0) => break,
                Ok(n) => len = len.saturating_sub(n),
                // A signal interrupted the read; nothing was consumed, so retry.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
215
216impl Skip for File {
217 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
218 self.seek(SeekFrom::Current(len as i64))?;
219 Ok(())
220 }
221}
222
223impl Skip for &[u8] {
224 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
225 let len = cmp::min(len, self.len());
226 *self = &self[len..];
227 Ok(())
228 }
229}
230
231impl<S: Skip + ?Sized> Skip for Box<S> {
232 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
233 self.as_mut().skip(len)
234 }
235}
236
237impl<T: AsRef<[u8]>> Skip for Cursor<T> {
238 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
239 self.seek(SeekFrom::Current(len as i64))?;
240 Ok(())
241 }
242}
243
// No override is provided here, so `MultiGzDecoder` uses the default
// read-and-discard implementation of `Skip::skip`.
impl<R: Read> Skip for MultiGzDecoder<R> {}
245
/// Either a value of type `V` that is already available, or a reader paired
/// with a length.
pub enum ValueOrReader<'a, V, R: AvroRead> {
    /// A value that is already in hand.
    Value(V),
    /// A reader `r` plus a length `len`; the decoders in this file either read
    /// exactly `len` bytes from `r` or skip `len` bytes.
    Reader { len: usize, r: &'a mut R },
}
250
/// Where a field's value comes from when it is decoded.
enum SchemaOrDefault<'b, R: AvroRead> {
    /// Decode the value from the reader, guided by the given schema node.
    Schema(&'b mut R, SchemaNode<'b>),
    /// Use an already-materialized value instead of reading anything.
    Default(&'b Value),
}
/// Handle to a single not-yet-decoded field (or map entry value); consume it
/// with `decode_field`.
pub struct AvroFieldAccess<'b, R: AvroRead> {
    // Source of the field's value: the reader plus a schema, or a ready value.
    schema: SchemaOrDefault<'b, R>,
}
258
259impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
260 pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
261 match self.schema {
262 SchemaOrDefault::Schema(r, schema) => {
263 let des = GeneralDeserializer { schema };
264 des.deserialize(r, d)
265 }
266 SchemaOrDefault::Default(value) => give_value(d, value),
267 }
268 }
269}
270
/// Sequential access to the fields of a record being decoded.
pub trait AvroRecordAccess<R: AvroRead> {
    /// Returns the next field as `(name, index, accessor)`, or `Ok(None)` once
    /// every field has been handed out.
    ///
    /// `ValueDecoder::record` sorts fields by the `usize` element before
    /// building the record.
    fn next_field<'b>(
        &'b mut self,
    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
}
276
277struct SimpleRecordAccess<'a, R: AvroRead> {
278 schema: SchemaNode<'a>,
279 r: &'a mut R,
280 fields: &'a [RecordField],
281 i: usize,
282}
283
284impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
285 fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
286 Self {
287 schema,
288 r,
289 fields,
290 i: 0,
291 }
292 }
293}
294
295impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
296 fn next_field<'b>(
297 &'b mut self,
298 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
299 assert!(self.i <= self.fields.len());
300 if self.i == self.fields.len() {
301 Ok(None)
302 } else {
303 let f = &self.fields[self.i];
304 self.i += 1;
305 Ok(Some((
306 f.name.as_str(),
307 f.position,
308 AvroFieldAccess {
309 schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
310 },
311 )))
312 }
313 }
314}
315
316struct ValueRecordAccess<'a> {
317 values: &'a [(String, Value)],
318 i: usize,
319}
320
321impl<'a> ValueRecordAccess<'a> {
322 fn new(values: &'a [(String, Value)]) -> Self {
323 Self { values, i: 0 }
324 }
325}
326
327impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
328 fn next_field<'b>(
329 &'b mut self,
330 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
331 assert!(self.i <= self.values.len());
332 if self.i == self.values.len() {
333 Ok(None)
334 } else {
335 let (name, val) = &self.values[self.i];
336 self.i += 1;
337 Ok(Some((
338 name.as_str(),
339 self.i - 1,
340 AvroFieldAccess {
341 schema: SchemaOrDefault::Default(val),
342 },
343 )))
344 }
345 }
346}
347
348struct ValueMapAccess<'a> {
349 values: &'a [(String, Value)],
350 i: usize,
351}
352
353impl<'a> ValueMapAccess<'a> {
354 fn new(values: &'a [(String, Value)]) -> Self {
355 Self { values, i: 0 }
356 }
357}
358
359impl<'a> AvroMapAccess for ValueMapAccess<'a> {
360 type R = &'a [u8];
361 fn next_entry<'b>(
362 &'b mut self,
363 ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
364 assert!(self.i <= self.values.len());
365 if self.i == self.values.len() {
366 Ok(None)
367 } else {
368 let (name, val) = &self.values[self.i];
369 self.i += 1;
370 Ok(Some((
371 name.clone(),
372 AvroFieldAccess {
373 schema: SchemaOrDefault::Default(val),
374 },
375 )))
376 }
377 }
378}
379
380struct ResolvedRecordAccess<'a, R: AvroRead> {
381 defaults: &'a [ResolvedDefaultValueField],
382 i_defaults: usize,
383 fields: &'a [ResolvedRecordField],
384 i_fields: usize,
385 r: &'a mut R,
386 schema: SchemaNode<'a>,
387}
388
389impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
390 fn new(
391 defaults: &'a [ResolvedDefaultValueField],
392 fields: &'a [ResolvedRecordField],
393 r: &'a mut R,
394 schema: SchemaNode<'a>,
395 ) -> Self {
396 Self {
397 defaults,
398 i_defaults: 0,
399 fields,
400 i_fields: 0,
401 r,
402 schema,
403 }
404 }
405}
406
407impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
408 fn next_field<'b>(
409 &'b mut self,
410 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
411 assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
412 if self.i_defaults < self.defaults.len() {
413 let default = &self.defaults[self.i_defaults];
414 self.i_defaults += 1;
415 Ok(Some((
416 default.name.as_str(),
417 default.position,
418 AvroFieldAccess {
419 schema: SchemaOrDefault::Default(&default.default),
420 },
421 )))
422 } else {
423 while self.i_fields < self.fields.len() {
424 let field = &self.fields[self.i_fields];
425 self.i_fields += 1;
426 match field {
427 ResolvedRecordField::Absent(absent_schema) => {
428 let d = GeneralDeserializer {
430 schema: absent_schema.top_node(),
431 };
432 d.deserialize(self.r, TrivialDecoder)?;
433 continue;
434 }
435 ResolvedRecordField::Present(field) => {
436 return Ok(Some((
437 field.name.as_str(),
438 field.position,
439 AvroFieldAccess {
440 schema: SchemaOrDefault::Schema(
441 self.r,
442 self.schema.step(&field.schema),
443 ),
444 },
445 )));
446 }
447 }
448 }
449 Ok(None)
450 }
451 }
452}
453
/// Sequential access to the elements of an array being decoded.
pub trait AvroArrayAccess {
    /// Decodes the next element with `d`, or returns `Ok(None)` once the array
    /// is exhausted.
    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
}
457
/// Sequential access to the entries of a map being decoded.
pub trait AvroMapAccess {
    /// The reader type backing the value accessors handed out.
    type R: AvroRead;
    /// Returns the next entry as `(key, value accessor)`, or `Ok(None)` once
    /// the map is exhausted.
    fn next_entry<'b>(
        &'b mut self,
    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
}
464
465pub struct SimpleMapAccess<'a, R: AvroRead> {
466 entry_schema: SchemaNode<'a>,
467 r: &'a mut R,
468 done: bool,
469 remaining: usize,
470}
471
472impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
473 fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
474 Self {
475 entry_schema,
476 r,
477 done: false,
478 remaining: 0,
479 }
480 }
481}
482
483impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
484 type R = R;
485 fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
486 if self.done {
487 return Ok(None);
488 }
489 if self.remaining == 0 {
490 let (len, _len_in_bytes) = match zag_i64(self.r)? {
492 len if len > 0 => (len as usize, None),
493 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
494 0 => {
495 self.done = true;
496 return Ok(None);
497 }
498 _ => unreachable!(),
499 };
500 self.remaining = len;
501 }
502 assert!(self.remaining > 0);
503 self.remaining -= 1;
504
505 let key_len = decode_len(self.r)?;
508 let mut key_buf = vec![];
509 key_buf.resize_with(key_len, Default::default);
510 self.r.read_exact(&mut key_buf)?;
511 let key = String::from_utf8(key_buf)
512 .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
513
514 let a = AvroFieldAccess {
515 schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
516 };
517 Ok(Some((key, a)))
518 }
519}
520
521struct SimpleArrayAccess<'a, R: AvroRead> {
522 r: &'a mut R,
523 schema: SchemaNode<'a>,
524 remaining: usize,
525 done: bool,
526}
527
528impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
529 fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
530 Self {
531 r,
532 schema,
533 remaining: 0,
534 done: false,
535 }
536 }
537}
538
539struct ValueArrayAccess<'a> {
540 values: &'a [Value],
541 i: usize,
542}
543
544impl<'a> ValueArrayAccess<'a> {
545 fn new(values: &'a [Value]) -> Self {
546 Self { values, i: 0 }
547 }
548}
549
550impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
551 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
552 assert!(self.i <= self.values.len());
553 if self.i == self.values.len() {
554 Ok(None)
555 } else {
556 let val = give_value(d, &self.values[self.i])?;
557 self.i += 1;
558 Ok(Some(val))
559 }
560 }
561}
562
563impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
564 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
565 if self.done {
566 return Ok(None);
567 }
568 if self.remaining == 0 {
569 let (len, _len_in_bytes) = match zag_i64(self.r)? {
571 len if len > 0 => (len as usize, None),
572 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
573 0 => {
574 self.done = true;
575 return Ok(None);
576 }
577 _ => unreachable!(),
578 };
579 self.remaining = len;
580 }
581 assert!(self.remaining > 0);
582 self.remaining -= 1;
583 let des = GeneralDeserializer {
584 schema: self.schema,
585 };
586 des.deserialize(self.r, d).map(Some)
587 }
588}
589
/// Generates `AvroDecode` method implementations that reject a kind of input.
///
/// Invoke it inside an `impl AvroDecode for ...` block with a comma-separated
/// list of method names (for example `define_unexpected! { record, map }`).
/// Each generated method ignores its arguments and returns the matching
/// `DecodeError::Unexpected*` error.
///
/// Every crate item is referenced through `$crate`, so the expansion does not
/// depend on what the call site has imported. The `json` arm still names
/// `serde_json::Value` directly, so `serde_json` must be resolvable where the
/// macro is used.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedRecord))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUnion))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedArray))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedMap))
        }
    };
    (enum_variant) => {
        fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedEnum))
        }
    };
    (scalar) => {
        fn scalar(self, _scalar: $crate::types::Scalar) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedScalar))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedDecimal))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedBytes))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedString))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime serde_json::Value, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedJson))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUuid))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedFixed))
        }
    };
    // A comma-separated list expands to one single-name invocation per entry.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
686
/// A consumer of decoded Avro data.
///
/// There is one method per kind of input. Every method takes `self` by value
/// and produces `Self::Out`, so a decoder instance handles a single datum.
pub trait AvroDecode: Sized {
    /// The value this decoder produces.
    type Out;
    /// Handles a record; fields are pulled from `_a` one at a time.
    fn record<R: AvroRead, A: AvroRecordAccess<R>>(
        self,
        _a: &mut A,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a union: `_idx` is the selected branch out of `_n_variants`,
    /// `_null_variant` is an optional branch index, and `_deserializer`
    /// decodes the branch's payload from `_reader`.
    fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
        self,
        _idx: usize,
        _n_variants: usize,
        _null_variant: Option<usize>,
        _deserializer: D,
        _reader: &'a mut R,
    ) -> Result<Self::Out, AvroError>;

    /// Handles an array; elements are pulled from `_a` one at a time.
    fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;

    /// Handles a map; entries are pulled from `_m` one at a time.
    fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;

    /// Handles an enum value given its symbol and index.
    fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;

    /// Handles a scalar value.
    fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;

    /// Handles a decimal with the given precision and scale; the bytes are
    /// either supplied directly or still to be read from a reader.
    fn decimal<'a, R: AvroRead>(
        self,
        _precision: usize,
        _scale: usize,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a byte string, supplied directly or still to be read.
    fn bytes<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a text string, supplied directly or still to be read.
    fn string<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a str, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a JSON value, supplied directly or still to be read.
    fn json<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a serde_json::Value, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a UUID given as bytes, supplied directly or still to be read.
    fn uuid<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a fixed-size byte value, supplied directly or still to be read.
    fn fixed<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Wraps this decoder so that its output is passed through the fallible
    /// conversion `f`.
    fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
        self,
        f: F,
    ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
        public_decoders::MappingDecoder::new(self, f)
    }
}
745
746pub mod public_decoders {
747
748 use std::collections::BTreeMap;
749
750 use crate::error::{DecodeError, Error as AvroError};
751 use crate::types::{DecimalValue, Scalar, Value};
752 use crate::{
753 AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
754 };
755
756 use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
757
// Defines a unit-struct decoder `$name` producing `$out`, plus the
// `StatefulAvroDecodable` impl (with `()` state) that makes `$out` decodable.
//
// The decoder accepts only the listed `Scalar` variants (separated by `;`),
// converting the payload with `try_into`; any other scalar yields
// `UnexpectedScalarKind`, and every non-scalar input is rejected through
// `define_unexpected!`.
macro_rules! define_simple_decoder {
    ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
        pub struct $name;
        impl AvroDecode for $name {
            type Out = $out;
            fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
                let out = match scalar {
                    $(
                        Scalar::$scalar_branch(inner) => {inner.try_into()?}
                    ),*
                    other => return Err(AvroError::Decode(DecodeError::UnexpectedScalarKind(other.into())))
                };
                Ok(out)
            }
            define_unexpected! {
                array, record, union_branch, map, enum_variant, decimal, bytes, string, json, uuid, fixed
            }
        }

        impl StatefulAvroDecodable for $out {
            type Decoder = $name;
            type State = ();
            fn new_decoder(_state: ()) -> $name {
                $name
            }
        }
    }
}

// Integer decoders: each accepts both `Int` and `Long` scalars and converts to
// the target type with `try_into`.
define_simple_decoder!(I32Decoder, i32, Int;Long);
define_simple_decoder!(I64Decoder, i64, Int;Long);
define_simple_decoder!(U64Decoder, u64, Int;Long);
define_simple_decoder!(UsizeDecoder, usize, Int;Long);
define_simple_decoder!(IsizeDecoder, isize, Int;Long);
792
793 pub struct MappingDecoder<
794 T,
795 InnerOut,
796 Inner: AvroDecode<Out = InnerOut>,
797 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
798 > {
799 inner: Inner,
800 conv: Conv,
801 }
802
803 impl<
804 T,
805 InnerOut,
806 Inner: AvroDecode<Out = InnerOut>,
807 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
808 > MappingDecoder<T, InnerOut, Inner, Conv>
809 {
810 pub fn new(inner: Inner, conv: Conv) -> Self {
811 Self { inner, conv }
812 }
813 }
814
815 impl<
816 T,
817 InnerOut,
818 Inner: AvroDecode<Out = InnerOut>,
819 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
820 > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
821 {
822 type Out = T;
823
824 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
825 mut self,
826 a: &mut A,
827 ) -> Result<Self::Out, AvroError> {
828 (self.conv)(self.inner.record(a)?)
829 }
830
831 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
832 mut self,
833 idx: usize,
834 n_variants: usize,
835 null_variant: Option<usize>,
836 deserializer: D,
837 reader: &'a mut R,
838 ) -> Result<Self::Out, AvroError> {
839 (self.conv)(self.inner.union_branch(
840 idx,
841 n_variants,
842 null_variant,
843 deserializer,
844 reader,
845 )?)
846 }
847
848 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
849 (self.conv)(self.inner.array(a)?)
850 }
851
852 fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
853 (self.conv)(self.inner.map(m)?)
854 }
855
856 fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
857 (self.conv)(self.inner.enum_variant(symbol, idx)?)
858 }
859
860 fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
861 (self.conv)(self.inner.scalar(scalar)?)
862 }
863
864 fn decimal<'a, R: AvroRead>(
865 mut self,
866 precision: usize,
867 scale: usize,
868 r: ValueOrReader<'a, &'a [u8], R>,
869 ) -> Result<Self::Out, AvroError> {
870 (self.conv)(self.inner.decimal(precision, scale, r)?)
871 }
872
873 fn bytes<'a, R: AvroRead>(
874 mut self,
875 r: ValueOrReader<'a, &'a [u8], R>,
876 ) -> Result<Self::Out, AvroError> {
877 (self.conv)(self.inner.bytes(r)?)
878 }
879
880 fn string<'a, R: AvroRead>(
881 mut self,
882 r: ValueOrReader<'a, &'a str, R>,
883 ) -> Result<Self::Out, AvroError> {
884 (self.conv)(self.inner.string(r)?)
885 }
886
887 fn json<'a, R: AvroRead>(
888 mut self,
889 r: ValueOrReader<'a, &'a serde_json::Value, R>,
890 ) -> Result<Self::Out, AvroError> {
891 (self.conv)(self.inner.json(r)?)
892 }
893
894 fn uuid<'a, R: AvroRead>(
895 mut self,
896 r: ValueOrReader<'a, &'a [u8], R>,
897 ) -> Result<Self::Out, AvroError> {
898 (self.conv)(self.inner.uuid(r)?)
899 }
900
901 fn fixed<'a, R: AvroRead>(
902 mut self,
903 r: ValueOrReader<'a, &'a [u8], R>,
904 ) -> Result<Self::Out, AvroError> {
905 (self.conv)(self.inner.fixed(r)?)
906 }
907 }
908 pub struct ArrayAsVecDecoder<
909 InnerOut,
910 Inner: AvroDecode<Out = InnerOut>,
911 Ctor: FnMut() -> Inner,
912 > {
913 ctor: Ctor,
914 buf: Vec<InnerOut>,
915 }
916
917 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
918 ArrayAsVecDecoder<InnerOut, Inner, Ctor>
919 {
920 pub fn new(ctor: Ctor) -> Self {
921 Self { ctor, buf: vec![] }
922 }
923 }
924 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
925 for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
926 {
927 type Out = Vec<InnerOut>;
928 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
929 while let Some(next) = a.decode_next((self.ctor)())? {
930 self.buf.push(next);
931 }
932 Ok(self.buf)
933 }
934 define_unexpected! {
935 record, union_branch, map, enum_variant, scalar, decimal, bytes, string, json, uuid, fixed
936 }
937 }
938
/// Decodes an array into a `Vec<T>`, using `T`'s own decoder for each element.
pub struct DefaultArrayAsVecDecoder<T> {
    buf: Vec<T>,
}

impl<T> Default for DefaultArrayAsVecDecoder<T> {
    /// Starts with an empty output buffer.
    fn default() -> Self {
        DefaultArrayAsVecDecoder { buf: Vec::new() }
    }
}
947 impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
948 type Out = Vec<T>;
949 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
950 while let Some(next) = {
951 let inner = T::new_decoder();
952 a.decode_next(inner)?
953 } {
954 self.buf.push(next);
955 }
956 Ok(self.buf)
957 }
958 define_unexpected! {
959 record, union_branch, map, enum_variant, scalar, decimal, bytes, string, json, uuid, fixed
960 }
961 }
962 impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
963 type Decoder = DefaultArrayAsVecDecoder<T>;
964 type State = ();
965
966 fn new_decoder(_state: Self::State) -> Self::Decoder {
967 DefaultArrayAsVecDecoder::<T>::default()
968 }
969 }
970 pub struct TrivialDecoder;
971
972 impl TrivialDecoder {
973 fn maybe_skip<'a, V, R: AvroRead>(
974 self,
975 r: ValueOrReader<'a, V, R>,
976 ) -> Result<(), AvroError> {
977 if let ValueOrReader::Reader { len, r } = r {
978 Ok(r.skip(len)?)
979 } else {
980 Ok(())
981 }
982 }
983 }
984
985 impl AvroDecode for TrivialDecoder {
986 type Out = ();
987 fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
988 while let Some((_, _, f)) = a.next_field()? {
989 f.decode_field(TrivialDecoder)?;
990 }
991 Ok(())
992 }
993 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
994 self,
995 _idx: usize,
996 _n_variants: usize,
997 _null_variant: Option<usize>,
998 deserializer: D,
999 reader: &'a mut R,
1000 ) -> Result<(), AvroError> {
1001 deserializer.deserialize(reader, self)
1002 }
1003
1004 fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1005 Ok(())
1006 }
1007 fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1008 Ok(())
1009 }
1010 fn decimal<'a, R: AvroRead>(
1011 self,
1012 _precision: usize,
1013 _scale: usize,
1014 r: ValueOrReader<'a, &'a [u8], R>,
1015 ) -> Result<(), AvroError> {
1016 self.maybe_skip(r)
1017 }
1018 fn bytes<'a, R: AvroRead>(
1019 self,
1020 r: ValueOrReader<'a, &'a [u8], R>,
1021 ) -> Result<(), AvroError> {
1022 self.maybe_skip(r)
1023 }
1024 fn string<'a, R: AvroRead>(
1025 self,
1026 r: ValueOrReader<'a, &'a str, R>,
1027 ) -> Result<(), AvroError> {
1028 self.maybe_skip(r)
1029 }
1030 fn json<'a, R: AvroRead>(
1031 self,
1032 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1033 ) -> Result<(), AvroError> {
1034 self.maybe_skip(r)
1035 }
1036 fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1037 self.maybe_skip(r)
1038 }
1039 fn fixed<'a, R: AvroRead>(
1040 self,
1041 r: ValueOrReader<'a, &'a [u8], R>,
1042 ) -> Result<(), AvroError> {
1043 self.maybe_skip(r)
1044 }
1045 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1046 while a.decode_next(TrivialDecoder)?.is_some() {}
1047 Ok(())
1048 }
1049
1050 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1051 while let Some((_n, entry)) = m.next_entry()? {
1052 entry.decode_field(TrivialDecoder)?
1053 }
1054 Ok(())
1055 }
1056 }
1057 pub struct ValueDecoder;
1058 impl AvroDecode for ValueDecoder {
1059 type Out = Value;
1060 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1061 self,
1062 a: &mut A,
1063 ) -> Result<Value, AvroError> {
1064 let mut fields = vec![];
1065 while let Some((name, idx, f)) = a.next_field()? {
1066 let next = ValueDecoder;
1067 let val = f.decode_field(next)?;
1068 fields.push((idx, (name.to_string(), val)));
1069 }
1070 fields.sort_by_key(|(idx, _)| *idx);
1071
1072 Ok(Value::Record(
1073 fields
1074 .into_iter()
1075 .map(|(_idx, (name, val))| (name, val))
1076 .collect(),
1077 ))
1078 }
1079 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1080 self,
1081 index: usize,
1082 n_variants: usize,
1083 null_variant: Option<usize>,
1084 deserializer: D,
1085 reader: &'a mut R,
1086 ) -> Result<Value, AvroError> {
1087 let next = ValueDecoder;
1088 let inner = Box::new(deserializer.deserialize(reader, next)?);
1089 Ok(Value::Union {
1090 index,
1091 inner,
1092 n_variants,
1093 null_variant,
1094 })
1095 }
1096 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1097 let mut items = vec![];
1098 loop {
1099 let next = ValueDecoder;
1100
1101 if let Some(value) = a.decode_next(next)? {
1102 items.push(value)
1103 } else {
1104 break;
1105 }
1106 }
1107 Ok(Value::Array(items))
1108 }
1109 fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1110 Ok(Value::Enum(idx, symbol.to_string()))
1111 }
1112 fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1113 Ok(scalar.into())
1114 }
1115 fn decimal<'a, R: AvroRead>(
1116 self,
1117 precision: usize,
1118 scale: usize,
1119 r: ValueOrReader<'a, &'a [u8], R>,
1120 ) -> Result<Value, AvroError> {
1121 let unscaled = match r {
1122 ValueOrReader::Value(buf) => buf.to_vec(),
1123 ValueOrReader::Reader { len, r } => {
1124 let mut buf = vec![];
1125 buf.resize_with(len, Default::default);
1126 r.read_exact(&mut buf)?;
1127 buf
1128 }
1129 };
1130 Ok(Value::Decimal(DecimalValue {
1131 unscaled,
1132 precision,
1133 scale,
1134 }))
1135 }
1136 fn bytes<'a, R: AvroRead>(
1137 self,
1138 r: ValueOrReader<'a, &'a [u8], R>,
1139 ) -> Result<Value, AvroError> {
1140 let buf = match r {
1141 ValueOrReader::Value(buf) => buf.to_vec(),
1142 ValueOrReader::Reader { len, r } => {
1143 let mut buf = vec![];
1144 buf.resize_with(len, Default::default);
1145 r.read_exact(&mut buf)?;
1146 buf
1147 }
1148 };
1149 Ok(Value::Bytes(buf))
1150 }
1151 fn string<'a, R: AvroRead>(
1152 self,
1153 r: ValueOrReader<'a, &'a str, R>,
1154 ) -> Result<Value, AvroError> {
1155 let s = match r {
1156 ValueOrReader::Value(s) => s.to_string(),
1157 ValueOrReader::Reader { len, r } => {
1158 let mut buf = vec![];
1159 buf.resize_with(len, Default::default);
1160 r.read_exact(&mut buf)?;
1161 String::from_utf8(buf)
1162 .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1163 }
1164 };
1165 Ok(Value::String(s))
1166 }
1167 fn json<'a, R: AvroRead>(
1168 self,
1169 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1170 ) -> Result<Value, AvroError> {
1171 let val = match r {
1172 ValueOrReader::Value(val) => val.clone(),
1173 ValueOrReader::Reader { len, r } => {
1174 let mut buf = vec![];
1175 buf.resize_with(len, Default::default);
1176 r.read_exact(&mut buf)?;
1177 serde_json::from_slice(&buf).map_err(|e| {
1178 AvroError::Decode(DecodeError::BadJson {
1179 category: e.classify(),
1180 bytes: buf.to_owned(),
1181 })
1182 })?
1183 }
1184 };
1185 Ok(Value::Json(val))
1186 }
1187 fn uuid<'a, R: AvroRead>(
1188 self,
1189 r: ValueOrReader<'a, &'a [u8], R>,
1190 ) -> Result<Value, AvroError> {
1191 let buf = match r {
1192 ValueOrReader::Value(val) => val.to_vec(),
1193 ValueOrReader::Reader { len, r } => {
1194 let mut buf = vec![];
1195 buf.resize_with(len, Default::default);
1196 r.read_exact(&mut buf)?;
1197 buf
1198 }
1199 };
1200 let s = std::str::from_utf8(&buf)
1201 .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1202 let val =
1203 uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1204 Ok(Value::Uuid(val))
1205 }
1206 fn fixed<'a, R: AvroRead>(
1207 self,
1208 r: ValueOrReader<'a, &'a [u8], R>,
1209 ) -> Result<Value, AvroError> {
1210 let buf = match r {
1211 ValueOrReader::Value(buf) => buf.to_vec(),
1212 ValueOrReader::Reader { len, r } => {
1213 let mut buf = vec![];
1214 buf.resize_with(len, Default::default);
1215 r.read_exact(&mut buf)?;
1216 buf
1217 }
1218 };
1219 Ok(Value::Fixed(buf.len(), buf))
1220 }
1221 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1222 let mut entries = BTreeMap::new();
1223 while let Some((name, a)) = m.next_entry()? {
1224 let d = ValueDecoder;
1225 let val = a.decode_field(d)?;
1226 entries.insert(name, val);
1227 }
1228 Ok(Value::Map(entries))
1229 }
1230 }
1231}
1232
/// A borrowed `Value` can act as a deserializer: it ignores the reader and
/// feeds itself to the decoder through `give_value`.
impl<'a> AvroDeserializer for &'a Value {
    fn deserialize<R: AvroRead, D: AvroDecode>(
        self,
        // Unused: the data comes from the value itself.
        _r: &mut R,
        d: D,
    ) -> Result<D::Out, AvroError> {
        give_value(d, self)
    }
}
1242
1243pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1244 use ValueOrReader::Value as V;
1245 match v {
1246 Value::Null => d.scalar(Scalar::Null),
1247 Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1248 Value::Int(val) => d.scalar(Scalar::Int(*val)),
1249 Value::Long(val) => d.scalar(Scalar::Long(*val)),
1250 Value::Float(val) => d.scalar(Scalar::Float(*val)),
1251 Value::Double(val) => d.scalar(Scalar::Double(*val)),
1252 Value::Date(val) => d.scalar(Scalar::Date(*val)),
1253 Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1254 Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1257 Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1258 Value::String(val) => d.string::<&[u8]>(V(val)),
1259 Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1260 Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1261 Value::Union {
1262 index,
1263 inner,
1264 n_variants,
1265 null_variant,
1266 } => {
1267 let mut empty_reader: &[u8] = &[];
1268 d.union_branch(
1269 *index,
1270 *n_variants,
1271 *null_variant,
1272 &**inner,
1273 &mut empty_reader,
1274 )
1275 }
1276 Value::Array(val) => {
1277 let mut a = ValueArrayAccess::new(val);
1278 d.array(&mut a)
1279 }
1280 Value::Map(val) => {
1281 let vals: Vec<_> = val.clone().into_iter().collect();
1282 let mut m = ValueMapAccess::new(vals.as_slice());
1283 d.map(&mut m)
1284 }
1285 Value::Record(val) => {
1286 let mut a = ValueRecordAccess::new(val);
1287 d.record(&mut a)
1288 }
1289 Value::Json(val) => d.json::<&[u8]>(V(val)),
1290 Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1291 }
1292}
1293
1294pub trait AvroDeserializer {
1295 fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
1296}
1297
1298#[derive(Clone, Copy)]
1299pub struct GeneralDeserializer<'a> {
1300 pub schema: SchemaNode<'a>,
1301}
1302
1303impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1304 fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1305 use ValueOrReader::Reader;
1306 match self.schema.inner {
1307 SchemaPiece::Null => d.scalar(Scalar::Null),
1308 SchemaPiece::Boolean => {
1309 let mut buf = [0u8; 1];
1310 r.read_exact(&mut buf[..])?;
1311 let val = match buf[0] {
1312 0u8 => false,
1313 1u8 => true,
1314 other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
1315 };
1316 d.scalar(Scalar::Boolean(val))
1317 }
1318 SchemaPiece::Int => {
1319 let val = zag_i32(r)?;
1320 d.scalar(Scalar::Int(val))
1321 }
1322 SchemaPiece::Long => {
1323 let val = zag_i64(r)?;
1324 d.scalar(Scalar::Long(val))
1325 }
1326 SchemaPiece::Float => {
1327 let val = decode_float(r)?;
1328 d.scalar(Scalar::Float(val))
1329 }
1330 SchemaPiece::Double => {
1331 let val = decode_double(r)?;
1332 d.scalar(Scalar::Double(val))
1333 }
1334 SchemaPiece::Date => {
1335 let days = zag_i32(r)?;
1336 d.scalar(Scalar::Date(days))
1337 }
1338 SchemaPiece::TimestampMilli => {
1339 let total_millis = zag_i64(r)?;
1340 let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
1341 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1342 _ => unreachable!(),
1343 };
1344 d.scalar(scalar)
1345 }
1346 SchemaPiece::TimestampMicro => {
1347 let total_micros = zag_i64(r)?;
1348 let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
1349 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1350 _ => unreachable!(),
1351 };
1352 d.scalar(scalar)
1353 }
1354 SchemaPiece::Decimal {
1355 precision,
1356 scale,
1357 fixed_size,
1358 } => {
1359 let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
1360 d.decimal(*precision, *scale, Reader { len, r })
1361 }
1362 SchemaPiece::Bytes => {
1363 let len = decode_len(r)?;
1364 d.bytes(Reader { len, r })
1365 }
1366 SchemaPiece::String => {
1367 let len = decode_len(r)?;
1368 d.string(Reader { len, r })
1369 }
1370 SchemaPiece::Json => {
1371 let len = decode_len(r)?;
1372 d.json(Reader { len, r })
1373 }
1374 SchemaPiece::Uuid => {
1375 let len = decode_len(r)?;
1376 d.uuid(Reader { len, r })
1377 }
1378 SchemaPiece::Array(inner) => {
1379 let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
1384 d.array(&mut a)
1385 }
1386 SchemaPiece::Map(inner) => {
1387 let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
1389 d.map(&mut m)
1390 }
1391 SchemaPiece::Union(inner) => {
1392 let index = decode_long_nonneg(r)? as usize;
1393 let variants = inner.variants();
1394 match variants.get(index) {
1395 Some(variant) => {
1396 let n_variants = variants.len();
1397 let null_variant = variants
1398 .iter()
1399 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
1400 let dsr = GeneralDeserializer {
1401 schema: self.schema.step(variant),
1402 };
1403 d.union_branch(index, n_variants, null_variant, dsr, r)
1404 }
1405 None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
1406 index,
1407 len: variants.len(),
1408 })),
1409 }
1410 }
1411 SchemaPiece::ResolveIntLong => {
1412 let val = zag_i32(r)? as i64;
1413 d.scalar(Scalar::Long(val))
1414 }
1415 SchemaPiece::ResolveIntFloat => {
1416 let val = zag_i32(r)? as f32;
1417 d.scalar(Scalar::Float(val))
1418 }
1419 SchemaPiece::ResolveIntDouble => {
1420 let val = zag_i32(r)? as f64;
1421 d.scalar(Scalar::Double(val))
1422 }
1423 SchemaPiece::ResolveLongFloat => {
1424 let val = zag_i64(r)? as f32;
1425 d.scalar(Scalar::Float(val))
1426 }
1427 SchemaPiece::ResolveLongDouble => {
1428 let val = zag_i64(r)? as f64;
1429 d.scalar(Scalar::Double(val))
1430 }
1431 SchemaPiece::ResolveFloatDouble => {
1432 let val = decode_float(r)? as f64;
1433 d.scalar(Scalar::Double(val))
1434 }
1435 SchemaPiece::ResolveConcreteUnion {
1436 index,
1437 inner,
1438 n_reader_variants,
1439 reader_null_variant,
1440 } => {
1441 let dsr = GeneralDeserializer {
1442 schema: self.schema.step(&**inner),
1443 };
1444 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1445 }
1446 SchemaPiece::ResolveUnionUnion {
1447 permutation,
1448 n_reader_variants,
1449 reader_null_variant,
1450 } => {
1451 let index = decode_long_nonneg(r)? as usize;
1452 if index >= permutation.len() {
1453 return Err(AvroError::Decode(DecodeError::BadUnionIndex {
1454 index,
1455 len: permutation.len(),
1456 }));
1457 }
1458 match &permutation[index] {
1459 Err(e) => Err(e.clone()),
1460 Ok((index, variant)) => {
1461 let dsr = GeneralDeserializer {
1462 schema: self.schema.step(variant),
1463 };
1464 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1465 }
1466 }
1467 }
1468 SchemaPiece::ResolveUnionConcrete { index, inner } => {
1469 let found_index = decode_long_nonneg(r)? as usize;
1470 if *index != found_index {
1471 Err(AvroError::Decode(DecodeError::WrongUnionIndex {
1472 expected: *index,
1473 actual: found_index,
1474 }))
1475 } else {
1476 let dsr = GeneralDeserializer {
1477 schema: self.schema.step(inner.as_ref()),
1478 };
1479 dsr.deserialize(r, d)
1481 }
1482 }
1483 SchemaPiece::Record {
1484 doc: _,
1485 fields,
1486 lookup: _,
1487 } => {
1488 let mut a = SimpleRecordAccess::new(self.schema, r, fields);
1489 d.record(&mut a)
1490 }
1491 SchemaPiece::Enum {
1492 symbols,
1493 doc: _,
1494 default_idx: _,
1495 } => {
1496 let index = decode_int_nonneg(r)? as usize;
1497 match symbols.get(index) {
1498 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1499 index,
1500 len: symbols.len(),
1501 })),
1502 Some(symbol) => d.enum_variant(symbol, index),
1503 }
1504 }
1505 SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
1506 SchemaPiece::ResolveRecord {
1512 defaults,
1513 fields,
1514 n_reader_fields: _,
1515 } => {
1516 let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
1517 d.record(&mut a)
1518 }
1519 SchemaPiece::ResolveEnum {
1520 doc: _,
1521 symbols,
1522 default,
1523 } => {
1524 let index = decode_int_nonneg(r)? as usize;
1525 match symbols.get(index) {
1526 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1527 index,
1528 len: symbols.len(),
1529 })),
1530 Some(op) => match op {
1531 Err(missing) => {
1532 if let Some((reader_index, symbol)) = default.clone() {
1533 d.enum_variant(&symbol, reader_index)
1534 } else {
1535 Err(AvroError::Decode(DecodeError::MissingEnumIndex {
1536 index,
1537 symbol: missing.clone(),
1538 }))
1539 }
1540 }
1541 Ok((index, name)) => d.enum_variant(name, *index),
1542 },
1543 }
1544 }
1545 SchemaPiece::ResolveIntTsMilli => {
1546 let total_millis = zag_i32(r)?;
1547 let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
1548 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1549 _ => unreachable!(),
1550 };
1551 d.scalar(scalar)
1552 }
1553 SchemaPiece::ResolveIntTsMicro => {
1554 let total_micros = zag_i32(r)?;
1555 let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
1556 Value::Timestamp(ts) => Scalar::Timestamp(ts),
1557 _ => unreachable!(),
1558 };
1559 d.scalar(scalar)
1560 }
1561 SchemaPiece::ResolveDateTimestamp => {
1562 let days = zag_i32(r)?;
1563
1564 let date = NaiveDate::from_ymd_opt(1970, 1, 1)
1565 .expect("naive date known valid")
1566 .checked_add_signed(
1567 chrono::Duration::try_days(days.into())
1568 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
1569 )
1570 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
1571 let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
1572 d.scalar(Scalar::Timestamp(dt))
1573 }
1574 }
1575 }
1576}
1577pub fn decode<'a, R: AvroRead>(
1579 schema: SchemaNode<'a>,
1580 reader: &'a mut R,
1581) -> Result<Value, AvroError> {
1582 let d = ValueDecoder;
1583 let dsr = GeneralDeserializer { schema };
1584 let val = dsr.deserialize(reader, d)?;
1585 Ok(val)
1586}