Skip to main content

mz_avro/
decode.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use std::cmp;
25use std::fmt::{self, Display};
26use std::fs::File;
27use std::io::{self, Cursor, Read, Seek, SeekFrom};
28
29use chrono::{DateTime, NaiveDate};
30use flate2::read::MultiGzDecoder;
31
32use crate::error::{DecodeError, Error as AvroError};
33use crate::schema::{
34    RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece,
35    SchemaPieceOrNamed,
36};
37use crate::types::{Scalar, Value};
38use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
39use crate::{TrivialDecoder, ValueDecoder};
40
41pub trait StatefulAvroDecodable: Sized {
42    type Decoder: AvroDecode<Out = Self>;
43    type State;
44    fn new_decoder(state: Self::State) -> Self::Decoder;
45}
46pub trait AvroDecodable: Sized {
47    type Decoder: AvroDecode<Out = Self>;
48
49    fn new_decoder() -> Self::Decoder;
50}
51impl<T> AvroDecodable for T
52where
53    T: StatefulAvroDecodable,
54    T::State: Default,
55{
56    type Decoder = <Self as StatefulAvroDecodable>::Decoder;
57
58    fn new_decoder() -> Self::Decoder {
59        <Self as StatefulAvroDecodable>::new_decoder(Default::default())
60    }
61}
62#[inline]
63fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
64    let u = match zag_i64(reader)? {
65        i if i >= 0 => i as u64,
66        i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
67    };
68    Ok(u)
69}
70
71fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
72    let u = match zag_i32(reader)? {
73        i if i >= 0 => i as u32,
74        i => {
75            return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
76                i as i64,
77            )));
78        }
79    };
80    Ok(u)
81}
82
83#[inline]
84fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
85    zag_i64(reader).and_then(|i| safe_len(i as usize))
86}
87
88#[inline]
89fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
90    let mut buf = [0u8; 4];
91    reader.read_exact(&mut buf[..])?;
92    Ok(f32::from_le_bytes(buf))
93}
94
95#[inline]
96fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
97    let mut buf = [0u8; 8];
98    reader.read_exact(&mut buf[..])?;
99    Ok(f64::from_le_bytes(buf))
100}
101
102impl Display for TsUnit {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        match self {
105            TsUnit::Millis => write!(f, "ms"),
106            TsUnit::Micros => write!(f, "us"),
107        }
108    }
109}
110
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    /// Negative raw values must map to the instant *before* the epoch, with a
    /// non-negative sub-second component.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        // (raw value, unit, expected seconds, expected nanoseconds)
        let cases: [(i64, TsUnit, i64, u32); 5] = [
            (-1, TsUnit::Millis, -1, 999_000_000),
            (-1000, TsUnit::Millis, -1, 0),
            (-1000, TsUnit::Micros, -1, 999_000_000),
            (-1, TsUnit::Micros, -1, 999_999_000),
            (
                -123_456_789_123,
                TsUnit::Micros,
                -123_457,
                (1_000_000 - 789_123) * 1_000,
            ),
        ];

        for (raw, unit, secs, nanos) in cases {
            let expected = DateTime::from_timestamp(secs, nanos)
                .unwrap()
                .naive_utc();
            assert_eq!(
                build_ts_value(raw, unit).unwrap(),
                Value::Timestamp(expected)
            );
        }
    }
}
160
161/// A convenience function to build timestamp values from underlying longs.
162pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
163    let result = match unit {
164        TsUnit::Millis => DateTime::from_timestamp_millis(value),
165        TsUnit::Micros => DateTime::from_timestamp_micros(value),
166    };
167    let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
168    Ok(Value::Timestamp(ndt.naive_utc()))
169}
170
171/// A convenience trait for types that are both readable and skippable.
172///
173/// A blanket implementation is provided for all types that implement both
174/// [`Read`] and [`Skip`].
175pub trait AvroRead: Read + Skip {}
176
177impl<T> AvroRead for T where T: Read + Skip {}
178
/// Readers that can move forward without handing the skipped bytes back.
pub trait Skip: Read {
    /// Advance the cursor by `len` bytes.
    ///
    /// Implementations should, where they can, do better than this default,
    /// which repeatedly calls [`Read::read`] into a scratch buffer and throws
    /// the bytes away.
    ///
    /// Asking to skip past the end of the underlying data is allowed; the
    /// only requirement is that the next call to [`Read::read`] reports EOF.
    ///
    /// # Errors
    ///
    /// Can return an error in all the same cases that [`Read::read`] can.
    ///
    /// TODO: Remove this clippy suppression when the issue is fixed.
    /// See <https://github.com/rust-lang/rust-clippy/issues/12519>
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        const BUF_SIZE: usize = 512;
        let mut scratch = [0; BUF_SIZE];

        loop {
            if len == 0 {
                return Ok(());
            }
            // Never request more than is left to skip, nor more than the
            // scratch buffer can hold.
            let want = len.min(BUF_SIZE);
            let got = self.read(&mut scratch[..want])?;
            if got == 0 {
                // EOF: skipping past the end is permitted.
                return Ok(());
            }
            len -= got;
        }
    }
}
215
216impl Skip for File {
217    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
218        self.seek(SeekFrom::Current(len as i64))?;
219        Ok(())
220    }
221}
222
223impl Skip for &[u8] {
224    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
225        let len = cmp::min(len, self.len());
226        *self = &self[len..];
227        Ok(())
228    }
229}
230
231impl<S: Skip + ?Sized> Skip for Box<S> {
232    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
233        self.as_mut().skip(len)
234    }
235}
236
237impl<T: AsRef<[u8]>> Skip for Cursor<T> {
238    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
239        self.seek(SeekFrom::Current(len as i64))?;
240        Ok(())
241    }
242}
243
244impl<R: Read> Skip for MultiGzDecoder<R> {}
245
246pub enum ValueOrReader<'a, V, R: AvroRead> {
247    Value(V),
248    Reader { len: usize, r: &'a mut R },
249}
250
251enum SchemaOrDefault<'b, R: AvroRead> {
252    Schema(&'b mut R, SchemaNode<'b>),
253    Default(&'b Value),
254}
255pub struct AvroFieldAccess<'b, R: AvroRead> {
256    schema: SchemaOrDefault<'b, R>,
257}
258
259impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
260    pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
261        match self.schema {
262            SchemaOrDefault::Schema(r, schema) => {
263                let des = GeneralDeserializer { schema };
264                des.deserialize(r, d)
265            }
266            SchemaOrDefault::Default(value) => give_value(d, value),
267        }
268    }
269}
270
271pub trait AvroRecordAccess<R: AvroRead> {
272    fn next_field<'b>(
273        &'b mut self,
274    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
275}
276
277struct SimpleRecordAccess<'a, R: AvroRead> {
278    schema: SchemaNode<'a>,
279    r: &'a mut R,
280    fields: &'a [RecordField],
281    i: usize,
282}
283
284impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
285    fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
286        Self {
287            schema,
288            r,
289            fields,
290            i: 0,
291        }
292    }
293}
294
295impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
296    fn next_field<'b>(
297        &'b mut self,
298    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
299        assert!(self.i <= self.fields.len());
300        if self.i == self.fields.len() {
301            Ok(None)
302        } else {
303            let f = &self.fields[self.i];
304            self.i += 1;
305            Ok(Some((
306                f.name.as_str(),
307                f.position,
308                AvroFieldAccess {
309                    schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
310                },
311            )))
312        }
313    }
314}
315
316struct ValueRecordAccess<'a> {
317    values: &'a [(String, Value)],
318    i: usize,
319}
320
321impl<'a> ValueRecordAccess<'a> {
322    fn new(values: &'a [(String, Value)]) -> Self {
323        Self { values, i: 0 }
324    }
325}
326
327impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
328    fn next_field<'b>(
329        &'b mut self,
330    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
331        assert!(self.i <= self.values.len());
332        if self.i == self.values.len() {
333            Ok(None)
334        } else {
335            let (name, val) = &self.values[self.i];
336            self.i += 1;
337            Ok(Some((
338                name.as_str(),
339                self.i - 1,
340                AvroFieldAccess {
341                    schema: SchemaOrDefault::Default(val),
342                },
343            )))
344        }
345    }
346}
347
348struct ValueMapAccess<'a> {
349    values: &'a [(String, Value)],
350    i: usize,
351}
352
353impl<'a> ValueMapAccess<'a> {
354    fn new(values: &'a [(String, Value)]) -> Self {
355        Self { values, i: 0 }
356    }
357}
358
359impl<'a> AvroMapAccess for ValueMapAccess<'a> {
360    type R = &'a [u8];
361    fn next_entry<'b>(
362        &'b mut self,
363    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
364        assert!(self.i <= self.values.len());
365        if self.i == self.values.len() {
366            Ok(None)
367        } else {
368            let (name, val) = &self.values[self.i];
369            self.i += 1;
370            Ok(Some((
371                name.clone(),
372                AvroFieldAccess {
373                    schema: SchemaOrDefault::Default(val),
374                },
375            )))
376        }
377    }
378}
379
380struct ResolvedRecordAccess<'a, R: AvroRead> {
381    defaults: &'a [ResolvedDefaultValueField],
382    i_defaults: usize,
383    fields: &'a [ResolvedRecordField],
384    i_fields: usize,
385    r: &'a mut R,
386    schema: SchemaNode<'a>,
387}
388
389impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
390    fn new(
391        defaults: &'a [ResolvedDefaultValueField],
392        fields: &'a [ResolvedRecordField],
393        r: &'a mut R,
394        schema: SchemaNode<'a>,
395    ) -> Self {
396        Self {
397            defaults,
398            i_defaults: 0,
399            fields,
400            i_fields: 0,
401            r,
402            schema,
403        }
404    }
405}
406
407impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
408    fn next_field<'b>(
409        &'b mut self,
410    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
411        assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
412        if self.i_defaults < self.defaults.len() {
413            let default = &self.defaults[self.i_defaults];
414            self.i_defaults += 1;
415            Ok(Some((
416                default.name.as_str(),
417                default.position,
418                AvroFieldAccess {
419                    schema: SchemaOrDefault::Default(&default.default),
420                },
421            )))
422        } else {
423            while self.i_fields < self.fields.len() {
424                let field = &self.fields[self.i_fields];
425                self.i_fields += 1;
426                match field {
427                    ResolvedRecordField::Absent(absent_schema) => {
428                        // we don't care what's in the value, but we still need to read it in order to skip ahead the proper amount in the input.
429                        let d = GeneralDeserializer {
430                            schema: absent_schema.top_node(),
431                        };
432                        d.deserialize(self.r, TrivialDecoder)?;
433                        continue;
434                    }
435                    ResolvedRecordField::Present(field) => {
436                        return Ok(Some((
437                            field.name.as_str(),
438                            field.position,
439                            AvroFieldAccess {
440                                schema: SchemaOrDefault::Schema(
441                                    self.r,
442                                    self.schema.step(&field.schema),
443                                ),
444                            },
445                        )));
446                    }
447                }
448            }
449            Ok(None)
450        }
451    }
452}
453
454pub trait AvroArrayAccess {
455    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
456}
457
458pub trait AvroMapAccess {
459    type R: AvroRead;
460    fn next_entry<'b>(
461        &'b mut self,
462    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
463}
464
465pub struct SimpleMapAccess<'a, R: AvroRead> {
466    entry_schema: SchemaNode<'a>,
467    r: &'a mut R,
468    done: bool,
469    remaining: usize,
470}
471
472impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
473    fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
474        Self {
475            entry_schema,
476            r,
477            done: false,
478            remaining: 0,
479        }
480    }
481}
482
483impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
484    type R = R;
485    fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
486        if self.done {
487            return Ok(None);
488        }
489        if self.remaining == 0 {
490            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
491            let (len, _len_in_bytes) = match zag_i64(self.r)? {
492                len if len > 0 => (len as usize, None),
493                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
494                0 => {
495                    self.done = true;
496                    return Ok(None);
497                }
498                _ => unreachable!(),
499            };
500            self.remaining = len;
501        }
502        assert!(self.remaining > 0);
503        self.remaining -= 1;
504
505        // TODO - We can try to avoid this allocation, but  nobody uses maps in Materialize
506        // right now so it doesn't really matter.
507        let key_len = decode_len(self.r)?;
508        let mut key_buf = vec![];
509        key_buf.resize_with(key_len, Default::default);
510        self.r.read_exact(&mut key_buf)?;
511        let key = String::from_utf8(key_buf)
512            .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
513
514        let a = AvroFieldAccess {
515            schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
516        };
517        Ok(Some((key, a)))
518    }
519}
520
521struct SimpleArrayAccess<'a, R: AvroRead> {
522    r: &'a mut R,
523    schema: SchemaNode<'a>,
524    remaining: usize,
525    done: bool,
526}
527
528impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
529    fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
530        Self {
531            r,
532            schema,
533            remaining: 0,
534            done: false,
535        }
536    }
537}
538
539struct ValueArrayAccess<'a> {
540    values: &'a [Value],
541    i: usize,
542}
543
544impl<'a> ValueArrayAccess<'a> {
545    fn new(values: &'a [Value]) -> Self {
546        Self { values, i: 0 }
547    }
548}
549
550impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
551    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
552        assert!(self.i <= self.values.len());
553        if self.i == self.values.len() {
554            Ok(None)
555        } else {
556            let val = give_value(d, &self.values[self.i])?;
557            self.i += 1;
558            Ok(Some(val))
559        }
560    }
561}
562
563impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
564    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
565        if self.done {
566            return Ok(None);
567        }
568        if self.remaining == 0 {
569            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
570            let (len, _len_in_bytes) = match zag_i64(self.r)? {
571                len if len > 0 => (len as usize, None),
572                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
573                0 => {
574                    self.done = true;
575                    return Ok(None);
576                }
577                _ => unreachable!(),
578            };
579            self.remaining = len;
580        }
581        assert!(self.remaining > 0);
582        self.remaining -= 1;
583        let des = GeneralDeserializer {
584            schema: self.schema,
585        };
586        des.deserialize(self.r, d).map(Some)
587    }
588}
589
/// Generates `AvroDecode` methods that reject a kind of input.
///
/// Invoke it inside an `impl AvroDecode for ...` block with a comma-separated
/// list of kinds (`record`, `union_branch`, `array`, `map`, `enum_variant`,
/// `scalar`, `decimal`, `bytes`, `string`, `json`, `uuid`, `fixed`). For each
/// kind, the method of the same name is defined to return the matching
/// `DecodeError::Unexpected*` error.
///
/// Every crate-local path is spelled through `$crate`, so callers do not need
/// to import `AvroRead` themselves. The `json` arm still names
/// `serde_json::Value` directly, which resolves at the call site.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedRecord))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUnion))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedArray,
            ))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(
            self,
            _m: &mut M,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedMap,
            ))
        }
    };
    (enum_variant) => {
        fn enum_variant(
            self,
            _symbol: &str,
            _idx: usize,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedEnum,
            ))
        }
    };
    (scalar) => {
        fn scalar(self, _scalar: $crate::types::Scalar) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedScalar))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedDecimal))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedBytes))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedString))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<
                'avro_macro_lifetime,
                &'avro_macro_lifetime serde_json::Value,
                R,
            >,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedJson))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUuid))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedFixed))
        }
    };
    // A comma-separated list expands to one single-kind invocation per entry.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
706
707pub trait AvroDecode: Sized {
708    type Out;
709    fn record<R: AvroRead, A: AvroRecordAccess<R>>(
710        self,
711        _a: &mut A,
712    ) -> Result<Self::Out, AvroError>;
713
714    fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
715        self,
716        _idx: usize,
717        _n_variants: usize,
718        _null_variant: Option<usize>,
719        _deserializer: D,
720        _reader: &'a mut R,
721    ) -> Result<Self::Out, AvroError>;
722
723    fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;
724
725    fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;
726
727    fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;
728
729    fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;
730
731    fn decimal<'a, R: AvroRead>(
732        self,
733        _precision: usize,
734        _scale: usize,
735        _r: ValueOrReader<'a, &'a [u8], R>,
736    ) -> Result<Self::Out, AvroError>;
737
738    fn bytes<'a, R: AvroRead>(
739        self,
740        _r: ValueOrReader<'a, &'a [u8], R>,
741    ) -> Result<Self::Out, AvroError>;
742    fn string<'a, R: AvroRead>(
743        self,
744        _r: ValueOrReader<'a, &'a str, R>,
745    ) -> Result<Self::Out, AvroError>;
746    fn json<'a, R: AvroRead>(
747        self,
748        _r: ValueOrReader<'a, &'a serde_json::Value, R>,
749    ) -> Result<Self::Out, AvroError>;
750    fn uuid<'a, R: AvroRead>(
751        self,
752        _r: ValueOrReader<'a, &'a [u8], R>,
753    ) -> Result<Self::Out, AvroError>;
754    fn fixed<'a, R: AvroRead>(
755        self,
756        _r: ValueOrReader<'a, &'a [u8], R>,
757    ) -> Result<Self::Out, AvroError>;
758    fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
759        self,
760        f: F,
761    ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
762        public_decoders::MappingDecoder::new(self, f)
763    }
764}
765
766pub mod public_decoders {
767
768    use std::collections::BTreeMap;
769
770    use crate::error::{DecodeError, Error as AvroError};
771    use crate::types::{DecimalValue, Scalar, Value};
772    use crate::{
773        AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
774    };
775
776    use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
777
778    macro_rules! define_simple_decoder {
779        ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
780            pub struct $name;
781            impl AvroDecode for $name {
782                type Out = $out;
783                fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
784                    let out = match scalar {
785                        $(
786                            Scalar::$scalar_branch(inner) => {inner.try_into()?}
787                        ),*
788                            other => return Err(AvroError::Decode(
789                                DecodeError::UnexpectedScalarKind(other.into()),
790                            ))
791                    };
792                    Ok(out)
793                }
794                define_unexpected! {
795                    array, record, union_branch, map,
796                    enum_variant, decimal, bytes, string,
797                    json, uuid, fixed
798                }
799            }
800
801            impl StatefulAvroDecodable for $out {
802                type Decoder = $name;
803                type State = ();
804                fn new_decoder(_state: ()) -> $name {
805                    $name
806                }
807            }
808        }
809    }
810
811    define_simple_decoder!(I32Decoder, i32, Int;Long);
812    define_simple_decoder!(I64Decoder, i64, Int;Long);
813    define_simple_decoder!(U64Decoder, u64, Int;Long);
814    define_simple_decoder!(UsizeDecoder, usize, Int;Long);
815    define_simple_decoder!(IsizeDecoder, isize, Int;Long);
816
817    pub struct MappingDecoder<
818        T,
819        InnerOut,
820        Inner: AvroDecode<Out = InnerOut>,
821        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
822    > {
823        inner: Inner,
824        conv: Conv,
825    }
826
827    impl<
828        T,
829        InnerOut,
830        Inner: AvroDecode<Out = InnerOut>,
831        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
832    > MappingDecoder<T, InnerOut, Inner, Conv>
833    {
834        pub fn new(inner: Inner, conv: Conv) -> Self {
835            Self { inner, conv }
836        }
837    }
838
839    impl<
840        T,
841        InnerOut,
842        Inner: AvroDecode<Out = InnerOut>,
843        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
844    > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
845    {
846        type Out = T;
847
848        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
849            mut self,
850            a: &mut A,
851        ) -> Result<Self::Out, AvroError> {
852            (self.conv)(self.inner.record(a)?)
853        }
854
855        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
856            mut self,
857            idx: usize,
858            n_variants: usize,
859            null_variant: Option<usize>,
860            deserializer: D,
861            reader: &'a mut R,
862        ) -> Result<Self::Out, AvroError> {
863            (self.conv)(self.inner.union_branch(
864                idx,
865                n_variants,
866                null_variant,
867                deserializer,
868                reader,
869            )?)
870        }
871
872        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
873            (self.conv)(self.inner.array(a)?)
874        }
875
876        fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
877            (self.conv)(self.inner.map(m)?)
878        }
879
880        fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
881            (self.conv)(self.inner.enum_variant(symbol, idx)?)
882        }
883
884        fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
885            (self.conv)(self.inner.scalar(scalar)?)
886        }
887
888        fn decimal<'a, R: AvroRead>(
889            mut self,
890            precision: usize,
891            scale: usize,
892            r: ValueOrReader<'a, &'a [u8], R>,
893        ) -> Result<Self::Out, AvroError> {
894            (self.conv)(self.inner.decimal(precision, scale, r)?)
895        }
896
897        fn bytes<'a, R: AvroRead>(
898            mut self,
899            r: ValueOrReader<'a, &'a [u8], R>,
900        ) -> Result<Self::Out, AvroError> {
901            (self.conv)(self.inner.bytes(r)?)
902        }
903
904        fn string<'a, R: AvroRead>(
905            mut self,
906            r: ValueOrReader<'a, &'a str, R>,
907        ) -> Result<Self::Out, AvroError> {
908            (self.conv)(self.inner.string(r)?)
909        }
910
911        fn json<'a, R: AvroRead>(
912            mut self,
913            r: ValueOrReader<'a, &'a serde_json::Value, R>,
914        ) -> Result<Self::Out, AvroError> {
915            (self.conv)(self.inner.json(r)?)
916        }
917
918        fn uuid<'a, R: AvroRead>(
919            mut self,
920            r: ValueOrReader<'a, &'a [u8], R>,
921        ) -> Result<Self::Out, AvroError> {
922            (self.conv)(self.inner.uuid(r)?)
923        }
924
925        fn fixed<'a, R: AvroRead>(
926            mut self,
927            r: ValueOrReader<'a, &'a [u8], R>,
928        ) -> Result<Self::Out, AvroError> {
929            (self.conv)(self.inner.fixed(r)?)
930        }
931    }
932    pub struct ArrayAsVecDecoder<
933        InnerOut,
934        Inner: AvroDecode<Out = InnerOut>,
935        Ctor: FnMut() -> Inner,
936    > {
937        ctor: Ctor,
938        buf: Vec<InnerOut>,
939    }
940
941    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
942        ArrayAsVecDecoder<InnerOut, Inner, Ctor>
943    {
944        pub fn new(ctor: Ctor) -> Self {
945            Self { ctor, buf: vec![] }
946        }
947    }
948    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
949        for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
950    {
951        type Out = Vec<InnerOut>;
952        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
953            while let Some(next) = a.decode_next((self.ctor)())? {
954                self.buf.push(next);
955            }
956            Ok(self.buf)
957        }
958        define_unexpected! {
959            record, union_branch, map, enum_variant,
960            scalar, decimal, bytes, string, json, uuid,
961            fixed
962        }
963    }
964
965    pub struct DefaultArrayAsVecDecoder<T> {
966        buf: Vec<T>,
967    }
968    impl<T> Default for DefaultArrayAsVecDecoder<T> {
969        fn default() -> Self {
970            Self { buf: vec![] }
971        }
972    }
973    impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
974        type Out = Vec<T>;
975        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
976            while let Some(next) = {
977                let inner = T::new_decoder();
978                a.decode_next(inner)?
979            } {
980                self.buf.push(next);
981            }
982            Ok(self.buf)
983        }
984        define_unexpected! {
985            record, union_branch, map, enum_variant,
986            scalar, decimal, bytes, string, json, uuid,
987            fixed
988        }
989    }
990    impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
991        type Decoder = DefaultArrayAsVecDecoder<T>;
992        type State = ();
993
994        fn new_decoder(_state: Self::State) -> Self::Decoder {
995            DefaultArrayAsVecDecoder::<T>::default()
996        }
997    }
998    pub struct TrivialDecoder;
999
1000    impl TrivialDecoder {
1001        fn maybe_skip<'a, V, R: AvroRead>(
1002            self,
1003            r: ValueOrReader<'a, V, R>,
1004        ) -> Result<(), AvroError> {
1005            if let ValueOrReader::Reader { len, r } = r {
1006                Ok(r.skip(len)?)
1007            } else {
1008                Ok(())
1009            }
1010        }
1011    }
1012
1013    impl AvroDecode for TrivialDecoder {
1014        type Out = ();
1015        fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
1016            while let Some((_, _, f)) = a.next_field()? {
1017                f.decode_field(TrivialDecoder)?;
1018            }
1019            Ok(())
1020        }
1021        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1022            self,
1023            _idx: usize,
1024            _n_variants: usize,
1025            _null_variant: Option<usize>,
1026            deserializer: D,
1027            reader: &'a mut R,
1028        ) -> Result<(), AvroError> {
1029            deserializer.deserialize(reader, self)
1030        }
1031
1032        fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1033            Ok(())
1034        }
1035        fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1036            Ok(())
1037        }
1038        fn decimal<'a, R: AvroRead>(
1039            self,
1040            _precision: usize,
1041            _scale: usize,
1042            r: ValueOrReader<'a, &'a [u8], R>,
1043        ) -> Result<(), AvroError> {
1044            self.maybe_skip(r)
1045        }
1046        fn bytes<'a, R: AvroRead>(
1047            self,
1048            r: ValueOrReader<'a, &'a [u8], R>,
1049        ) -> Result<(), AvroError> {
1050            self.maybe_skip(r)
1051        }
1052        fn string<'a, R: AvroRead>(
1053            self,
1054            r: ValueOrReader<'a, &'a str, R>,
1055        ) -> Result<(), AvroError> {
1056            self.maybe_skip(r)
1057        }
1058        fn json<'a, R: AvroRead>(
1059            self,
1060            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1061        ) -> Result<(), AvroError> {
1062            self.maybe_skip(r)
1063        }
1064        fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1065            self.maybe_skip(r)
1066        }
1067        fn fixed<'a, R: AvroRead>(
1068            self,
1069            r: ValueOrReader<'a, &'a [u8], R>,
1070        ) -> Result<(), AvroError> {
1071            self.maybe_skip(r)
1072        }
1073        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1074            while a.decode_next(TrivialDecoder)?.is_some() {}
1075            Ok(())
1076        }
1077
1078        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1079            while let Some((_n, entry)) = m.next_entry()? {
1080                entry.decode_field(TrivialDecoder)?
1081            }
1082            Ok(())
1083        }
1084    }
1085    pub struct ValueDecoder;
1086    impl AvroDecode for ValueDecoder {
1087        type Out = Value;
1088        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1089            self,
1090            a: &mut A,
1091        ) -> Result<Value, AvroError> {
1092            let mut fields = vec![];
1093            while let Some((name, idx, f)) = a.next_field()? {
1094                let next = ValueDecoder;
1095                let val = f.decode_field(next)?;
1096                fields.push((idx, (name.to_string(), val)));
1097            }
1098            fields.sort_by_key(|(idx, _)| *idx);
1099
1100            Ok(Value::Record(
1101                fields
1102                    .into_iter()
1103                    .map(|(_idx, (name, val))| (name, val))
1104                    .collect(),
1105            ))
1106        }
1107        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1108            self,
1109            index: usize,
1110            n_variants: usize,
1111            null_variant: Option<usize>,
1112            deserializer: D,
1113            reader: &'a mut R,
1114        ) -> Result<Value, AvroError> {
1115            let next = ValueDecoder;
1116            let inner = Box::new(deserializer.deserialize(reader, next)?);
1117            Ok(Value::Union {
1118                index,
1119                inner,
1120                n_variants,
1121                null_variant,
1122            })
1123        }
1124        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1125            let mut items = vec![];
1126            loop {
1127                let next = ValueDecoder;
1128
1129                if let Some(value) = a.decode_next(next)? {
1130                    items.push(value)
1131                } else {
1132                    break;
1133                }
1134            }
1135            Ok(Value::Array(items))
1136        }
1137        fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1138            Ok(Value::Enum(idx, symbol.to_string()))
1139        }
1140        fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1141            Ok(scalar.into())
1142        }
1143        fn decimal<'a, R: AvroRead>(
1144            self,
1145            precision: usize,
1146            scale: usize,
1147            r: ValueOrReader<'a, &'a [u8], R>,
1148        ) -> Result<Value, AvroError> {
1149            let unscaled = match r {
1150                ValueOrReader::Value(buf) => buf.to_vec(),
1151                ValueOrReader::Reader { len, r } => {
1152                    let mut buf = vec![];
1153                    buf.resize_with(len, Default::default);
1154                    r.read_exact(&mut buf)?;
1155                    buf
1156                }
1157            };
1158            Ok(Value::Decimal(DecimalValue {
1159                unscaled,
1160                precision,
1161                scale,
1162            }))
1163        }
1164        fn bytes<'a, R: AvroRead>(
1165            self,
1166            r: ValueOrReader<'a, &'a [u8], R>,
1167        ) -> Result<Value, AvroError> {
1168            let buf = match r {
1169                ValueOrReader::Value(buf) => buf.to_vec(),
1170                ValueOrReader::Reader { len, r } => {
1171                    let mut buf = vec![];
1172                    buf.resize_with(len, Default::default);
1173                    r.read_exact(&mut buf)?;
1174                    buf
1175                }
1176            };
1177            Ok(Value::Bytes(buf))
1178        }
1179        fn string<'a, R: AvroRead>(
1180            self,
1181            r: ValueOrReader<'a, &'a str, R>,
1182        ) -> Result<Value, AvroError> {
1183            let s = match r {
1184                ValueOrReader::Value(s) => s.to_string(),
1185                ValueOrReader::Reader { len, r } => {
1186                    let mut buf = vec![];
1187                    buf.resize_with(len, Default::default);
1188                    r.read_exact(&mut buf)?;
1189                    String::from_utf8(buf)
1190                        .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1191                }
1192            };
1193            Ok(Value::String(s))
1194        }
1195        fn json<'a, R: AvroRead>(
1196            self,
1197            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1198        ) -> Result<Value, AvroError> {
1199            let val = match r {
1200                ValueOrReader::Value(val) => val.clone(),
1201                ValueOrReader::Reader { len, r } => {
1202                    let mut buf = vec![];
1203                    buf.resize_with(len, Default::default);
1204                    r.read_exact(&mut buf)?;
1205                    serde_json::from_slice(&buf).map_err(|e| {
1206                        AvroError::Decode(DecodeError::BadJson {
1207                            category: e.classify(),
1208                            bytes: buf.to_owned(),
1209                        })
1210                    })?
1211                }
1212            };
1213            Ok(Value::Json(val))
1214        }
1215        fn uuid<'a, R: AvroRead>(
1216            self,
1217            r: ValueOrReader<'a, &'a [u8], R>,
1218        ) -> Result<Value, AvroError> {
1219            let buf = match r {
1220                ValueOrReader::Value(val) => val.to_vec(),
1221                ValueOrReader::Reader { len, r } => {
1222                    let mut buf = vec![];
1223                    buf.resize_with(len, Default::default);
1224                    r.read_exact(&mut buf)?;
1225                    buf
1226                }
1227            };
1228            let s = std::str::from_utf8(&buf)
1229                .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1230            let val =
1231                uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1232            Ok(Value::Uuid(val))
1233        }
1234        fn fixed<'a, R: AvroRead>(
1235            self,
1236            r: ValueOrReader<'a, &'a [u8], R>,
1237        ) -> Result<Value, AvroError> {
1238            let buf = match r {
1239                ValueOrReader::Value(buf) => buf.to_vec(),
1240                ValueOrReader::Reader { len, r } => {
1241                    let mut buf = vec![];
1242                    buf.resize_with(len, Default::default);
1243                    r.read_exact(&mut buf)?;
1244                    buf
1245                }
1246            };
1247            Ok(Value::Fixed(buf.len(), buf))
1248        }
1249        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1250            let mut entries = BTreeMap::new();
1251            while let Some((name, a)) = m.next_entry()? {
1252                let d = ValueDecoder;
1253                let val = a.decode_field(d)?;
1254                entries.insert(name, val);
1255            }
1256            Ok(Value::Map(entries))
1257        }
1258    }
1259}
1260
1261impl<'a> AvroDeserializer for &'a Value {
1262    fn deserialize<R: AvroRead, D: AvroDecode>(
1263        self,
1264        _r: &mut R,
1265        d: D,
1266    ) -> Result<D::Out, AvroError> {
1267        give_value(d, self)
1268    }
1269}
1270
1271pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1272    use ValueOrReader::Value as V;
1273    match v {
1274        Value::Null => d.scalar(Scalar::Null),
1275        Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1276        Value::Int(val) => d.scalar(Scalar::Int(*val)),
1277        Value::Long(val) => d.scalar(Scalar::Long(*val)),
1278        Value::Float(val) => d.scalar(Scalar::Float(*val)),
1279        Value::Double(val) => d.scalar(Scalar::Double(*val)),
1280        Value::Date(val) => d.scalar(Scalar::Date(*val)),
1281        Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1282        // The &[u8] parameter here (and elsewhere in this function) is arbitrary, but we have to put in something in order for the function
1283        // to type-check
1284        Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1285        Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1286        Value::String(val) => d.string::<&[u8]>(V(val)),
1287        Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1288        Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1289        Value::Union {
1290            index,
1291            inner,
1292            n_variants,
1293            null_variant,
1294        } => {
1295            let mut empty_reader: &[u8] = &[];
1296            d.union_branch(
1297                *index,
1298                *n_variants,
1299                *null_variant,
1300                &**inner,
1301                &mut empty_reader,
1302            )
1303        }
1304        Value::Array(val) => {
1305            let mut a = ValueArrayAccess::new(val);
1306            d.array(&mut a)
1307        }
1308        Value::Map(val) => {
1309            let vals: Vec<_> = val.clone().into_iter().collect();
1310            let mut m = ValueMapAccess::new(vals.as_slice());
1311            d.map(&mut m)
1312        }
1313        Value::Record(val) => {
1314            let mut a = ValueRecordAccess::new(val);
1315            d.record(&mut a)
1316        }
1317        Value::Json(val) => d.json::<&[u8]>(V(val)),
1318        Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1319    }
1320}
1321
1322pub trait AvroDeserializer {
1323    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
1324}
1325
1326#[derive(Clone, Copy)]
1327pub struct GeneralDeserializer<'a> {
1328    pub schema: SchemaNode<'a>,
1329}
1330
1331impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1332    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1333        use ValueOrReader::Reader;
1334        match self.schema.inner {
1335            SchemaPiece::Null => d.scalar(Scalar::Null),
1336            SchemaPiece::Boolean => {
1337                let mut buf = [0u8; 1];
1338                r.read_exact(&mut buf[..])?;
1339                let val = match buf[0] {
1340                    0u8 => false,
1341                    1u8 => true,
1342                    other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
1343                };
1344                d.scalar(Scalar::Boolean(val))
1345            }
1346            SchemaPiece::Int => {
1347                let val = zag_i32(r)?;
1348                d.scalar(Scalar::Int(val))
1349            }
1350            SchemaPiece::Long => {
1351                let val = zag_i64(r)?;
1352                d.scalar(Scalar::Long(val))
1353            }
1354            SchemaPiece::Float => {
1355                let val = decode_float(r)?;
1356                d.scalar(Scalar::Float(val))
1357            }
1358            SchemaPiece::Double => {
1359                let val = decode_double(r)?;
1360                d.scalar(Scalar::Double(val))
1361            }
1362            SchemaPiece::Date => {
1363                let days = zag_i32(r)?;
1364                d.scalar(Scalar::Date(days))
1365            }
1366            SchemaPiece::TimestampMilli => {
1367                let total_millis = zag_i64(r)?;
1368                let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
1369                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1370                    _ => unreachable!(),
1371                };
1372                d.scalar(scalar)
1373            }
1374            SchemaPiece::TimestampMicro => {
1375                let total_micros = zag_i64(r)?;
1376                let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
1377                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1378                    _ => unreachable!(),
1379                };
1380                d.scalar(scalar)
1381            }
1382            SchemaPiece::Decimal {
1383                precision,
1384                scale,
1385                fixed_size,
1386            } => {
1387                let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
1388                d.decimal(*precision, *scale, Reader { len, r })
1389            }
1390            SchemaPiece::Bytes => {
1391                let len = decode_len(r)?;
1392                d.bytes(Reader { len, r })
1393            }
1394            SchemaPiece::String => {
1395                let len = decode_len(r)?;
1396                d.string(Reader { len, r })
1397            }
1398            SchemaPiece::Json => {
1399                let len = decode_len(r)?;
1400                d.json(Reader { len, r })
1401            }
1402            SchemaPiece::Uuid => {
1403                let len = decode_len(r)?;
1404                d.uuid(Reader { len, r })
1405            }
1406            SchemaPiece::Array(inner) => {
1407                // From the spec:
1408                // Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
1409                // If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
1410
1411                let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
1412                d.array(&mut a)
1413            }
1414            SchemaPiece::Map(inner) => {
1415                // See logic for `SchemaPiece::Array` above. Maps are encoded similarly.
1416                let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
1417                d.map(&mut m)
1418            }
1419            SchemaPiece::Union(inner) => {
1420                let index = decode_long_nonneg(r)? as usize;
1421                let variants = inner.variants();
1422                match variants.get(index) {
1423                    Some(variant) => {
1424                        let n_variants = variants.len();
1425                        let null_variant = variants
1426                            .iter()
1427                            .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
1428                        let dsr = GeneralDeserializer {
1429                            schema: self.schema.step(variant),
1430                        };
1431                        d.union_branch(index, n_variants, null_variant, dsr, r)
1432                    }
1433                    None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
1434                        index,
1435                        len: variants.len(),
1436                    })),
1437                }
1438            }
1439            SchemaPiece::ResolveIntLong => {
1440                let val = zag_i32(r)? as i64;
1441                d.scalar(Scalar::Long(val))
1442            }
1443            SchemaPiece::ResolveIntFloat => {
1444                let val = zag_i32(r)? as f32;
1445                d.scalar(Scalar::Float(val))
1446            }
1447            SchemaPiece::ResolveIntDouble => {
1448                let val = zag_i32(r)? as f64;
1449                d.scalar(Scalar::Double(val))
1450            }
1451            SchemaPiece::ResolveLongFloat => {
1452                let val = zag_i64(r)? as f32;
1453                d.scalar(Scalar::Float(val))
1454            }
1455            SchemaPiece::ResolveLongDouble => {
1456                let val = zag_i64(r)? as f64;
1457                d.scalar(Scalar::Double(val))
1458            }
1459            SchemaPiece::ResolveFloatDouble => {
1460                let val = decode_float(r)? as f64;
1461                d.scalar(Scalar::Double(val))
1462            }
1463            SchemaPiece::ResolveConcreteUnion {
1464                index,
1465                inner,
1466                n_reader_variants,
1467                reader_null_variant,
1468            } => {
1469                let dsr = GeneralDeserializer {
1470                    schema: self.schema.step(&**inner),
1471                };
1472                d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1473            }
1474            SchemaPiece::ResolveUnionUnion {
1475                permutation,
1476                n_reader_variants,
1477                reader_null_variant,
1478            } => {
1479                let index = decode_long_nonneg(r)? as usize;
1480                if index >= permutation.len() {
1481                    return Err(AvroError::Decode(DecodeError::BadUnionIndex {
1482                        index,
1483                        len: permutation.len(),
1484                    }));
1485                }
1486                match &permutation[index] {
1487                    Err(e) => Err(e.clone()),
1488                    Ok((index, variant)) => {
1489                        let dsr = GeneralDeserializer {
1490                            schema: self.schema.step(variant),
1491                        };
1492                        d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1493                    }
1494                }
1495            }
1496            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1497                let found_index = decode_long_nonneg(r)? as usize;
1498                if *index != found_index {
1499                    Err(AvroError::Decode(DecodeError::WrongUnionIndex {
1500                        expected: *index,
1501                        actual: found_index,
1502                    }))
1503                } else {
1504                    let dsr = GeneralDeserializer {
1505                        schema: self.schema.step(inner.as_ref()),
1506                    };
1507                    // The reader is not expecting a union here, so don't call `D::union_branch`
1508                    dsr.deserialize(r, d)
1509                }
1510            }
1511            SchemaPiece::Record {
1512                doc: _,
1513                fields,
1514                lookup: _,
1515            } => {
1516                let mut a = SimpleRecordAccess::new(self.schema, r, fields);
1517                d.record(&mut a)
1518            }
1519            SchemaPiece::Enum {
1520                symbols,
1521                doc: _,
1522                default_idx: _,
1523            } => {
1524                let index = decode_int_nonneg(r)? as usize;
1525                match symbols.get(index) {
1526                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1527                        index,
1528                        len: symbols.len(),
1529                    })),
1530                    Some(symbol) => d.enum_variant(symbol, index),
1531                }
1532            }
1533            SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
1534            // XXX - This does not deliver fields to the consumer in the same order they were
1535            // declared in the reader schema, which might cause headache for consumers...
1536            // Unfortunately, there isn't a good way to do so without pre-decoding the whole record
1537            // (which would require a lot of allocations)
1538            // and then sorting the fields. So, just let the consumer deal with re-ordering.
1539            SchemaPiece::ResolveRecord {
1540                defaults,
1541                fields,
1542                n_reader_fields: _,
1543            } => {
1544                let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
1545                d.record(&mut a)
1546            }
1547            SchemaPiece::ResolveEnum {
1548                doc: _,
1549                symbols,
1550                default,
1551            } => {
1552                let index = decode_int_nonneg(r)? as usize;
1553                match symbols.get(index) {
1554                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1555                        index,
1556                        len: symbols.len(),
1557                    })),
1558                    Some(op) => match op {
1559                        Err(missing) => {
1560                            if let Some((reader_index, symbol)) = default.clone() {
1561                                d.enum_variant(&symbol, reader_index)
1562                            } else {
1563                                Err(AvroError::Decode(DecodeError::MissingEnumIndex {
1564                                    index,
1565                                    symbol: missing.clone(),
1566                                }))
1567                            }
1568                        }
1569                        Ok((index, name)) => d.enum_variant(name, *index),
1570                    },
1571                }
1572            }
1573            SchemaPiece::ResolveIntTsMilli => {
1574                let total_millis = zag_i32(r)?;
1575                let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
1576                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1577                    _ => unreachable!(),
1578                };
1579                d.scalar(scalar)
1580            }
1581            SchemaPiece::ResolveIntTsMicro => {
1582                let total_micros = zag_i32(r)?;
1583                let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
1584                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1585                    _ => unreachable!(),
1586                };
1587                d.scalar(scalar)
1588            }
1589            SchemaPiece::ResolveDateTimestamp => {
1590                let days = zag_i32(r)?;
1591
1592                let date = NaiveDate::from_ymd_opt(1970, 1, 1)
1593                    .expect("naive date known valid")
1594                    .checked_add_signed(
1595                        chrono::Duration::try_days(days.into())
1596                            .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
1597                    )
1598                    .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
1599                let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
1600                d.scalar(Scalar::Timestamp(dt))
1601            }
1602        }
1603    }
1604}
1605/// Decode a `Value` from avro format given its `Schema`.
1606pub fn decode<'a, R: AvroRead>(
1607    schema: SchemaNode<'a>,
1608    reader: &'a mut R,
1609) -> Result<Value, AvroError> {
1610    let d = ValueDecoder;
1611    let dsr = GeneralDeserializer { schema };
1612    let val = dsr.deserialize(reader, d)?;
1613    Ok(val)
1614}