mz_avro/
decode.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use std::cmp;
25use std::fmt::{self, Display};
26use std::fs::File;
27use std::io::{self, Cursor, Read, Seek, SeekFrom};
28
29use chrono::{DateTime, NaiveDate};
30use flate2::read::MultiGzDecoder;
31
32use crate::error::{DecodeError, Error as AvroError};
33use crate::schema::{
34    RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece,
35    SchemaPieceOrNamed,
36};
37use crate::types::{Scalar, Value};
38use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
39use crate::{TrivialDecoder, ValueDecoder};
40
/// A type that can be produced by an [`AvroDecode`] implementation which is
/// constructed from a piece of caller-supplied state.
pub trait StatefulAvroDecodable: Sized {
    /// The decoder type; its `Out` type is `Self`.
    type Decoder: AvroDecode<Out = Self>;
    /// The state handed to [`StatefulAvroDecodable::new_decoder`].
    type State;
    /// Builds a decoder for `Self` from `state`.
    fn new_decoder(state: Self::State) -> Self::Decoder;
}
/// A type that can be produced by an [`AvroDecode`] implementation which
/// needs no arguments to construct.
pub trait AvroDecodable: Sized {
    /// The decoder type; its `Out` type is `Self`.
    type Decoder: AvroDecode<Out = Self>;

    /// Builds a decoder for `Self`.
    fn new_decoder() -> Self::Decoder;
}
51impl<T> AvroDecodable for T
52where
53    T: StatefulAvroDecodable,
54    T::State: Default,
55{
56    type Decoder = <Self as StatefulAvroDecodable>::Decoder;
57
58    fn new_decoder() -> Self::Decoder {
59        <Self as StatefulAvroDecodable>::new_decoder(Default::default())
60    }
61}
62#[inline]
63fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
64    let u = match zag_i64(reader)? {
65        i if i >= 0 => i as u64,
66        i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
67    };
68    Ok(u)
69}
70
71fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
72    let u = match zag_i32(reader)? {
73        i if i >= 0 => i as u32,
74        i => {
75            return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
76                i as i64,
77            )));
78        }
79    };
80    Ok(u)
81}
82
83#[inline]
84fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
85    zag_i64(reader).and_then(|i| safe_len(i as usize))
86}
87
88#[inline]
89fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
90    let mut buf = [0u8; 4];
91    reader.read_exact(&mut buf[..])?;
92    Ok(f32::from_le_bytes(buf))
93}
94
95#[inline]
96fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
97    let mut buf = [0u8; 8];
98    reader.read_exact(&mut buf[..])?;
99    Ok(f64::from_le_bytes(buf))
100}
101
102impl Display for TsUnit {
103    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104        match self {
105            TsUnit::Millis => write!(f, "ms"),
106            TsUnit::Micros => write!(f, "us"),
107        }
108    }
109}
110
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    /// Builds the expected timestamp value from whole seconds plus a
    /// non-negative nanosecond offset.
    fn expected(secs: i64, nanos: u32) -> Value {
        Value::Timestamp(DateTime::from_timestamp(secs, nanos).unwrap().naive_utc())
    }

    /// Negative inputs must land in the second *before* the epoch offset,
    /// with a positive sub-second part.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        // One millisecond before the epoch.
        assert_eq!(
            build_ts_value(-1, TsUnit::Millis).unwrap(),
            expected(-1, 999_000_000)
        );
        // Exactly one second before the epoch, expressed in milliseconds.
        assert_eq!(
            build_ts_value(-1000, TsUnit::Millis).unwrap(),
            expected(-1, 0)
        );
        // One thousand microseconds (one millisecond) before the epoch.
        assert_eq!(
            build_ts_value(-1000, TsUnit::Micros).unwrap(),
            expected(-1, 999_000_000)
        );
        // One microsecond before the epoch.
        assert_eq!(
            build_ts_value(-1, TsUnit::Micros).unwrap(),
            expected(-1, 999_999_000)
        );
        // A large negative microsecond count with a non-trivial remainder.
        assert_eq!(
            build_ts_value(-123_456_789_123, TsUnit::Micros).unwrap(),
            expected(-123_457, (1_000_000 - 789_123) * 1_000)
        );
    }
}
160
161/// A convenience function to build timestamp values from underlying longs.
162pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
163    let result = match unit {
164        TsUnit::Millis => DateTime::from_timestamp_millis(value),
165        TsUnit::Micros => DateTime::from_timestamp_micros(value),
166    };
167    let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
168    Ok(Value::Timestamp(ndt.naive_utc()))
169}
170
171/// A convenience trait for types that are both readable and skippable.
172///
173/// A blanket implementation is provided for all types that implement both
174/// [`Read`] and [`Skip`].
175pub trait AvroRead: Read + Skip {}
176
177impl<T> AvroRead for T where T: Read + Skip {}
178
/// A trait that allows for efficient skipping forward while reading data.
pub trait Skip: Read {
    /// Advance the cursor by `len` bytes.
    ///
    /// If possible, the implementation should be more efficient than calling
    /// [`Read::read`] and discarding the resulting bytes.
    ///
    /// Calling `skip` with a `len` that advances the cursor past the end of the
    /// underlying data source is permissible. The only requirement is that the
    /// next call to [`Read::read`] indicates EOF.
    ///
    /// The default implementation reads into a fixed 512-byte scratch buffer
    /// and discards the bytes. It stops early when a read returns zero bytes,
    /// and retries reads that fail with [`io::ErrorKind::Interrupted`], as
    /// [`Read::read_exact`] does.
    ///
    /// # Errors
    ///
    /// Can return an error in all the same cases that [`Read::read`] can,
    /// except that `Interrupted` errors are retried rather than returned.
    ///
    /// TODO: Remove this clippy suppression when the issue is fixed.
    /// See <https://github.com/rust-lang/rust-clippy/issues/12519>
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        const BUF_SIZE: usize = 512;
        let mut buf = [0; BUF_SIZE];

        while len > 0 {
            // Never ask for more than is left to skip.
            let want = len.min(BUF_SIZE);
            match self.read(&mut buf[..want]) {
                // EOF: skipping past the end is allowed.
                Ok(0) => break,
                // Saturate so a reader that over-reports cannot underflow `len`.
                Ok(n) => len = len.saturating_sub(n),
                // A signal interrupted the read; no data was consumed, retry.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
215
216impl Skip for File {
217    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
218        self.seek(SeekFrom::Current(len as i64))?;
219        Ok(())
220    }
221}
222
223impl Skip for &[u8] {
224    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
225        let len = cmp::min(len, self.len());
226        *self = &self[len..];
227        Ok(())
228    }
229}
230
231impl<S: Skip + ?Sized> Skip for Box<S> {
232    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
233        self.as_mut().skip(len)
234    }
235}
236
237impl<T: AsRef<[u8]>> Skip for Cursor<T> {
238    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
239        self.seek(SeekFrom::Current(len as i64))?;
240        Ok(())
241    }
242}
243
// No override is provided here, so the trait's default `skip` is used for
// gzip-decoded streams.
impl<R: Read> Skip for MultiGzDecoder<R> {}
245
/// Either an already-available value of type `V`, or a reader paired with a
/// length.
///
/// `TrivialDecoder::maybe_skip` in this file handles the `Reader` case by
/// calling `r.skip(len)`; presumably `len` is the number of bytes the value
/// occupies in `r` — confirm against the code that constructs this variant.
pub enum ValueOrReader<'a, V, R: AvroRead> {
    /// The value itself.
    Value(V),
    /// A reader `r` together with a length `len`.
    Reader { len: usize, r: &'a mut R },
}
250
/// Where a field's data comes from when it is decoded (see
/// `AvroFieldAccess::decode_field`).
enum SchemaOrDefault<'b, R: AvroRead> {
    /// Decode from the reader using the given schema node.
    Schema(&'b mut R, SchemaNode<'b>),
    /// Hand an already-materialized value to the decoder via `give_value`.
    Default(&'b Value),
}
/// A handle to a single field (or map entry) that has not been decoded yet;
/// consume it with `decode_field`.
pub struct AvroFieldAccess<'b, R: AvroRead> {
    // Source of the field's data: a reader plus schema node, or a stored value.
    schema: SchemaOrDefault<'b, R>,
}
258
259impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
260    pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
261        match self.schema {
262            SchemaOrDefault::Schema(r, schema) => {
263                let des = GeneralDeserializer { schema };
264                des.deserialize(r, d)
265            }
266            SchemaOrDefault::Default(value) => give_value(d, value),
267        }
268    }
269}
270
/// Sequential access to the fields of a record being decoded.
pub trait AvroRecordAccess<R: AvroRead> {
    /// Returns the next field, or `Ok(None)` once all fields have been
    /// yielded.
    ///
    /// In the implementations in this file the tuple holds the field's name,
    /// a `usize` position, and an accessor for decoding the field.
    fn next_field<'b>(
        &'b mut self,
    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
}
276
/// Record access that walks a slice of `RecordField`s in order, decoding
/// each one from the reader.
struct SimpleRecordAccess<'a, R: AvroRead> {
    // Node used to `step` into each field's schema.
    schema: SchemaNode<'a>,
    // Source of the encoded data.
    r: &'a mut R,
    // The fields to yield, in order.
    fields: &'a [RecordField],
    // Index of the next field to yield.
    i: usize,
}

impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
    /// Creates an accessor positioned at the first field.
    fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
        Self {
            schema,
            r,
            fields,
            i: 0,
        }
    }
}
294
295impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
296    fn next_field<'b>(
297        &'b mut self,
298    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
299        assert!(self.i <= self.fields.len());
300        if self.i == self.fields.len() {
301            Ok(None)
302        } else {
303            let f = &self.fields[self.i];
304            self.i += 1;
305            Ok(Some((
306                f.name.as_str(),
307                f.position,
308                AvroFieldAccess {
309                    schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
310                },
311            )))
312        }
313    }
314}
315
/// Record access over already-materialized `(name, value)` pairs.
struct ValueRecordAccess<'a> {
    // The fields to yield, in order.
    values: &'a [(String, Value)],
    // Index of the next pair to yield.
    i: usize,
}

impl<'a> ValueRecordAccess<'a> {
    /// Creates an accessor positioned at the first pair.
    fn new(values: &'a [(String, Value)]) -> Self {
        Self { values, i: 0 }
    }
}
326
327impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
328    fn next_field<'b>(
329        &'b mut self,
330    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
331        assert!(self.i <= self.values.len());
332        if self.i == self.values.len() {
333            Ok(None)
334        } else {
335            let (name, val) = &self.values[self.i];
336            self.i += 1;
337            Ok(Some((
338                name.as_str(),
339                self.i - 1,
340                AvroFieldAccess {
341                    schema: SchemaOrDefault::Default(val),
342                },
343            )))
344        }
345    }
346}
347
/// Map access over already-materialized `(key, value)` pairs.
struct ValueMapAccess<'a> {
    // The entries to yield, in order.
    values: &'a [(String, Value)],
    // Index of the next entry to yield.
    i: usize,
}

impl<'a> ValueMapAccess<'a> {
    /// Creates an accessor positioned at the first entry.
    fn new(values: &'a [(String, Value)]) -> Self {
        Self { values, i: 0 }
    }
}
358
359impl<'a> AvroMapAccess for ValueMapAccess<'a> {
360    type R = &'a [u8];
361    fn next_entry<'b>(
362        &'b mut self,
363    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
364        assert!(self.i <= self.values.len());
365        if self.i == self.values.len() {
366            Ok(None)
367        } else {
368            let (name, val) = &self.values[self.i];
369            self.i += 1;
370            Ok(Some((
371                name.clone(),
372                AvroFieldAccess {
373                    schema: SchemaOrDefault::Default(val),
374                },
375            )))
376        }
377    }
378}
379
/// Record access that first yields a list of default-valued fields and then
/// walks a list of resolved fields read from the input.
struct ResolvedRecordAccess<'a, R: AvroRead> {
    // Fields whose value comes from a stored default.
    defaults: &'a [ResolvedDefaultValueField],
    // Index of the next default to yield.
    i_defaults: usize,
    // Fields that are present in the input (either `Present` or `Absent`).
    fields: &'a [ResolvedRecordField],
    // Index of the next entry of `fields` to process.
    i_fields: usize,
    // Source of the encoded data.
    r: &'a mut R,
    // Node used to `step` into each present field's schema.
    schema: SchemaNode<'a>,
}

impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
    /// Creates an accessor positioned at the first default and first field.
    fn new(
        defaults: &'a [ResolvedDefaultValueField],
        fields: &'a [ResolvedRecordField],
        r: &'a mut R,
        schema: SchemaNode<'a>,
    ) -> Self {
        Self {
            defaults,
            i_defaults: 0,
            fields,
            i_fields: 0,
            r,
            schema,
        }
    }
}
406
407impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
408    fn next_field<'b>(
409        &'b mut self,
410    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
411        assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
412        if self.i_defaults < self.defaults.len() {
413            let default = &self.defaults[self.i_defaults];
414            self.i_defaults += 1;
415            Ok(Some((
416                default.name.as_str(),
417                default.position,
418                AvroFieldAccess {
419                    schema: SchemaOrDefault::Default(&default.default),
420                },
421            )))
422        } else {
423            while self.i_fields < self.fields.len() {
424                let field = &self.fields[self.i_fields];
425                self.i_fields += 1;
426                match field {
427                    ResolvedRecordField::Absent(absent_schema) => {
428                        // we don't care what's in the value, but we still need to read it in order to skip ahead the proper amount in the input.
429                        let d = GeneralDeserializer {
430                            schema: absent_schema.top_node(),
431                        };
432                        d.deserialize(self.r, TrivialDecoder)?;
433                        continue;
434                    }
435                    ResolvedRecordField::Present(field) => {
436                        return Ok(Some((
437                            field.name.as_str(),
438                            field.position,
439                            AvroFieldAccess {
440                                schema: SchemaOrDefault::Schema(
441                                    self.r,
442                                    self.schema.step(&field.schema),
443                                ),
444                            },
445                        )));
446                    }
447                }
448            }
449            Ok(None)
450        }
451    }
452}
453
/// Sequential access to the elements of an array being decoded.
pub trait AvroArrayAccess {
    /// Decodes the next element with `d`, or returns `Ok(None)` when there
    /// are no more elements.
    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
}
457
/// Sequential access to the entries of a map being decoded.
pub trait AvroMapAccess {
    /// The reader type that entry accessors borrow.
    type R: AvroRead;
    /// Returns the next entry as an owned key plus an accessor for decoding
    /// its value, or `Ok(None)` when there are no more entries.
    fn next_entry<'b>(
        &'b mut self,
    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
}
464
/// Map access that decodes entries directly from a reader.
pub struct SimpleMapAccess<'a, R: AvroRead> {
    // Schema node used for every entry's value.
    entry_schema: SchemaNode<'a>,
    // Source of the encoded data.
    r: &'a mut R,
    // Set once a zero entry count has been read; no more entries follow.
    done: bool,
    // Entries still to be yielded from the most recently read count.
    remaining: usize,
}

impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
    /// Creates an accessor that has not read any entry count yet.
    fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
        Self {
            entry_schema,
            r,
            done: false,
            remaining: 0,
        }
    }
}
482
483impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
484    type R = R;
485    fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
486        if self.done {
487            return Ok(None);
488        }
489        if self.remaining == 0 {
490            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
491            let (len, _len_in_bytes) = match zag_i64(self.r)? {
492                len if len > 0 => (len as usize, None),
493                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
494                0 => {
495                    self.done = true;
496                    return Ok(None);
497                }
498                _ => unreachable!(),
499            };
500            self.remaining = len;
501        }
502        assert!(self.remaining > 0);
503        self.remaining -= 1;
504
505        // TODO - We can try to avoid this allocation, but  nobody uses maps in Materialize
506        // right now so it doesn't really matter.
507        let key_len = decode_len(self.r)?;
508        let mut key_buf = vec![];
509        key_buf.resize_with(key_len, Default::default);
510        self.r.read_exact(&mut key_buf)?;
511        let key = String::from_utf8(key_buf)
512            .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
513
514        let a = AvroFieldAccess {
515            schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
516        };
517        Ok(Some((key, a)))
518    }
519}
520
/// Array access that decodes elements directly from a reader.
struct SimpleArrayAccess<'a, R: AvroRead> {
    // Source of the encoded data.
    r: &'a mut R,
    // Schema node used for every element.
    schema: SchemaNode<'a>,
    // Elements still to be yielded from the most recently read count.
    remaining: usize,
    // Set once a zero element count has been read; no more elements follow.
    done: bool,
}

impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
    /// Creates an accessor that has not read any element count yet.
    fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
        Self {
            r,
            schema,
            remaining: 0,
            done: false,
        }
    }
}
538
/// Array access over already-materialized values.
struct ValueArrayAccess<'a> {
    // The elements to yield, in order.
    values: &'a [Value],
    // Index of the next element to yield.
    i: usize,
}

impl<'a> ValueArrayAccess<'a> {
    /// Creates an accessor positioned at the first element.
    fn new(values: &'a [Value]) -> Self {
        Self { values, i: 0 }
    }
}
549
550impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
551    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
552        assert!(self.i <= self.values.len());
553        if self.i == self.values.len() {
554            Ok(None)
555        } else {
556            let val = give_value(d, &self.values[self.i])?;
557            self.i += 1;
558            Ok(Some(val))
559        }
560    }
561}
562
563impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
564    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
565        if self.done {
566            return Ok(None);
567        }
568        if self.remaining == 0 {
569            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
570            let (len, _len_in_bytes) = match zag_i64(self.r)? {
571                len if len > 0 => (len as usize, None),
572                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
573                0 => {
574                    self.done = true;
575                    return Ok(None);
576                }
577                _ => unreachable!(),
578            };
579            self.remaining = len;
580        }
581        assert!(self.remaining > 0);
582        self.remaining -= 1;
583        let des = GeneralDeserializer {
584            schema: self.schema,
585        };
586        des.deserialize(self.r, d).map(Some)
587    }
588}
589
/// Generates `AvroDecode` methods that reject their input.
///
/// Invoke it inside an `impl AvroDecode for ...` block with a comma-separated
/// list of method names (`record`, `union_branch`, `array`, `map`,
/// `enum_variant`, `scalar`, `decimal`, `bytes`, `string`, `json`, `uuid`,
/// `fixed`). Each listed method is defined to ignore its arguments and return
/// the matching `DecodeError::Unexpected*` error.
///
/// Every crate item is named through `$crate` so the expansion does not
/// depend on what the invoking module has imported. The `json` arm names
/// `serde_json::Value` directly, so `serde_json` must be resolvable at the
/// call site.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedRecord))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUnion))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedArray))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedMap))
        }
    };
    (enum_variant) => {
        fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedEnum))
        }
    };
    (scalar) => {
        fn scalar(self, _scalar: $crate::types::Scalar) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedScalar))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedDecimal))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedBytes))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedString))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime serde_json::Value, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedJson))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUuid))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedFixed))
        }
    };
    // A list of kinds expands to one single-kind invocation per entry.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
686
/// A consumer of decoded Avro data that produces a value of type `Out`.
///
/// Every method takes the decoder by value and returns
/// `Result<Self::Out, AvroError>`. Judging by the method names, each one
/// handles one shape of input; the `define_unexpected!` macro in this file
/// can generate rejecting implementations for the shapes a decoder does not
/// support.
pub trait AvroDecode: Sized {
    /// The value produced by a successful decode.
    type Out;
    /// Handles a record, whose fields are pulled from `_a`.
    fn record<R: AvroRead, A: AvroRecordAccess<R>>(
        self,
        _a: &mut A,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a union value.
    ///
    /// NOTE(review): presumably `_idx` is the selected branch out of
    /// `_n_variants`, `_null_variant` is the index of the null branch if the
    /// union has one, and the branch's payload is still to be read from
    /// `_reader` using `_deserializer` — confirm against the caller.
    fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
        self,
        _idx: usize,
        _n_variants: usize,
        _null_variant: Option<usize>,
        _deserializer: D,
        _reader: &'a mut R,
    ) -> Result<Self::Out, AvroError>;

    /// Handles an array, whose elements are pulled from `_a`.
    fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;

    /// Handles a map, whose entries are pulled from `_m`.
    fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;

    /// Handles an enum value given as a symbol and an index.
    fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;

    /// Handles a scalar value.
    fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;

    /// Handles a decimal with the given precision and scale; the digits are
    /// supplied either as a byte slice or as a reader plus length.
    fn decimal<'a, R: AvroRead>(
        self,
        _precision: usize,
        _scale: usize,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a bytes value, supplied as a slice or as a reader plus length.
    fn bytes<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a string value, supplied as a `&str` or as a reader plus
    /// length.
    fn string<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a str, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a JSON value, supplied as a parsed `serde_json::Value` or as a
    /// reader plus length.
    fn json<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a serde_json::Value, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a UUID value, supplied as a byte slice or as a reader plus
    /// length.
    fn uuid<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a fixed value, supplied as a byte slice or as a reader plus
    /// length.
    fn fixed<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Wraps this decoder in a `MappingDecoder` that passes each successful
    /// output through `f`.
    fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
        self,
        f: F,
    ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
        public_decoders::MappingDecoder::new(self, f)
    }
}
745
746pub mod public_decoders {
747
748    use std::collections::BTreeMap;
749
750    use crate::error::{DecodeError, Error as AvroError};
751    use crate::types::{DecimalValue, Scalar, Value};
752    use crate::{
753        AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
754    };
755
756    use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
757
    // Defines a unit-struct decoder `$name` whose output is `$out`.
    //
    // The generated `scalar` method accepts the listed `Scalar` variants
    // (separated by `;`), converting the payload with `try_into`; any other
    // scalar yields `UnexpectedScalarKind`. All non-scalar inputs are rejected
    // through `define_unexpected!`. The macro also makes `$out` a
    // `StatefulAvroDecodable` with `()` state, so `$name` is its decoder.
    macro_rules! define_simple_decoder {
        ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
            pub struct $name;
            impl AvroDecode for $name {
                type Out = $out;
                fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
                    let out = match scalar {
                        $(
                            Scalar::$scalar_branch(inner) => {inner.try_into()?}
                        ),*
                            other => return Err(AvroError::Decode(DecodeError::UnexpectedScalarKind(other.into())))
                    };
                    Ok(out)
                }
                define_unexpected! {
                    array, record, union_branch, map, enum_variant, decimal, bytes, string, json, uuid, fixed
                }
            }

            impl StatefulAvroDecodable for $out {
                type Decoder = $name;
                type State = ();
                fn new_decoder(_state: ()) -> $name {
                    $name
                }
            }
        }
    }

    // Integer decoders; each accepts both `Scalar::Int` and `Scalar::Long`.
    define_simple_decoder!(I32Decoder, i32, Int;Long);
    define_simple_decoder!(I64Decoder, i64, Int;Long);
    define_simple_decoder!(U64Decoder, u64, Int;Long);
    define_simple_decoder!(UsizeDecoder, usize, Int;Long);
    define_simple_decoder!(IsizeDecoder, isize, Int;Long);
792
    /// A decoder adapter that runs an inner decoder and then passes its
    /// output through a fallible conversion function.
    pub struct MappingDecoder<
        T,
        InnerOut,
        Inner: AvroDecode<Out = InnerOut>,
        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
    > {
        // The decoder that does the actual decoding.
        inner: Inner,
        // Applied to the inner decoder's successful output.
        conv: Conv,
    }

    impl<
        T,
        InnerOut,
        Inner: AvroDecode<Out = InnerOut>,
        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
    > MappingDecoder<T, InnerOut, Inner, Conv>
    {
        /// Wraps `inner` so that its output is converted by `conv`.
        pub fn new(inner: Inner, conv: Conv) -> Self {
            Self { inner, conv }
        }
    }
814
815    impl<
816        T,
817        InnerOut,
818        Inner: AvroDecode<Out = InnerOut>,
819        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
820    > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
821    {
822        type Out = T;
823
824        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
825            mut self,
826            a: &mut A,
827        ) -> Result<Self::Out, AvroError> {
828            (self.conv)(self.inner.record(a)?)
829        }
830
831        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
832            mut self,
833            idx: usize,
834            n_variants: usize,
835            null_variant: Option<usize>,
836            deserializer: D,
837            reader: &'a mut R,
838        ) -> Result<Self::Out, AvroError> {
839            (self.conv)(self.inner.union_branch(
840                idx,
841                n_variants,
842                null_variant,
843                deserializer,
844                reader,
845            )?)
846        }
847
848        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
849            (self.conv)(self.inner.array(a)?)
850        }
851
852        fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
853            (self.conv)(self.inner.map(m)?)
854        }
855
856        fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
857            (self.conv)(self.inner.enum_variant(symbol, idx)?)
858        }
859
860        fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
861            (self.conv)(self.inner.scalar(scalar)?)
862        }
863
864        fn decimal<'a, R: AvroRead>(
865            mut self,
866            precision: usize,
867            scale: usize,
868            r: ValueOrReader<'a, &'a [u8], R>,
869        ) -> Result<Self::Out, AvroError> {
870            (self.conv)(self.inner.decimal(precision, scale, r)?)
871        }
872
873        fn bytes<'a, R: AvroRead>(
874            mut self,
875            r: ValueOrReader<'a, &'a [u8], R>,
876        ) -> Result<Self::Out, AvroError> {
877            (self.conv)(self.inner.bytes(r)?)
878        }
879
880        fn string<'a, R: AvroRead>(
881            mut self,
882            r: ValueOrReader<'a, &'a str, R>,
883        ) -> Result<Self::Out, AvroError> {
884            (self.conv)(self.inner.string(r)?)
885        }
886
887        fn json<'a, R: AvroRead>(
888            mut self,
889            r: ValueOrReader<'a, &'a serde_json::Value, R>,
890        ) -> Result<Self::Out, AvroError> {
891            (self.conv)(self.inner.json(r)?)
892        }
893
894        fn uuid<'a, R: AvroRead>(
895            mut self,
896            r: ValueOrReader<'a, &'a [u8], R>,
897        ) -> Result<Self::Out, AvroError> {
898            (self.conv)(self.inner.uuid(r)?)
899        }
900
901        fn fixed<'a, R: AvroRead>(
902            mut self,
903            r: ValueOrReader<'a, &'a [u8], R>,
904        ) -> Result<Self::Out, AvroError> {
905            (self.conv)(self.inner.fixed(r)?)
906        }
907    }
    /// Decodes an array into a `Vec`, creating a fresh element decoder from
    /// `ctor` for every element.
    pub struct ArrayAsVecDecoder<
        InnerOut,
        Inner: AvroDecode<Out = InnerOut>,
        Ctor: FnMut() -> Inner,
    > {
        // Produces the decoder used for the next element.
        ctor: Ctor,
        // Elements decoded so far.
        buf: Vec<InnerOut>,
    }

    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
        ArrayAsVecDecoder<InnerOut, Inner, Ctor>
    {
        /// Creates a decoder with an empty output buffer.
        pub fn new(ctor: Ctor) -> Self {
            Self { ctor, buf: vec![] }
        }
    }
924    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
925        for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
926    {
927        type Out = Vec<InnerOut>;
928        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
929            while let Some(next) = a.decode_next((self.ctor)())? {
930                self.buf.push(next);
931            }
932            Ok(self.buf)
933        }
934        define_unexpected! {
935            record, union_branch, map, enum_variant, scalar, decimal, bytes, string, json, uuid, fixed
936        }
937    }
938
939    pub struct DefaultArrayAsVecDecoder<T> {
940        buf: Vec<T>,
941    }
942    impl<T> Default for DefaultArrayAsVecDecoder<T> {
943        fn default() -> Self {
944            Self { buf: vec![] }
945        }
946    }
947    impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
948        type Out = Vec<T>;
949        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
950            while let Some(next) = {
951                let inner = T::new_decoder();
952                a.decode_next(inner)?
953            } {
954                self.buf.push(next);
955            }
956            Ok(self.buf)
957        }
958        define_unexpected! {
959            record, union_branch, map, enum_variant, scalar, decimal, bytes, string, json, uuid, fixed
960        }
961    }
962    impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
963        type Decoder = DefaultArrayAsVecDecoder<T>;
964        type State = ();
965
966        fn new_decoder(_state: Self::State) -> Self::Decoder {
967            DefaultArrayAsVecDecoder::<T>::default()
968        }
969    }
970    pub struct TrivialDecoder;
971
972    impl TrivialDecoder {
973        fn maybe_skip<'a, V, R: AvroRead>(
974            self,
975            r: ValueOrReader<'a, V, R>,
976        ) -> Result<(), AvroError> {
977            if let ValueOrReader::Reader { len, r } = r {
978                Ok(r.skip(len)?)
979            } else {
980                Ok(())
981            }
982        }
983    }
984
985    impl AvroDecode for TrivialDecoder {
986        type Out = ();
987        fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
988            while let Some((_, _, f)) = a.next_field()? {
989                f.decode_field(TrivialDecoder)?;
990            }
991            Ok(())
992        }
993        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
994            self,
995            _idx: usize,
996            _n_variants: usize,
997            _null_variant: Option<usize>,
998            deserializer: D,
999            reader: &'a mut R,
1000        ) -> Result<(), AvroError> {
1001            deserializer.deserialize(reader, self)
1002        }
1003
1004        fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1005            Ok(())
1006        }
1007        fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1008            Ok(())
1009        }
1010        fn decimal<'a, R: AvroRead>(
1011            self,
1012            _precision: usize,
1013            _scale: usize,
1014            r: ValueOrReader<'a, &'a [u8], R>,
1015        ) -> Result<(), AvroError> {
1016            self.maybe_skip(r)
1017        }
1018        fn bytes<'a, R: AvroRead>(
1019            self,
1020            r: ValueOrReader<'a, &'a [u8], R>,
1021        ) -> Result<(), AvroError> {
1022            self.maybe_skip(r)
1023        }
1024        fn string<'a, R: AvroRead>(
1025            self,
1026            r: ValueOrReader<'a, &'a str, R>,
1027        ) -> Result<(), AvroError> {
1028            self.maybe_skip(r)
1029        }
1030        fn json<'a, R: AvroRead>(
1031            self,
1032            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1033        ) -> Result<(), AvroError> {
1034            self.maybe_skip(r)
1035        }
1036        fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1037            self.maybe_skip(r)
1038        }
1039        fn fixed<'a, R: AvroRead>(
1040            self,
1041            r: ValueOrReader<'a, &'a [u8], R>,
1042        ) -> Result<(), AvroError> {
1043            self.maybe_skip(r)
1044        }
1045        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1046            while a.decode_next(TrivialDecoder)?.is_some() {}
1047            Ok(())
1048        }
1049
1050        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1051            while let Some((_n, entry)) = m.next_entry()? {
1052                entry.decode_field(TrivialDecoder)?
1053            }
1054            Ok(())
1055        }
1056    }
1057    pub struct ValueDecoder;
1058    impl AvroDecode for ValueDecoder {
1059        type Out = Value;
1060        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1061            self,
1062            a: &mut A,
1063        ) -> Result<Value, AvroError> {
1064            let mut fields = vec![];
1065            while let Some((name, idx, f)) = a.next_field()? {
1066                let next = ValueDecoder;
1067                let val = f.decode_field(next)?;
1068                fields.push((idx, (name.to_string(), val)));
1069            }
1070            fields.sort_by_key(|(idx, _)| *idx);
1071
1072            Ok(Value::Record(
1073                fields
1074                    .into_iter()
1075                    .map(|(_idx, (name, val))| (name, val))
1076                    .collect(),
1077            ))
1078        }
1079        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1080            self,
1081            index: usize,
1082            n_variants: usize,
1083            null_variant: Option<usize>,
1084            deserializer: D,
1085            reader: &'a mut R,
1086        ) -> Result<Value, AvroError> {
1087            let next = ValueDecoder;
1088            let inner = Box::new(deserializer.deserialize(reader, next)?);
1089            Ok(Value::Union {
1090                index,
1091                inner,
1092                n_variants,
1093                null_variant,
1094            })
1095        }
1096        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1097            let mut items = vec![];
1098            loop {
1099                let next = ValueDecoder;
1100
1101                if let Some(value) = a.decode_next(next)? {
1102                    items.push(value)
1103                } else {
1104                    break;
1105                }
1106            }
1107            Ok(Value::Array(items))
1108        }
1109        fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1110            Ok(Value::Enum(idx, symbol.to_string()))
1111        }
1112        fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1113            Ok(scalar.into())
1114        }
1115        fn decimal<'a, R: AvroRead>(
1116            self,
1117            precision: usize,
1118            scale: usize,
1119            r: ValueOrReader<'a, &'a [u8], R>,
1120        ) -> Result<Value, AvroError> {
1121            let unscaled = match r {
1122                ValueOrReader::Value(buf) => buf.to_vec(),
1123                ValueOrReader::Reader { len, r } => {
1124                    let mut buf = vec![];
1125                    buf.resize_with(len, Default::default);
1126                    r.read_exact(&mut buf)?;
1127                    buf
1128                }
1129            };
1130            Ok(Value::Decimal(DecimalValue {
1131                unscaled,
1132                precision,
1133                scale,
1134            }))
1135        }
1136        fn bytes<'a, R: AvroRead>(
1137            self,
1138            r: ValueOrReader<'a, &'a [u8], R>,
1139        ) -> Result<Value, AvroError> {
1140            let buf = match r {
1141                ValueOrReader::Value(buf) => buf.to_vec(),
1142                ValueOrReader::Reader { len, r } => {
1143                    let mut buf = vec![];
1144                    buf.resize_with(len, Default::default);
1145                    r.read_exact(&mut buf)?;
1146                    buf
1147                }
1148            };
1149            Ok(Value::Bytes(buf))
1150        }
1151        fn string<'a, R: AvroRead>(
1152            self,
1153            r: ValueOrReader<'a, &'a str, R>,
1154        ) -> Result<Value, AvroError> {
1155            let s = match r {
1156                ValueOrReader::Value(s) => s.to_string(),
1157                ValueOrReader::Reader { len, r } => {
1158                    let mut buf = vec![];
1159                    buf.resize_with(len, Default::default);
1160                    r.read_exact(&mut buf)?;
1161                    String::from_utf8(buf)
1162                        .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1163                }
1164            };
1165            Ok(Value::String(s))
1166        }
1167        fn json<'a, R: AvroRead>(
1168            self,
1169            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1170        ) -> Result<Value, AvroError> {
1171            let val = match r {
1172                ValueOrReader::Value(val) => val.clone(),
1173                ValueOrReader::Reader { len, r } => {
1174                    let mut buf = vec![];
1175                    buf.resize_with(len, Default::default);
1176                    r.read_exact(&mut buf)?;
1177                    serde_json::from_slice(&buf).map_err(|e| {
1178                        AvroError::Decode(DecodeError::BadJson {
1179                            category: e.classify(),
1180                            bytes: buf.to_owned(),
1181                        })
1182                    })?
1183                }
1184            };
1185            Ok(Value::Json(val))
1186        }
1187        fn uuid<'a, R: AvroRead>(
1188            self,
1189            r: ValueOrReader<'a, &'a [u8], R>,
1190        ) -> Result<Value, AvroError> {
1191            let buf = match r {
1192                ValueOrReader::Value(val) => val.to_vec(),
1193                ValueOrReader::Reader { len, r } => {
1194                    let mut buf = vec![];
1195                    buf.resize_with(len, Default::default);
1196                    r.read_exact(&mut buf)?;
1197                    buf
1198                }
1199            };
1200            let s = std::str::from_utf8(&buf)
1201                .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1202            let val =
1203                uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1204            Ok(Value::Uuid(val))
1205        }
1206        fn fixed<'a, R: AvroRead>(
1207            self,
1208            r: ValueOrReader<'a, &'a [u8], R>,
1209        ) -> Result<Value, AvroError> {
1210            let buf = match r {
1211                ValueOrReader::Value(buf) => buf.to_vec(),
1212                ValueOrReader::Reader { len, r } => {
1213                    let mut buf = vec![];
1214                    buf.resize_with(len, Default::default);
1215                    r.read_exact(&mut buf)?;
1216                    buf
1217                }
1218            };
1219            Ok(Value::Fixed(buf.len(), buf))
1220        }
1221        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1222            let mut entries = BTreeMap::new();
1223            while let Some((name, a)) = m.next_entry()? {
1224                let d = ValueDecoder;
1225                let val = a.decode_field(d)?;
1226                entries.insert(name, val);
1227            }
1228            Ok(Value::Map(entries))
1229        }
1230    }
1231}
1232
1233impl<'a> AvroDeserializer for &'a Value {
1234    fn deserialize<R: AvroRead, D: AvroDecode>(
1235        self,
1236        _r: &mut R,
1237        d: D,
1238    ) -> Result<D::Out, AvroError> {
1239        give_value(d, self)
1240    }
1241}
1242
1243pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1244    use ValueOrReader::Value as V;
1245    match v {
1246        Value::Null => d.scalar(Scalar::Null),
1247        Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1248        Value::Int(val) => d.scalar(Scalar::Int(*val)),
1249        Value::Long(val) => d.scalar(Scalar::Long(*val)),
1250        Value::Float(val) => d.scalar(Scalar::Float(*val)),
1251        Value::Double(val) => d.scalar(Scalar::Double(*val)),
1252        Value::Date(val) => d.scalar(Scalar::Date(*val)),
1253        Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1254        // The &[u8] parameter here (and elsewhere in this function) is arbitrary, but we have to put in something in order for the function
1255        // to type-check
1256        Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1257        Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1258        Value::String(val) => d.string::<&[u8]>(V(val)),
1259        Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1260        Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1261        Value::Union {
1262            index,
1263            inner,
1264            n_variants,
1265            null_variant,
1266        } => {
1267            let mut empty_reader: &[u8] = &[];
1268            d.union_branch(
1269                *index,
1270                *n_variants,
1271                *null_variant,
1272                &**inner,
1273                &mut empty_reader,
1274            )
1275        }
1276        Value::Array(val) => {
1277            let mut a = ValueArrayAccess::new(val);
1278            d.array(&mut a)
1279        }
1280        Value::Map(val) => {
1281            let vals: Vec<_> = val.clone().into_iter().collect();
1282            let mut m = ValueMapAccess::new(vals.as_slice());
1283            d.map(&mut m)
1284        }
1285        Value::Record(val) => {
1286            let mut a = ValueRecordAccess::new(val);
1287            d.record(&mut a)
1288        }
1289        Value::Json(val) => d.json::<&[u8]>(V(val)),
1290        Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1291    }
1292}
1293
1294pub trait AvroDeserializer {
1295    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
1296}
1297
1298#[derive(Clone, Copy)]
1299pub struct GeneralDeserializer<'a> {
1300    pub schema: SchemaNode<'a>,
1301}
1302
1303impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1304    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1305        use ValueOrReader::Reader;
1306        match self.schema.inner {
1307            SchemaPiece::Null => d.scalar(Scalar::Null),
1308            SchemaPiece::Boolean => {
1309                let mut buf = [0u8; 1];
1310                r.read_exact(&mut buf[..])?;
1311                let val = match buf[0] {
1312                    0u8 => false,
1313                    1u8 => true,
1314                    other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
1315                };
1316                d.scalar(Scalar::Boolean(val))
1317            }
1318            SchemaPiece::Int => {
1319                let val = zag_i32(r)?;
1320                d.scalar(Scalar::Int(val))
1321            }
1322            SchemaPiece::Long => {
1323                let val = zag_i64(r)?;
1324                d.scalar(Scalar::Long(val))
1325            }
1326            SchemaPiece::Float => {
1327                let val = decode_float(r)?;
1328                d.scalar(Scalar::Float(val))
1329            }
1330            SchemaPiece::Double => {
1331                let val = decode_double(r)?;
1332                d.scalar(Scalar::Double(val))
1333            }
1334            SchemaPiece::Date => {
1335                let days = zag_i32(r)?;
1336                d.scalar(Scalar::Date(days))
1337            }
1338            SchemaPiece::TimestampMilli => {
1339                let total_millis = zag_i64(r)?;
1340                let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
1341                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1342                    _ => unreachable!(),
1343                };
1344                d.scalar(scalar)
1345            }
1346            SchemaPiece::TimestampMicro => {
1347                let total_micros = zag_i64(r)?;
1348                let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
1349                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1350                    _ => unreachable!(),
1351                };
1352                d.scalar(scalar)
1353            }
1354            SchemaPiece::Decimal {
1355                precision,
1356                scale,
1357                fixed_size,
1358            } => {
1359                let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
1360                d.decimal(*precision, *scale, Reader { len, r })
1361            }
1362            SchemaPiece::Bytes => {
1363                let len = decode_len(r)?;
1364                d.bytes(Reader { len, r })
1365            }
1366            SchemaPiece::String => {
1367                let len = decode_len(r)?;
1368                d.string(Reader { len, r })
1369            }
1370            SchemaPiece::Json => {
1371                let len = decode_len(r)?;
1372                d.json(Reader { len, r })
1373            }
1374            SchemaPiece::Uuid => {
1375                let len = decode_len(r)?;
1376                d.uuid(Reader { len, r })
1377            }
1378            SchemaPiece::Array(inner) => {
1379                // From the spec:
1380                // Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
1381                // If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
1382
1383                let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
1384                d.array(&mut a)
1385            }
1386            SchemaPiece::Map(inner) => {
1387                // See logic for `SchemaPiece::Array` above. Maps are encoded similarly.
1388                let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
1389                d.map(&mut m)
1390            }
1391            SchemaPiece::Union(inner) => {
1392                let index = decode_long_nonneg(r)? as usize;
1393                let variants = inner.variants();
1394                match variants.get(index) {
1395                    Some(variant) => {
1396                        let n_variants = variants.len();
1397                        let null_variant = variants
1398                            .iter()
1399                            .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
1400                        let dsr = GeneralDeserializer {
1401                            schema: self.schema.step(variant),
1402                        };
1403                        d.union_branch(index, n_variants, null_variant, dsr, r)
1404                    }
1405                    None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
1406                        index,
1407                        len: variants.len(),
1408                    })),
1409                }
1410            }
1411            SchemaPiece::ResolveIntLong => {
1412                let val = zag_i32(r)? as i64;
1413                d.scalar(Scalar::Long(val))
1414            }
1415            SchemaPiece::ResolveIntFloat => {
1416                let val = zag_i32(r)? as f32;
1417                d.scalar(Scalar::Float(val))
1418            }
1419            SchemaPiece::ResolveIntDouble => {
1420                let val = zag_i32(r)? as f64;
1421                d.scalar(Scalar::Double(val))
1422            }
1423            SchemaPiece::ResolveLongFloat => {
1424                let val = zag_i64(r)? as f32;
1425                d.scalar(Scalar::Float(val))
1426            }
1427            SchemaPiece::ResolveLongDouble => {
1428                let val = zag_i64(r)? as f64;
1429                d.scalar(Scalar::Double(val))
1430            }
1431            SchemaPiece::ResolveFloatDouble => {
1432                let val = decode_float(r)? as f64;
1433                d.scalar(Scalar::Double(val))
1434            }
1435            SchemaPiece::ResolveConcreteUnion {
1436                index,
1437                inner,
1438                n_reader_variants,
1439                reader_null_variant,
1440            } => {
1441                let dsr = GeneralDeserializer {
1442                    schema: self.schema.step(&**inner),
1443                };
1444                d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1445            }
1446            SchemaPiece::ResolveUnionUnion {
1447                permutation,
1448                n_reader_variants,
1449                reader_null_variant,
1450            } => {
1451                let index = decode_long_nonneg(r)? as usize;
1452                if index >= permutation.len() {
1453                    return Err(AvroError::Decode(DecodeError::BadUnionIndex {
1454                        index,
1455                        len: permutation.len(),
1456                    }));
1457                }
1458                match &permutation[index] {
1459                    Err(e) => Err(e.clone()),
1460                    Ok((index, variant)) => {
1461                        let dsr = GeneralDeserializer {
1462                            schema: self.schema.step(variant),
1463                        };
1464                        d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
1465                    }
1466                }
1467            }
1468            SchemaPiece::ResolveUnionConcrete { index, inner } => {
1469                let found_index = decode_long_nonneg(r)? as usize;
1470                if *index != found_index {
1471                    Err(AvroError::Decode(DecodeError::WrongUnionIndex {
1472                        expected: *index,
1473                        actual: found_index,
1474                    }))
1475                } else {
1476                    let dsr = GeneralDeserializer {
1477                        schema: self.schema.step(inner.as_ref()),
1478                    };
1479                    // The reader is not expecting a union here, so don't call `D::union_branch`
1480                    dsr.deserialize(r, d)
1481                }
1482            }
1483            SchemaPiece::Record {
1484                doc: _,
1485                fields,
1486                lookup: _,
1487            } => {
1488                let mut a = SimpleRecordAccess::new(self.schema, r, fields);
1489                d.record(&mut a)
1490            }
1491            SchemaPiece::Enum {
1492                symbols,
1493                doc: _,
1494                default_idx: _,
1495            } => {
1496                let index = decode_int_nonneg(r)? as usize;
1497                match symbols.get(index) {
1498                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1499                        index,
1500                        len: symbols.len(),
1501                    })),
1502                    Some(symbol) => d.enum_variant(symbol, index),
1503                }
1504            }
1505            SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
1506            // XXX - This does not deliver fields to the consumer in the same order they were
1507            // declared in the reader schema, which might cause headache for consumers...
1508            // Unfortunately, there isn't a good way to do so without pre-decoding the whole record
1509            // (which would require a lot of allocations)
1510            // and then sorting the fields. So, just let the consumer deal with re-ordering.
1511            SchemaPiece::ResolveRecord {
1512                defaults,
1513                fields,
1514                n_reader_fields: _,
1515            } => {
1516                let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
1517                d.record(&mut a)
1518            }
1519            SchemaPiece::ResolveEnum {
1520                doc: _,
1521                symbols,
1522                default,
1523            } => {
1524                let index = decode_int_nonneg(r)? as usize;
1525                match symbols.get(index) {
1526                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
1527                        index,
1528                        len: symbols.len(),
1529                    })),
1530                    Some(op) => match op {
1531                        Err(missing) => {
1532                            if let Some((reader_index, symbol)) = default.clone() {
1533                                d.enum_variant(&symbol, reader_index)
1534                            } else {
1535                                Err(AvroError::Decode(DecodeError::MissingEnumIndex {
1536                                    index,
1537                                    symbol: missing.clone(),
1538                                }))
1539                            }
1540                        }
1541                        Ok((index, name)) => d.enum_variant(name, *index),
1542                    },
1543                }
1544            }
1545            SchemaPiece::ResolveIntTsMilli => {
1546                let total_millis = zag_i32(r)?;
1547                let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
1548                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1549                    _ => unreachable!(),
1550                };
1551                d.scalar(scalar)
1552            }
1553            SchemaPiece::ResolveIntTsMicro => {
1554                let total_micros = zag_i32(r)?;
1555                let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
1556                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
1557                    _ => unreachable!(),
1558                };
1559                d.scalar(scalar)
1560            }
1561            SchemaPiece::ResolveDateTimestamp => {
1562                let days = zag_i32(r)?;
1563
1564                let date = NaiveDate::from_ymd_opt(1970, 1, 1)
1565                    .expect("naive date known valid")
1566                    .checked_add_signed(
1567                        chrono::Duration::try_days(days.into())
1568                            .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
1569                    )
1570                    .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
1571                let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
1572                d.scalar(Scalar::Timestamp(dt))
1573            }
1574        }
1575    }
1576}
1577/// Decode a `Value` from avro format given its `Schema`.
1578pub fn decode<'a, R: AvroRead>(
1579    schema: SchemaNode<'a>,
1580    reader: &'a mut R,
1581) -> Result<Value, AvroError> {
1582    let d = ValueDecoder;
1583    let dsr = GeneralDeserializer { schema };
1584    let val = dsr.deserialize(reader, d)?;
1585    Ok(val)
1586}