Skip to main content

mz_avro/
decode.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use std::cmp;
25use std::collections::BTreeSet;
26use std::fmt::{self, Display};
27use std::fs::File;
28use std::io::{self, Cursor, Read, Seek, SeekFrom};
29
30use chrono::{DateTime, NaiveDate};
31use flate2::read::MultiGzDecoder;
32
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35    RecordField, ResolvedDefaultValueField, ResolvedRecordField, Schema, SchemaNode, SchemaPiece,
36    SchemaPieceOrNamed, SchemaPieceRefOrNamed,
37};
38use crate::types::{Scalar, Value};
39use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
40use crate::{TrivialDecoder, ValueDecoder};
41
/// A type that can be produced by an Avro decoder which is constructed from
/// some caller-supplied state.
///
/// Implementors name the decoder type (whose output must be `Self`) and the
/// state type needed to build it.
pub trait StatefulAvroDecodable: Sized {
    /// The decoder that yields a value of the implementing type.
    type Decoder: AvroDecode<Out = Self>;
    /// The state consumed when constructing [`Self::Decoder`].
    type State;
    /// Builds a decoder for `Self` from `state`.
    fn new_decoder(state: Self::State) -> Self::Decoder;
}
/// A type that can be produced by an Avro decoder which needs no external
/// state to be constructed.
pub trait AvroDecodable: Sized {
    /// The decoder that yields a value of the implementing type.
    type Decoder: AvroDecode<Out = Self>;

    /// Builds a fresh decoder for `Self`.
    fn new_decoder() -> Self::Decoder;
}
52impl<T> AvroDecodable for T
53where
54    T: StatefulAvroDecodable,
55    T::State: Default,
56{
57    type Decoder = <Self as StatefulAvroDecodable>::Decoder;
58
59    fn new_decoder() -> Self::Decoder {
60        <Self as StatefulAvroDecodable>::new_decoder(Default::default())
61    }
62}
63#[inline]
64fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
65    let u = match zag_i64(reader)? {
66        i if i >= 0 => i as u64,
67        i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
68    };
69    Ok(u)
70}
71
72fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
73    let u = match zag_i32(reader)? {
74        i if i >= 0 => i as u32,
75        i => {
76            return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
77                i as i64,
78            )));
79        }
80    };
81    Ok(u)
82}
83
84#[inline]
85fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
86    zag_i64(reader).and_then(|i| safe_len(i as usize))
87}
88
89#[inline]
90fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
91    let mut buf = [0u8; 4];
92    reader.read_exact(&mut buf[..])?;
93    Ok(f32::from_le_bytes(buf))
94}
95
96#[inline]
97fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
98    let mut buf = [0u8; 8];
99    reader.read_exact(&mut buf[..])?;
100    Ok(f64::from_le_bytes(buf))
101}
102
103impl Display for TsUnit {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        match self {
106            TsUnit::Millis => write!(f, "ms"),
107            TsUnit::Micros => write!(f, "us"),
108        }
109    }
110}
111
// Unit tests for timestamp construction and for the decode-time bounds that
// keep array/map block counts from amplifying a tiny input into a huge
// allocation.
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    // Negative epoch offsets must floor the seconds component and keep the
    // sub-second remainder non-negative (e.g. -1 ms is second -1 plus
    // 999_000_000 ns), for both millisecond and microsecond inputs.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        assert_eq!(
            build_ts_value(-1, TsUnit::Millis).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-1000, TsUnit::Millis).unwrap(),
            Value::Timestamp(DateTime::from_timestamp(-1, 0).unwrap().naive_utc())
        );
        assert_eq!(
            build_ts_value(-1000, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-1, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_999_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-123_456_789_123, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-123_457, (1_000_000 - 789_123) * 1_000)
                    .unwrap()
                    .naive_utc()
            )
        );
    }

    #[mz_ore::test]
    fn array_block_len_bounded_by_remaining_input() {
        // A tiny body claiming a huge array block must error, not allocate. A
        // small (or hostile) message used to drive an unbounded `Vec<Value>` by
        // claiming a multi-million-element block whose items decode from ~no
        // input (e.g. an empty record). Regression for an OOM found by fuzzing.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "long"}"#).unwrap();
        let mut body = Vec::new();
        zig_i64(8_000_000, &mut body); // block count dwarfs the (here, empty) element data
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_err(),
            "an array block longer than the remaining input must be rejected, not allocated"
        );
    }

    #[mz_ore::test]
    fn zero_width_array_elements_decode() {
        // The remaining-input bound must not reject valid arrays whose elements
        // encode to zero bytes. `null` and empty records have no per-element byte
        // floor, so a ten-element block legitimately follows its count with no
        // element bytes at all (Materialize's own writer emits `array<null>` of
        // ten as `[20, 0]`: block count 10, then the terminating zero block).
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        for (items, want) in [
            (r#""null""#, Value::Null),
            (
                r#"{"type": "record", "name": "Empty", "fields": []}"#,
                Value::Record(vec![]),
            ),
        ] {
            let schema =
                Schema::from_str(&format!(r#"{{"type": "array", "items": {items}}}"#)).unwrap();
            let mut body = Vec::new();
            zig_i64(10, &mut body); // ten elements...
            body.push(0); // ...then the terminating zero block. No element bytes.
            let dsr = GeneralDeserializer {
                schema: schema.top_node(),
            };
            let mut reader: &[u8] = &body;
            let decoded = dsr
                .deserialize(&mut reader, ValueDecoder)
                .expect("a zero-width array element type must decode, not be rejected");
            assert_eq!(decoded, Value::Array(vec![want; 10]));
        }
    }

    #[mz_ore::test]
    fn valid_null_array_falsely_rejected() {
        // Materialize's encoder emits no element bytes for `null` array
        // elements, so decoding must not assume each element consumes at least
        // one byte of remaining input. Unlike the hand-built bodies in the
        // other tests, this one round-trips bytes produced by the encoder.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::encode::encode_to_vec;
        use crate::types::Value;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let value = Value::Array(vec![Value::Null, Value::Null]);
        let body = encode_to_vec(&value, &schema);

        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_ok(),
            "an encoder-produced array of nulls should round-trip, but got: {res:?}"
        );
    }

    #[mz_ore::test]
    fn zero_width_array_elements_decode_across_blocks() {
        // Zero-width arrays can be encoded as multiple blocks. The cumulative
        // cap must not reject ordinary valid data below the limit.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let mut body = Vec::new();
        // Two blocks (4 + 6 elements), then the terminating zero block.
        zig_i64(4, &mut body);
        zig_i64(6, &mut body);
        body.push(0);

        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("zero-width arrays may span multiple blocks below the cap");
        assert_eq!(decoded, Value::Array(vec![Value::Null; 10]));
    }

    #[mz_ore::test]
    fn zero_width_array_total_len_bounded_across_blocks() {
        // A zero-width element type has no input-proportional per-block bound.
        // Keep a cumulative cap so repeated legal-size blocks cannot drive an
        // unbounded decode. Seed the shared node budget at the cap to test the
        // edge without walking millions of null elements first.
        use std::str::FromStr;

        use super::{AvroArrayAccess, DECODE_NODES, MAX_VALUE_NODES, SimpleArrayAccess};
        use crate::util::zig_i64;
        use crate::{Schema, TrivialDecoder};

        let schema = Schema::from_str(r#""null""#).unwrap();
        let mut body = Vec::new();
        zig_i64(1, &mut body);

        let mut reader: &[u8] = &body;
        let mut access = SimpleArrayAccess::new(&mut reader, schema.top_node());
        // Drive `SimpleArrayAccess` directly (no top-level decode entry to reset
        // the budget), so pre-charge the shared counter to the cap by hand.
        DECODE_NODES.with(|n| n.set(MAX_VALUE_NODES));

        let err = access
            .decode_next(TrivialDecoder)
            .expect_err("a new block past the cumulative node budget must be rejected");
        // Clear the hand-seeded budget before asserting on the error text.
        DECODE_NODES.with(|n| n.set(0));
        assert!(
            err.to_string().contains("exceeds cumulative limit"),
            "unexpected error: {err}"
        );
    }

    #[mz_ore::test]
    fn zero_width_record_array_bounded() {
        // Regression for an OOM found by the reader_decode fuzz target: an
        // `array<record{null}>` element is zero-width on the wire (so the
        // remaining-input byte-floor check can't bound it) yet each element
        // still allocates a `Value::Record`, so a multi-million-element block
        // claimed from a handful of bytes amplified into gigabytes. The
        // cumulative node cap must reject it rather than allocate.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "R", "fields": [{"name": "g0", "type": "null"}]}}"#,
        )
        .unwrap();
        // A single block claiming far more zero-width records than the node cap,
        // followed by no element bytes at all.
        let mut body = Vec::new();
        zig_i64(100_000_000, &mut body);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_err(),
            "an array of zero-width records longer than the node cap must be rejected, not allocated"
        );
    }

    #[mz_ore::test]
    fn small_zero_width_record_array_decodes() {
        // The node cap must not reject an ordinary, below-cap array of zero-width
        // records: ten `record{null}`s encode (like `array<null>`) as just the
        // block count followed by the terminating zero block.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "R", "fields": [{"name": "g0", "type": "null"}]}}"#,
        )
        .unwrap();
        let mut body = Vec::new();
        zig_i64(10, &mut body);
        body.push(0);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("a below-cap array of zero-width records must decode, not be rejected");
        let want = Value::Record(vec![("g0".to_string(), Value::Null)]);
        assert_eq!(decoded, Value::Array(vec![want; 10]));
    }

    #[mz_ore::test]
    fn nested_zero_width_collection_shares_node_budget() {
        // Regression: the node budget must be shared across every collection in
        // one datum, not reset per collection. With a per-collection budget each
        // inner array of `array<record{array<record{null}>}>` would get a fresh
        // `MAX_VALUE_NODES` ceiling, so a few wire bytes amplify into
        // ~`MAX_VALUE_NODES` *per outer element* (the same blow-up the cap exists
        // to stop, one nesting level deeper). Drive the decode so an inner
        // array's block-header charge — reached only after the enclosing record
        // starts decoding — trips the *shared* cumulative cap, proving the inner
        // collection sees the outer element's spend rather than a fresh budget.
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer, MAX_VALUE_NODES};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "Outer", "fields": [
                    {"name": "inner", "type":
                        {"type": "array", "items":
                            {"type": "record", "name": "Inner",
                             "fields": [{"name": "g0", "type": "null"}]}}}]}}"#,
        )
        .unwrap();
        // One outer element (charges 2 nodes), whose inner array then claims
        // `MAX_VALUE_NODES / 2` zero-width records — exactly `MAX_VALUE_NODES`
        // weighted nodes, which clears the inner block's own per-block check but
        // pushes the *shared* total (2 + MAX_VALUE_NODES) over the cap. No inner
        // element bytes follow: a correct decode rejects at the header before
        // allocating anything; the per-collection bug would instead materialize
        // ~2M `Value::Record`s and only later hit EOF.
        let mut body = Vec::new();
        zig_i64(1, &mut body);
        zig_i64((MAX_VALUE_NODES / 2) as i64, &mut body);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let err = dsr.deserialize(&mut reader, ValueDecoder).expect_err(
            "a nested array claiming MAX_VALUE_NODES on top of the outer spend must be rejected",
        );
        assert!(
            err.to_string().contains("exceeds cumulative limit"),
            "unexpected error: {err}"
        );
    }

    #[mz_ore::test]
    fn top_level_decode_resets_stale_node_budget() {
        // A decode that errored partway can leave the thread-local node counter
        // non-zero; the next top-level decode must reset it (depth 0 -> 1) or an
        // unrelated datum on the same thread is wrongly rejected. Pin the counter
        // at the cap to stand in for that leftover, then require a small array to
        // still decode.
        use std::str::FromStr;

        use super::{AvroDeserializer, DECODE_NODES, GeneralDeserializer, MAX_VALUE_NODES};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let mut body = Vec::new();
        zig_i64(3, &mut body);
        body.push(0);

        DECODE_NODES.with(|n| n.set(MAX_VALUE_NODES));
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("the top-level entry must reset a stale node budget");
        assert_eq!(decoded, Value::Array(vec![Value::Null; 3]));
    }
}
454
455/// A convenience function to build timestamp values from underlying longs.
456pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
457    let result = match unit {
458        TsUnit::Millis => DateTime::from_timestamp_millis(value),
459        TsUnit::Micros => DateTime::from_timestamp_micros(value),
460    };
461    let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
462    Ok(Value::Timestamp(ndt.naive_utc()))
463}
464
/// A convenience trait for types that are both readable and skippable.
///
/// A blanket implementation is provided for all types that implement both
/// [`Read`] and [`Skip`].
///
/// The trait declares no items of its own; it only lets a bound be written as
/// `R: AvroRead` rather than `R: Read + Skip`.
pub trait AvroRead: Read + Skip {}

// Blanket implementation: anything that is `Read + Skip` is automatically
// `AvroRead`, so the trait never needs to be implemented by hand.
impl<T> AvroRead for T where T: Read + Skip {}
472
/// A trait that allows for efficient skipping forward while reading data.
pub trait Skip: Read {
    /// Advance the cursor by `len` bytes.
    ///
    /// If possible, the implementation should be more efficient than calling
    /// [`Read::read`] and discarding the resulting bytes.
    ///
    /// Calling `skip` with a `len` that advances the cursor past the end of the
    /// underlying data source is permissible. The only requirement is that the
    /// next call to [`Read::read`] indicates EOF.
    ///
    /// The default implementation reads into a small scratch buffer and
    /// discards the bytes, stopping early at EOF. Reads that fail with
    /// [`io::ErrorKind::Interrupted`] are retried, as [`Read::read`]'s
    /// documentation asks callers to do.
    ///
    /// # Errors
    ///
    /// Can return an error in all the same cases that [`Read::read`] can,
    /// except that interrupted reads are retried rather than reported.
    ///
    /// TODO: Remove this clippy suppression when the issue is fixed.
    /// See <https://github.com/rust-lang/rust-clippy/issues/12519>
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        const BUF_SIZE: usize = 512;
        let mut buf = [0; BUF_SIZE];

        while len > 0 {
            let want = len.min(BUF_SIZE);
            match self.read(&mut buf[..want]) {
                // EOF: skipping past the end is allowed, so just stop.
                Ok(0) => break,
                // Saturate so a reader that over-reports cannot underflow `len`.
                Ok(n) => len = len.saturating_sub(n),
                // A transient interruption is not a failure; try again.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }

    /// An upper bound, if cheaply known, on the number of bytes still readable
    /// from this source.
    ///
    /// Used to reject an array/map block that claims more elements than the
    /// input could possibly contain. Such a count only arises when a small (or
    /// hostile) message claims a huge block, which would otherwise drive an
    /// unbounded `Vec` allocation (length amplification).
    ///
    /// The comparison is only meaningful for elements that occupy at least one
    /// byte on the wire (a map entry, for instance, always carries a key-length
    /// varint). Zero-width elements such as `null` can legitimately outnumber
    /// the remaining bytes, so callers must not apply this bound to them.
    ///
    /// Streaming sources that can't answer cheaply return `None`, which is what
    /// the default implementation does.
    fn remaining_input(&self) -> Option<usize> {
        None
    }
}
520
521impl Skip for File {
522    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
523        self.seek(SeekFrom::Current(len as i64))?;
524        Ok(())
525    }
526}
527
528impl Skip for &[u8] {
529    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
530        let len = cmp::min(len, self.len());
531        *self = &self[len..];
532        Ok(())
533    }
534
535    fn remaining_input(&self) -> Option<usize> {
536        Some(self.len())
537    }
538}
539
540impl<S: Skip + ?Sized> Skip for Box<S> {
541    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
542        self.as_mut().skip(len)
543    }
544
545    fn remaining_input(&self) -> Option<usize> {
546        self.as_ref().remaining_input()
547    }
548}
549
550impl<T: AsRef<[u8]>> Skip for Cursor<T> {
551    fn skip(&mut self, len: usize) -> Result<(), io::Error> {
552        self.seek(SeekFrom::Current(len as i64))?;
553        Ok(())
554    }
555
556    fn remaining_input(&self) -> Option<usize> {
557        let total = self.get_ref().as_ref().len();
558        Some(total.saturating_sub(usize::try_from(self.position()).unwrap_or(usize::MAX)))
559    }
560}
561
// No overrides: a gzip stream uses the trait's default `skip` (read and
// discard) and the default `remaining_input` (`None`).
impl<R: Read> Skip for MultiGzDecoder<R> {}
563
/// Either an already-available value of type `V`, or a reader paired with a
/// length.
///
/// NOTE(review): `len` is presumably the number of bytes of the value that can
/// be read from `r` — confirm against the code that constructs and consumes
/// this type.
pub enum ValueOrReader<'a, V, R: AvroRead> {
    /// The value itself.
    Value(V),
    /// A borrowed reader `r` together with a length `len`.
    Reader { len: usize, r: &'a mut R },
}
568
/// Where a field's value comes from: either it is decoded from the input
/// according to a schema, or it is an already-known value.
enum SchemaOrDefault<'b, R: AvroRead> {
    /// Decode the value from the reader using the given schema node.
    Schema(&'b mut R, SchemaNode<'b>),
    /// Use this existing value; nothing is read from any input.
    Default(&'b Value),
}
/// A handle to a single field (or map entry) value that has not been decoded
/// yet. The value is produced by handing a decoder to `decode_field`.
pub struct AvroFieldAccess<'b, R: AvroRead> {
    /// The source of the value: the input plus a schema, or an existing value.
    schema: SchemaOrDefault<'b, R>,
}
576
577impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
578    pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
579        match self.schema {
580            SchemaOrDefault::Schema(r, schema) => {
581                let des = GeneralDeserializer { schema };
582                des.deserialize(r, d)
583            }
584            SchemaOrDefault::Default(value) => give_value(d, value),
585        }
586    }
587}
588
/// Sequential access to the fields of a record being decoded.
pub trait AvroRecordAccess<R: AvroRead> {
    /// Returns the next field as `(name, position, access)`, or `Ok(None)`
    /// once every field has been yielded.
    ///
    /// The returned [`AvroFieldAccess`] borrows from `self`, so it must be
    /// consumed or dropped before the next call.
    fn next_field<'b>(
        &'b mut self,
    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
}
594
595struct SimpleRecordAccess<'a, R: AvroRead> {
596    schema: SchemaNode<'a>,
597    r: &'a mut R,
598    fields: &'a [RecordField],
599    i: usize,
600}
601
602impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
603    fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
604        Self {
605            schema,
606            r,
607            fields,
608            i: 0,
609        }
610    }
611}
612
613impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
614    fn next_field<'b>(
615        &'b mut self,
616    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
617        assert!(self.i <= self.fields.len());
618        if self.i == self.fields.len() {
619            Ok(None)
620        } else {
621            let f = &self.fields[self.i];
622            self.i += 1;
623            Ok(Some((
624                f.name.as_str(),
625                f.position,
626                AvroFieldAccess {
627                    schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
628                },
629            )))
630        }
631    }
632}
633
634struct ValueRecordAccess<'a> {
635    values: &'a [(String, Value)],
636    i: usize,
637}
638
639impl<'a> ValueRecordAccess<'a> {
640    fn new(values: &'a [(String, Value)]) -> Self {
641        Self { values, i: 0 }
642    }
643}
644
645impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
646    fn next_field<'b>(
647        &'b mut self,
648    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
649        assert!(self.i <= self.values.len());
650        if self.i == self.values.len() {
651            Ok(None)
652        } else {
653            let (name, val) = &self.values[self.i];
654            self.i += 1;
655            Ok(Some((
656                name.as_str(),
657                self.i - 1,
658                AvroFieldAccess {
659                    schema: SchemaOrDefault::Default(val),
660                },
661            )))
662        }
663    }
664}
665
666struct ValueMapAccess<'a> {
667    values: &'a [(String, Value)],
668    i: usize,
669}
670
671impl<'a> ValueMapAccess<'a> {
672    fn new(values: &'a [(String, Value)]) -> Self {
673        Self { values, i: 0 }
674    }
675}
676
677impl<'a> AvroMapAccess for ValueMapAccess<'a> {
678    type R = &'a [u8];
679    fn next_entry<'b>(
680        &'b mut self,
681    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
682        assert!(self.i <= self.values.len());
683        if self.i == self.values.len() {
684            Ok(None)
685        } else {
686            let (name, val) = &self.values[self.i];
687            self.i += 1;
688            Ok(Some((
689                name.clone(),
690                AvroFieldAccess {
691                    schema: SchemaOrDefault::Default(val),
692                },
693            )))
694        }
695    }
696}
697
698struct ResolvedRecordAccess<'a, R: AvroRead> {
699    defaults: &'a [ResolvedDefaultValueField],
700    i_defaults: usize,
701    fields: &'a [ResolvedRecordField],
702    i_fields: usize,
703    r: &'a mut R,
704    schema: SchemaNode<'a>,
705}
706
707impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
708    fn new(
709        defaults: &'a [ResolvedDefaultValueField],
710        fields: &'a [ResolvedRecordField],
711        r: &'a mut R,
712        schema: SchemaNode<'a>,
713    ) -> Self {
714        Self {
715            defaults,
716            i_defaults: 0,
717            fields,
718            i_fields: 0,
719            r,
720            schema,
721        }
722    }
723}
724
725impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
726    fn next_field<'b>(
727        &'b mut self,
728    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
729        assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
730        if self.i_defaults < self.defaults.len() {
731            let default = &self.defaults[self.i_defaults];
732            self.i_defaults += 1;
733            Ok(Some((
734                default.name.as_str(),
735                default.position,
736                AvroFieldAccess {
737                    schema: SchemaOrDefault::Default(&default.default),
738                },
739            )))
740        } else {
741            while self.i_fields < self.fields.len() {
742                let field = &self.fields[self.i_fields];
743                self.i_fields += 1;
744                match field {
745                    ResolvedRecordField::Absent(absent_schema) => {
746                        // we don't care what's in the value, but we still need to read it in order to skip ahead the proper amount in the input.
747                        let d = GeneralDeserializer {
748                            schema: absent_schema.top_node(),
749                        };
750                        d.deserialize(self.r, TrivialDecoder)?;
751                        continue;
752                    }
753                    ResolvedRecordField::Present(field) => {
754                        return Ok(Some((
755                            field.name.as_str(),
756                            field.position,
757                            AvroFieldAccess {
758                                schema: SchemaOrDefault::Schema(
759                                    self.r,
760                                    self.schema.step(&field.schema),
761                                ),
762                            },
763                        )));
764                    }
765                }
766            }
767            Ok(None)
768        }
769    }
770}
771
/// Sequential access to the elements of an array being decoded.
pub trait AvroArrayAccess {
    /// Decodes the next element with `d`, returning `Ok(None)` when the array
    /// has no further elements.
    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
}
775
/// Sequential access to the entries of a map being decoded.
pub trait AvroMapAccess {
    /// The reader type that entry values are decoded from.
    type R: AvroRead;
    /// Returns the next entry as its key plus a handle for decoding its value,
    /// or `Ok(None)` once the map has no further entries.
    ///
    /// The returned [`AvroFieldAccess`] borrows from `self`, so it must be
    /// consumed or dropped before the next call.
    fn next_entry<'b>(
        &'b mut self,
    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
}
782
783pub struct SimpleMapAccess<'a, R: AvroRead> {
784    entry_schema: SchemaNode<'a>,
785    r: &'a mut R,
786    done: bool,
787    remaining: usize,
788    /// Lower bound on the `Value` nodes a single entry materializes: the key
789    /// `String` plus the value's [`min_value_nodes`]. Charged against the shared
790    /// [`DECODE_NODES`] budget per block; see [`charge_value_nodes`].
791    entry_nodes: usize,
792}
793
794impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
795    fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
796        Self {
797            entry_schema,
798            r,
799            done: false,
800            remaining: 0,
801            // One node for the key `String`, plus the value's own nodes.
802            entry_nodes: 1usize.saturating_add(min_value_nodes(entry_schema)),
803        }
804    }
805}
806
807impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
808    type R = R;
809    fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
810        if self.done {
811            return Ok(None);
812        }
813        if self.remaining == 0 {
814            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
815            let (len, _len_in_bytes) = match zag_i64(self.r)? {
816                len if len > 0 => (len as usize, None),
817                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
818                0 => {
819                    self.done = true;
820                    return Ok(None);
821                }
822                _ => unreachable!(),
823            };
824            // See `SimpleArrayAccess::decode_next` — same `MAX_VALUE_NODES`
825            // memory bound applies, weighting the entry count by the per-entry
826            // node lower bound so a block whose values are wide-but-zero-width
827            // records can't amplify a few wire bytes into millions of `Value`s.
828            let block_nodes = len.saturating_mul(self.entry_nodes);
829            if block_nodes > MAX_VALUE_NODES {
830                return Err(AvroError::Decode(DecodeError::Custom(format!(
831                    "Avro map block length {len} exceeds limit {MAX_VALUE_NODES} decoded values"
832                ))));
833            }
834            // Charge against the budget shared by every array/map in the datum,
835            // so nested collections can't each get a fresh cap (see
836            // `charge_value_nodes` / `MAX_VALUE_NODES`).
837            charge_value_nodes("map", block_nodes)?;
838            // A block can't hold more entries than there are bytes left to
839            // decode them from; reject a count that claims otherwise rather than
840            // letting it drive an unbounded allocation (see `Skip::remaining_input`).
841            // Unlike an array item, every map entry encodes at least a one-byte
842            // key-length varint, so each entry has a guaranteed one-byte floor and
843            // a count above the remaining input is always bogus.
844            if let Some(remaining) = self.r.remaining_input() {
845                if len > remaining {
846                    return Err(AvroError::Decode(DecodeError::Custom(format!(
847                        "Avro map block length {len} exceeds remaining input ({remaining} bytes)"
848                    ))));
849                }
850            }
851            self.remaining = len;
852        }
853        assert!(self.remaining > 0);
854        self.remaining -= 1;
855
856        // TODO - We can try to avoid this allocation, but  nobody uses maps in Materialize
857        // right now so it doesn't really matter.
858        let key_len = decode_len(self.r)?;
859        let mut key_buf = vec![];
860        key_buf.resize_with(key_len, Default::default);
861        self.r.read_exact(&mut key_buf)?;
862        let key = String::from_utf8(key_buf)
863            .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
864
865        let a = AvroFieldAccess {
866            schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
867        };
868        Ok(Some((key, a)))
869    }
870}
871
872struct SimpleArrayAccess<'a, R: AvroRead> {
873    r: &'a mut R,
874    schema: SchemaNode<'a>,
875    remaining: usize,
876    /// Lower bound on the `Value` nodes a single element materializes (see
877    /// [`min_value_nodes`]). Charged against the shared [`DECODE_NODES`] budget
878    /// per block; see [`charge_value_nodes`].
879    element_nodes: usize,
880    done: bool,
881}
882
883impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
884    fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
885        Self {
886            r,
887            schema,
888            remaining: 0,
889            element_nodes: min_value_nodes(schema),
890            done: false,
891        }
892    }
893}
894
895struct ValueArrayAccess<'a> {
896    values: &'a [Value],
897    i: usize,
898}
899
900impl<'a> ValueArrayAccess<'a> {
901    fn new(values: &'a [Value]) -> Self {
902        Self { values, i: 0 }
903    }
904}
905
906impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
907    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
908        assert!(self.i <= self.values.len());
909        if self.i == self.values.len() {
910            Ok(None)
911        } else {
912            let val = give_value(d, &self.values[self.i])?;
913            self.i += 1;
914            Ok(Some(val))
915        }
916    }
917}
918
/// Ceiling on how many `Value` nodes a single top-level decode may materialize,
/// summed over *every* array and map in the datum.
///
/// Arrays and maps enforce it twice: once per block, as a fast reject for an
/// absurd single-block count, and once against the cumulative budget threaded
/// through the whole decode (see [`charge_value_nodes`] / [`DECODE_NODES`]).
///
/// # Why nodes rather than elements
///
/// The cap bounds *memory*, not element count: each element is weighted by
/// [`min_value_nodes`], a lower bound on the `Value` nodes it decodes into.
/// Counting elements alone is insufficient because a zero-width element —
/// `null`, or a record of only `null`/empty-record fields — occupies no input
/// yet still allocates a `Value` (a `Vec` slot, plus a record's own `Vec` and
/// field-name `String`s). The [`min_encoded_len`] byte-floor check below bounds
/// a block by the remaining input only when each element occupies at least one
/// wire byte, so a multi-million-element block of zero-width elements would
/// otherwise amplify a handful of bytes into gigabytes. Weighting the count and
/// capping the product bounds that amplification, as well as the analogous case
/// of a huge block of wide, positive-floor records read from a large input.
///
/// # Why the budget is shared
///
/// The budget spans the whole datum instead of resetting per collection so
/// that the bound *composes through nesting*. A per-collection budget would
/// give every `array`/`map` a fresh ceiling, letting a schema like
/// `array<record{array<record{null}>}>` amplify a few wire bytes into roughly
/// this cap raised to the nesting depth. One shared budget keeps the worst case
/// flat regardless of nesting.
///
/// # Why a cap at all
///
/// Without one, a malicious or corrupt file can claim up to `i64::MAX` items
/// and the generic array/map decode loop runs until it OOMs or hits `Vec`
/// capacity-overflow.
///
/// # Choice of value
///
/// At `1 << 22` nodes the worst case (decoding zero-width records right up to
/// the cap) peaks around 750 MiB — including the transient doubling of the
/// element `Vec` mid-`push` — leaving comfortable headroom under the fuzzer's
/// 2 GiB RSS limit, while still admitting any realistically-sized array/map.
const MAX_VALUE_NODES: usize = 4 << 20; // == 1 << 22
953
954/// A *lower* bound on the number of bytes any value of `schema` encodes to on
955/// the wire.
956///
957/// Used to reject an array block whose claimed element count could not possibly
958/// fit in the remaining input: a block of `len` elements occupies at least
959/// `len * min_encoded_len` bytes. Only an under-estimate is ever safe here — an
960/// over-estimate would reject valid data — so anything whose floor we can't
961/// prove (schema-resolution pieces, named-type recursion cycles) contributes
962/// `0`, which simply relaxes the bound.
963///
964/// Crucially this returns `0` for zero-width types — `null`, an empty record, a
965/// record of only such fields — because those genuinely encode to no bytes.
966/// Materialize's own writer emits a ten-element `array<null>` as `[20, 0]`, so a
967/// blanket "count must not exceed remaining bytes" rule would reject valid
968/// input. For zero-width element types the caller falls back to the cumulative
969/// [`MAX_VALUE_NODES`] cap (weighted by [`min_value_nodes`]).
970fn min_encoded_len(schema: SchemaNode) -> usize {
971    let mut visited = BTreeSet::new();
972    min_encoded_len_piece(schema.root, schema.inner, &mut visited)
973}
974
975/// Resolves a (possibly named) schema reference, guarding against named-type
976/// cycles, then defers to [`min_encoded_len_piece`].
977fn min_encoded_len_or_named(
978    root: &Schema,
979    node: SchemaPieceRefOrNamed,
980    visited: &mut BTreeSet<usize>,
981) -> usize {
982    match node {
983        SchemaPieceRefOrNamed::Piece(piece) => min_encoded_len_piece(root, piece, visited),
984        SchemaPieceRefOrNamed::Named(idx) => {
985            // A named-type cycle can only close through a record field; treat
986            // the back-edge as zero-width so we never over-estimate.
987            if !visited.insert(idx) {
988                return 0;
989            }
990            let len = min_encoded_len_piece(root, &root.lookup(idx).piece, visited);
991            visited.remove(&idx);
992            len
993        }
994    }
995}
996
997fn min_encoded_len_piece(
998    root: &Schema,
999    piece: &SchemaPiece,
1000    visited: &mut BTreeSet<usize>,
1001) -> usize {
1002    match piece {
1003        // Encodes to nothing at all.
1004        SchemaPiece::Null => 0,
1005        // A single byte (zig-zag varint of 0 is one byte; a bool is one byte).
1006        SchemaPiece::Boolean
1007        | SchemaPiece::Int
1008        | SchemaPiece::Long
1009        | SchemaPiece::Date
1010        | SchemaPiece::TimestampMilli
1011        | SchemaPiece::TimestampMicro => 1,
1012        SchemaPiece::Float => 4,
1013        SchemaPiece::Double => 8,
1014        // `fixed`-backed decimals are exactly their size; `bytes`-backed ones,
1015        // like `bytes`/`string`, carry at least a one-byte length varint.
1016        SchemaPiece::Decimal {
1017            fixed_size: Some(size),
1018            ..
1019        } => *size,
1020        SchemaPiece::Decimal {
1021            fixed_size: None, ..
1022        }
1023        | SchemaPiece::Bytes
1024        | SchemaPiece::String
1025        | SchemaPiece::Json
1026        | SchemaPiece::Uuid => 1,
1027        // An empty array/map encodes as a single zero-count byte regardless of
1028        // the element type, so don't recurse into it.
1029        SchemaPiece::Array(_) | SchemaPiece::Map(_) => 1,
1030        // A union always writes at least its one-byte branch index.
1031        SchemaPiece::Union(_) => 1,
1032        // An enum writes a one-byte symbol index.
1033        SchemaPiece::Enum { .. } => 1,
1034        SchemaPiece::Fixed { size } => *size,
1035        // A record's encoding is its fields' encodings concatenated, so its
1036        // floor is the sum of the fields' floors — which can be `0` (the empty
1037        // record, or a record of only `null`/empty-record fields).
1038        SchemaPiece::Record { fields, .. } => fields.iter().fold(0, |acc, field| {
1039            acc.saturating_add(min_encoded_len_or_named(
1040                root,
1041                field.schema.as_ref(),
1042                visited,
1043            ))
1044        }),
1045        // Schema-resolution pieces only arise on the reader/writer-mismatch
1046        // path; we don't try to prove a floor for them.
1047        _ => 0,
1048    }
1049}
1050
1051/// A *lower* bound on the number of `Value` nodes a single value of `schema`
1052/// materializes into when decoded.
1053///
1054/// Used to weight an array/map element so the cumulative [`MAX_VALUE_NODES`] cap
1055/// bounds decoded *memory*, not just element count. The amplifying case the cap
1056/// exists for — `null` and records of only zero-width fields — is counted
1057/// *exactly* here (a record always materializes every field, and none of these
1058/// types involve a union/array/map whose runtime size we couldn't predict), so
1059/// the bound is tight where it matters most.
1060///
1061/// As with [`min_encoded_len`], only an under-estimate is ever safe (an
1062/// over-estimate would reject valid data), so a nested array/map contributes
1063/// `1` — its empty-collection floor — and its actual contents are charged
1064/// against the shared [`MAX_VALUE_NODES`] budget as they are decoded (so the
1065/// cap still composes through nesting); a union contributes `1` (its count is
1066/// already bounded by the remaining input via its one-byte branch floor); and
1067/// unprovable schema-resolution pieces contribute `1`. Every value is at least
1068/// one node, so the weight is always `>= 1`.
1069fn min_value_nodes(schema: SchemaNode) -> usize {
1070    let mut visited = BTreeSet::new();
1071    min_value_nodes_piece(schema.root, schema.inner, &mut visited)
1072}
1073
1074/// Resolves a (possibly named) schema reference, guarding against named-type
1075/// cycles, then defers to [`min_value_nodes_piece`].
1076fn min_value_nodes_or_named(
1077    root: &Schema,
1078    node: SchemaPieceRefOrNamed,
1079    visited: &mut BTreeSet<usize>,
1080) -> usize {
1081    match node {
1082        SchemaPieceRefOrNamed::Piece(piece) => min_value_nodes_piece(root, piece, visited),
1083        SchemaPieceRefOrNamed::Named(idx) => {
1084            // A named-type cycle can only close through a record field; treat the
1085            // back-edge as a single node so we never over-estimate (and never
1086            // recurse forever).
1087            if !visited.insert(idx) {
1088                return 1;
1089            }
1090            let nodes = min_value_nodes_piece(root, &root.lookup(idx).piece, visited);
1091            visited.remove(&idx);
1092            nodes
1093        }
1094    }
1095}
1096
1097fn min_value_nodes_piece(
1098    root: &Schema,
1099    piece: &SchemaPiece,
1100    visited: &mut BTreeSet<usize>,
1101) -> usize {
1102    match piece {
1103        // A record materializes itself plus every one of its fields. This is the
1104        // only type that can be zero-width on the wire yet still allocate, so
1105        // counting its fields exactly is what makes the cap effective.
1106        SchemaPiece::Record { fields, .. } => fields.iter().fold(1, |acc, field| {
1107            acc.saturating_add(min_value_nodes_or_named(
1108                root,
1109                field.schema.as_ref(),
1110                visited,
1111            ))
1112        }),
1113        // Every other type materializes a single node for the purposes of this
1114        // lower bound: scalars and leaves trivially; an array/map at minimum an
1115        // empty collection (its contents bounded by its own cumulative cap); a
1116        // union its (input-bounded) branch index; and resolution pieces we don't
1117        // try to prove.
1118        _ => 1,
1119    }
1120}
1121
1122impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
1123    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
1124        if self.done {
1125            return Ok(None);
1126        }
1127        if self.remaining == 0 {
1128            // TODO -- we can use len_in_bytes to quickly skip non-demanded arrays
1129            let (len, _len_in_bytes) = match zag_i64(self.r)? {
1130                len if len > 0 => (len as usize, None),
1131                neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
1132                0 => {
1133                    self.done = true;
1134                    return Ok(None);
1135                }
1136                _ => unreachable!(),
1137            };
1138            // Weight the count by the per-element node lower bound so the cap
1139            // bounds decoded memory, not just element count: a block of
1140            // zero-width-but-allocating elements (e.g. a record of `null`s)
1141            // amplifies a few wire bytes into millions of `Value`s otherwise.
1142            let block_nodes = len.saturating_mul(self.element_nodes);
1143            if block_nodes > MAX_VALUE_NODES {
1144                return Err(AvroError::Decode(DecodeError::Custom(format!(
1145                    "Avro array block length {len} exceeds limit {MAX_VALUE_NODES} \
1146                     decoded values"
1147                ))));
1148            }
1149            // Charge against the budget shared by every array/map in the datum,
1150            // so nested collections can't each get a fresh cap (see
1151            // `charge_value_nodes` / `MAX_VALUE_NODES`).
1152            charge_value_nodes("array", block_nodes)?;
1153            // A block of `len` items occupies at least `len * min_elem` bytes,
1154            // so a count needing more than the remaining input can't be honest;
1155            // reject it rather than let it drive an unbounded allocation (see
1156            // `Skip::remaining_input`). Unlike a map entry — which always carries
1157            // at least a one-byte key-length varint — an array item can encode to
1158            // zero bytes (`null`, an empty record), so this bound only applies
1159            // when the element type has a proven positive byte floor. For
1160            // zero-width element types (`min_elem == 0`) we rely on the
1161            // cumulative `MAX_VALUE_NODES` cap; otherwise a valid datum such as a
1162            // ten-element `array<null>` (encoded as `[20, 0]`) would be wrongly
1163            // rejected.
1164            if let Some(remaining) = self.r.remaining_input() {
1165                let min_elem = min_encoded_len(self.schema);
1166                if min_elem > 0 && len.saturating_mul(min_elem) > remaining {
1167                    return Err(AvroError::Decode(DecodeError::Custom(format!(
1168                        "Avro array block length {len} exceeds remaining input ({remaining} bytes)"
1169                    ))));
1170                }
1171            }
1172            self.remaining = len;
1173        }
1174        assert!(self.remaining > 0);
1175        self.remaining -= 1;
1176        let des = GeneralDeserializer {
1177            schema: self.schema,
1178        };
1179        des.deserialize(self.r, d).map(Some)
1180    }
1181}
1182
/// Generates `AvroDecode`-style method implementations that reject a value
/// kind.
///
/// Invoke it inside an `impl` block with a comma-separated list of kinds
/// (`record`, `union_branch`, `array`, `map`, `enum_variant`, `scalar`,
/// `decimal`, `bytes`, `string`, `json`, `uuid`, `fixed`). For each kind it
/// emits the correspondingly named method, whose body ignores its arguments
/// and returns the matching `DecodeError::Unexpected*` error.
///
/// Every crate-local item is named through `$crate`, so the macro expands
/// correctly wherever it is invoked, without the caller importing `AvroRead`
/// or the other traits. The one exception is the `json` arm, which names
/// `serde_json::Value` directly: `serde_json` must be resolvable at the
/// invocation site.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedRecord))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUnion))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedArray,
            ))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(
            self,
            _m: &mut M,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedMap,
            ))
        }
    };
    (enum_variant) => {
        fn enum_variant(
            self,
            _symbol: &str,
            _idx: usize,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedEnum,
            ))
        }
    };
    (scalar) => {
        fn scalar(self, _scalar: $crate::types::Scalar) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedScalar))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedDecimal))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedBytes))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedString))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<
                'avro_macro_lifetime,
                &'avro_macro_lifetime serde_json::Value,
                R,
            >,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedJson))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUuid))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedFixed))
        }
    };
    // Several kinds at once: expand each one through the single-kind arms.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
1299
1300pub trait AvroDecode: Sized {
1301    type Out;
1302    fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1303        self,
1304        _a: &mut A,
1305    ) -> Result<Self::Out, AvroError>;
1306
1307    fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1308        self,
1309        _idx: usize,
1310        _n_variants: usize,
1311        _null_variant: Option<usize>,
1312        _deserializer: D,
1313        _reader: &'a mut R,
1314    ) -> Result<Self::Out, AvroError>;
1315
1316    fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;
1317
1318    fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;
1319
1320    fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;
1321
1322    fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;
1323
1324    fn decimal<'a, R: AvroRead>(
1325        self,
1326        _precision: usize,
1327        _scale: usize,
1328        _r: ValueOrReader<'a, &'a [u8], R>,
1329    ) -> Result<Self::Out, AvroError>;
1330
1331    fn bytes<'a, R: AvroRead>(
1332        self,
1333        _r: ValueOrReader<'a, &'a [u8], R>,
1334    ) -> Result<Self::Out, AvroError>;
1335    fn string<'a, R: AvroRead>(
1336        self,
1337        _r: ValueOrReader<'a, &'a str, R>,
1338    ) -> Result<Self::Out, AvroError>;
1339    fn json<'a, R: AvroRead>(
1340        self,
1341        _r: ValueOrReader<'a, &'a serde_json::Value, R>,
1342    ) -> Result<Self::Out, AvroError>;
1343    fn uuid<'a, R: AvroRead>(
1344        self,
1345        _r: ValueOrReader<'a, &'a [u8], R>,
1346    ) -> Result<Self::Out, AvroError>;
1347    fn fixed<'a, R: AvroRead>(
1348        self,
1349        _r: ValueOrReader<'a, &'a [u8], R>,
1350    ) -> Result<Self::Out, AvroError>;
1351    fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
1352        self,
1353        f: F,
1354    ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
1355        public_decoders::MappingDecoder::new(self, f)
1356    }
1357}
1358
1359pub mod public_decoders {
1360
1361    use std::collections::BTreeMap;
1362
1363    use crate::error::{DecodeError, Error as AvroError};
1364    use crate::types::{DecimalValue, Scalar, Value};
1365    use crate::{
1366        AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
1367    };
1368
1369    use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
1370
1371    macro_rules! define_simple_decoder {
1372        ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
1373            pub struct $name;
1374            impl AvroDecode for $name {
1375                type Out = $out;
1376                fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
1377                    let out = match scalar {
1378                        $(
1379                            Scalar::$scalar_branch(inner) => {inner.try_into()?}
1380                        ),*
1381                            other => return Err(AvroError::Decode(
1382                                DecodeError::UnexpectedScalarKind(other.into()),
1383                            ))
1384                    };
1385                    Ok(out)
1386                }
1387                define_unexpected! {
1388                    array, record, union_branch, map,
1389                    enum_variant, decimal, bytes, string,
1390                    json, uuid, fixed
1391                }
1392            }
1393
1394            impl StatefulAvroDecodable for $out {
1395                type Decoder = $name;
1396                type State = ();
1397                fn new_decoder(_state: ()) -> $name {
1398                    $name
1399                }
1400            }
1401        }
1402    }
1403
1404    define_simple_decoder!(I32Decoder, i32, Int;Long);
1405    define_simple_decoder!(I64Decoder, i64, Int;Long);
1406    define_simple_decoder!(U64Decoder, u64, Int;Long);
1407    define_simple_decoder!(UsizeDecoder, usize, Int;Long);
1408    define_simple_decoder!(IsizeDecoder, isize, Int;Long);
1409
1410    pub struct MappingDecoder<
1411        T,
1412        InnerOut,
1413        Inner: AvroDecode<Out = InnerOut>,
1414        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1415    > {
1416        inner: Inner,
1417        conv: Conv,
1418    }
1419
1420    impl<
1421        T,
1422        InnerOut,
1423        Inner: AvroDecode<Out = InnerOut>,
1424        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1425    > MappingDecoder<T, InnerOut, Inner, Conv>
1426    {
1427        pub fn new(inner: Inner, conv: Conv) -> Self {
1428            Self { inner, conv }
1429        }
1430    }
1431
1432    impl<
1433        T,
1434        InnerOut,
1435        Inner: AvroDecode<Out = InnerOut>,
1436        Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1437    > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
1438    {
1439        type Out = T;
1440
1441        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1442            mut self,
1443            a: &mut A,
1444        ) -> Result<Self::Out, AvroError> {
1445            (self.conv)(self.inner.record(a)?)
1446        }
1447
1448        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1449            mut self,
1450            idx: usize,
1451            n_variants: usize,
1452            null_variant: Option<usize>,
1453            deserializer: D,
1454            reader: &'a mut R,
1455        ) -> Result<Self::Out, AvroError> {
1456            (self.conv)(self.inner.union_branch(
1457                idx,
1458                n_variants,
1459                null_variant,
1460                deserializer,
1461                reader,
1462            )?)
1463        }
1464
1465        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1466            (self.conv)(self.inner.array(a)?)
1467        }
1468
1469        fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
1470            (self.conv)(self.inner.map(m)?)
1471        }
1472
1473        fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
1474            (self.conv)(self.inner.enum_variant(symbol, idx)?)
1475        }
1476
1477        fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
1478            (self.conv)(self.inner.scalar(scalar)?)
1479        }
1480
1481        fn decimal<'a, R: AvroRead>(
1482            mut self,
1483            precision: usize,
1484            scale: usize,
1485            r: ValueOrReader<'a, &'a [u8], R>,
1486        ) -> Result<Self::Out, AvroError> {
1487            (self.conv)(self.inner.decimal(precision, scale, r)?)
1488        }
1489
1490        fn bytes<'a, R: AvroRead>(
1491            mut self,
1492            r: ValueOrReader<'a, &'a [u8], R>,
1493        ) -> Result<Self::Out, AvroError> {
1494            (self.conv)(self.inner.bytes(r)?)
1495        }
1496
1497        fn string<'a, R: AvroRead>(
1498            mut self,
1499            r: ValueOrReader<'a, &'a str, R>,
1500        ) -> Result<Self::Out, AvroError> {
1501            (self.conv)(self.inner.string(r)?)
1502        }
1503
1504        fn json<'a, R: AvroRead>(
1505            mut self,
1506            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1507        ) -> Result<Self::Out, AvroError> {
1508            (self.conv)(self.inner.json(r)?)
1509        }
1510
1511        fn uuid<'a, R: AvroRead>(
1512            mut self,
1513            r: ValueOrReader<'a, &'a [u8], R>,
1514        ) -> Result<Self::Out, AvroError> {
1515            (self.conv)(self.inner.uuid(r)?)
1516        }
1517
1518        fn fixed<'a, R: AvroRead>(
1519            mut self,
1520            r: ValueOrReader<'a, &'a [u8], R>,
1521        ) -> Result<Self::Out, AvroError> {
1522            (self.conv)(self.inner.fixed(r)?)
1523        }
1524    }
1525    pub struct ArrayAsVecDecoder<
1526        InnerOut,
1527        Inner: AvroDecode<Out = InnerOut>,
1528        Ctor: FnMut() -> Inner,
1529    > {
1530        ctor: Ctor,
1531        buf: Vec<InnerOut>,
1532    }
1533
1534    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
1535        ArrayAsVecDecoder<InnerOut, Inner, Ctor>
1536    {
1537        pub fn new(ctor: Ctor) -> Self {
1538            Self { ctor, buf: vec![] }
1539        }
1540    }
1541    impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
1542        for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
1543    {
1544        type Out = Vec<InnerOut>;
1545        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1546            while let Some(next) = a.decode_next((self.ctor)())? {
1547                self.buf.push(next);
1548            }
1549            Ok(self.buf)
1550        }
1551        define_unexpected! {
1552            record, union_branch, map, enum_variant,
1553            scalar, decimal, bytes, string, json, uuid,
1554            fixed
1555        }
1556    }
1557
1558    pub struct DefaultArrayAsVecDecoder<T> {
1559        buf: Vec<T>,
1560    }
1561    impl<T> Default for DefaultArrayAsVecDecoder<T> {
1562        fn default() -> Self {
1563            Self { buf: vec![] }
1564        }
1565    }
1566    impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
1567        type Out = Vec<T>;
1568        fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1569            while let Some(next) = {
1570                let inner = T::new_decoder();
1571                a.decode_next(inner)?
1572            } {
1573                self.buf.push(next);
1574            }
1575            Ok(self.buf)
1576        }
1577        define_unexpected! {
1578            record, union_branch, map, enum_variant,
1579            scalar, decimal, bytes, string, json, uuid,
1580            fixed
1581        }
1582    }
1583    impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
1584        type Decoder = DefaultArrayAsVecDecoder<T>;
1585        type State = ();
1586
1587        fn new_decoder(_state: Self::State) -> Self::Decoder {
1588            DefaultArrayAsVecDecoder::<T>::default()
1589        }
1590    }
1591    pub struct TrivialDecoder;
1592
1593    impl TrivialDecoder {
1594        fn maybe_skip<'a, V, R: AvroRead>(
1595            self,
1596            r: ValueOrReader<'a, V, R>,
1597        ) -> Result<(), AvroError> {
1598            if let ValueOrReader::Reader { len, r } = r {
1599                Ok(r.skip(len)?)
1600            } else {
1601                Ok(())
1602            }
1603        }
1604    }
1605
1606    impl AvroDecode for TrivialDecoder {
1607        type Out = ();
1608        fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
1609            while let Some((_, _, f)) = a.next_field()? {
1610                f.decode_field(TrivialDecoder)?;
1611            }
1612            Ok(())
1613        }
1614        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1615            self,
1616            _idx: usize,
1617            _n_variants: usize,
1618            _null_variant: Option<usize>,
1619            deserializer: D,
1620            reader: &'a mut R,
1621        ) -> Result<(), AvroError> {
1622            deserializer.deserialize(reader, self)
1623        }
1624
1625        fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1626            Ok(())
1627        }
1628        fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1629            Ok(())
1630        }
1631        fn decimal<'a, R: AvroRead>(
1632            self,
1633            _precision: usize,
1634            _scale: usize,
1635            r: ValueOrReader<'a, &'a [u8], R>,
1636        ) -> Result<(), AvroError> {
1637            self.maybe_skip(r)
1638        }
1639        fn bytes<'a, R: AvroRead>(
1640            self,
1641            r: ValueOrReader<'a, &'a [u8], R>,
1642        ) -> Result<(), AvroError> {
1643            self.maybe_skip(r)
1644        }
1645        fn string<'a, R: AvroRead>(
1646            self,
1647            r: ValueOrReader<'a, &'a str, R>,
1648        ) -> Result<(), AvroError> {
1649            self.maybe_skip(r)
1650        }
1651        fn json<'a, R: AvroRead>(
1652            self,
1653            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1654        ) -> Result<(), AvroError> {
1655            self.maybe_skip(r)
1656        }
1657        fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1658            self.maybe_skip(r)
1659        }
1660        fn fixed<'a, R: AvroRead>(
1661            self,
1662            r: ValueOrReader<'a, &'a [u8], R>,
1663        ) -> Result<(), AvroError> {
1664            self.maybe_skip(r)
1665        }
1666        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1667            while a.decode_next(TrivialDecoder)?.is_some() {}
1668            Ok(())
1669        }
1670
1671        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1672            while let Some((_n, entry)) = m.next_entry()? {
1673                entry.decode_field(TrivialDecoder)?
1674            }
1675            Ok(())
1676        }
1677    }
1678    pub struct ValueDecoder;
1679    impl AvroDecode for ValueDecoder {
1680        type Out = Value;
1681        fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1682            self,
1683            a: &mut A,
1684        ) -> Result<Value, AvroError> {
1685            let mut fields = vec![];
1686            while let Some((name, idx, f)) = a.next_field()? {
1687                let next = ValueDecoder;
1688                let val = f.decode_field(next)?;
1689                fields.push((idx, (name.to_string(), val)));
1690            }
1691            fields.sort_by_key(|(idx, _)| *idx);
1692
1693            Ok(Value::Record(
1694                fields
1695                    .into_iter()
1696                    .map(|(_idx, (name, val))| (name, val))
1697                    .collect(),
1698            ))
1699        }
1700        fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1701            self,
1702            index: usize,
1703            n_variants: usize,
1704            null_variant: Option<usize>,
1705            deserializer: D,
1706            reader: &'a mut R,
1707        ) -> Result<Value, AvroError> {
1708            let next = ValueDecoder;
1709            let inner = Box::new(deserializer.deserialize(reader, next)?);
1710            Ok(Value::Union {
1711                index,
1712                inner,
1713                n_variants,
1714                null_variant,
1715            })
1716        }
1717        fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1718            let mut items = vec![];
1719            loop {
1720                let next = ValueDecoder;
1721
1722                if let Some(value) = a.decode_next(next)? {
1723                    items.push(value)
1724                } else {
1725                    break;
1726                }
1727            }
1728            Ok(Value::Array(items))
1729        }
1730        fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1731            Ok(Value::Enum(idx, symbol.to_string()))
1732        }
1733        fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1734            Ok(scalar.into())
1735        }
1736        fn decimal<'a, R: AvroRead>(
1737            self,
1738            precision: usize,
1739            scale: usize,
1740            r: ValueOrReader<'a, &'a [u8], R>,
1741        ) -> Result<Value, AvroError> {
1742            let unscaled = match r {
1743                ValueOrReader::Value(buf) => buf.to_vec(),
1744                ValueOrReader::Reader { len, r } => {
1745                    let mut buf = vec![];
1746                    buf.resize_with(len, Default::default);
1747                    r.read_exact(&mut buf)?;
1748                    buf
1749                }
1750            };
1751            Ok(Value::Decimal(DecimalValue {
1752                unscaled,
1753                precision,
1754                scale,
1755            }))
1756        }
1757        fn bytes<'a, R: AvroRead>(
1758            self,
1759            r: ValueOrReader<'a, &'a [u8], R>,
1760        ) -> Result<Value, AvroError> {
1761            let buf = match r {
1762                ValueOrReader::Value(buf) => buf.to_vec(),
1763                ValueOrReader::Reader { len, r } => {
1764                    let mut buf = vec![];
1765                    buf.resize_with(len, Default::default);
1766                    r.read_exact(&mut buf)?;
1767                    buf
1768                }
1769            };
1770            Ok(Value::Bytes(buf))
1771        }
1772        fn string<'a, R: AvroRead>(
1773            self,
1774            r: ValueOrReader<'a, &'a str, R>,
1775        ) -> Result<Value, AvroError> {
1776            let s = match r {
1777                ValueOrReader::Value(s) => s.to_string(),
1778                ValueOrReader::Reader { len, r } => {
1779                    let mut buf = vec![];
1780                    buf.resize_with(len, Default::default);
1781                    r.read_exact(&mut buf)?;
1782                    String::from_utf8(buf)
1783                        .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1784                }
1785            };
1786            Ok(Value::String(s))
1787        }
1788        fn json<'a, R: AvroRead>(
1789            self,
1790            r: ValueOrReader<'a, &'a serde_json::Value, R>,
1791        ) -> Result<Value, AvroError> {
1792            let val = match r {
1793                ValueOrReader::Value(val) => val.clone(),
1794                ValueOrReader::Reader { len, r } => {
1795                    let mut buf = vec![];
1796                    buf.resize_with(len, Default::default);
1797                    r.read_exact(&mut buf)?;
1798                    serde_json::from_slice(&buf).map_err(|e| {
1799                        AvroError::Decode(DecodeError::BadJson {
1800                            category: e.classify(),
1801                            bytes: buf.to_owned(),
1802                        })
1803                    })?
1804                }
1805            };
1806            Ok(Value::Json(val))
1807        }
1808        fn uuid<'a, R: AvroRead>(
1809            self,
1810            r: ValueOrReader<'a, &'a [u8], R>,
1811        ) -> Result<Value, AvroError> {
1812            let buf = match r {
1813                ValueOrReader::Value(val) => val.to_vec(),
1814                ValueOrReader::Reader { len, r } => {
1815                    let mut buf = vec![];
1816                    buf.resize_with(len, Default::default);
1817                    r.read_exact(&mut buf)?;
1818                    buf
1819                }
1820            };
1821            let s = std::str::from_utf8(&buf)
1822                .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1823            let val =
1824                uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1825            Ok(Value::Uuid(val))
1826        }
1827        fn fixed<'a, R: AvroRead>(
1828            self,
1829            r: ValueOrReader<'a, &'a [u8], R>,
1830        ) -> Result<Value, AvroError> {
1831            let buf = match r {
1832                ValueOrReader::Value(buf) => buf.to_vec(),
1833                ValueOrReader::Reader { len, r } => {
1834                    let mut buf = vec![];
1835                    buf.resize_with(len, Default::default);
1836                    r.read_exact(&mut buf)?;
1837                    buf
1838                }
1839            };
1840            Ok(Value::Fixed(buf.len(), buf))
1841        }
1842        fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1843            let mut entries = BTreeMap::new();
1844            while let Some((name, a)) = m.next_entry()? {
1845                let d = ValueDecoder;
1846                let val = a.decode_field(d)?;
1847                entries.insert(name, val);
1848            }
1849            Ok(Value::Map(entries))
1850        }
1851    }
1852}
1853
1854impl<'a> AvroDeserializer for &'a Value {
1855    fn deserialize<R: AvroRead, D: AvroDecode>(
1856        self,
1857        _r: &mut R,
1858        d: D,
1859    ) -> Result<D::Out, AvroError> {
1860        give_value(d, self)
1861    }
1862}
1863
1864pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1865    use ValueOrReader::Value as V;
1866    match v {
1867        Value::Null => d.scalar(Scalar::Null),
1868        Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1869        Value::Int(val) => d.scalar(Scalar::Int(*val)),
1870        Value::Long(val) => d.scalar(Scalar::Long(*val)),
1871        Value::Float(val) => d.scalar(Scalar::Float(*val)),
1872        Value::Double(val) => d.scalar(Scalar::Double(*val)),
1873        Value::Date(val) => d.scalar(Scalar::Date(*val)),
1874        Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1875        // The &[u8] parameter here (and elsewhere in this function) is arbitrary, but we have to put in something in order for the function
1876        // to type-check
1877        Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1878        Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1879        Value::String(val) => d.string::<&[u8]>(V(val)),
1880        Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1881        Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1882        Value::Union {
1883            index,
1884            inner,
1885            n_variants,
1886            null_variant,
1887        } => {
1888            let mut empty_reader: &[u8] = &[];
1889            d.union_branch(
1890                *index,
1891                *n_variants,
1892                *null_variant,
1893                &**inner,
1894                &mut empty_reader,
1895            )
1896        }
1897        Value::Array(val) => {
1898            let mut a = ValueArrayAccess::new(val);
1899            d.array(&mut a)
1900        }
1901        Value::Map(val) => {
1902            let vals: Vec<_> = val.clone().into_iter().collect();
1903            let mut m = ValueMapAccess::new(vals.as_slice());
1904            d.map(&mut m)
1905        }
1906        Value::Record(val) => {
1907            let mut a = ValueRecordAccess::new(val);
1908            d.record(&mut a)
1909        }
1910        Value::Json(val) => d.json::<&[u8]>(V(val)),
1911        Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1912    }
1913}
1914
/// A source of one Avro datum that can be pushed into any [`AvroDecode`]
/// implementation.
pub trait AvroDeserializer {
    /// Consumes `self`, feeds the datum it represents to decoder `d`, and
    /// returns the decoder's output. `r` is the reader made available for the
    /// call; whether it is consulted is up to the implementation.
    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
}
1918
/// [`AvroDeserializer`] that decodes data from a reader according to the
/// schema node it holds.
#[derive(Clone, Copy)]
pub struct GeneralDeserializer<'a> {
    /// Schema node describing the datum to decode.
    pub schema: SchemaNode<'a>,
}
1923
/// Cap on nested `GeneralDeserializer::deserialize` calls, enforced by
/// `DecodeDepthGuard::enter`. Avro records may reference themselves
/// (`{"name":"X","type":"record","fields":[{"name":"x","type":"X"}]}`), so a
/// malicious file plus matching wire bytes could otherwise recurse without
/// bound and overflow the stack.
const MAX_DECODE_DEPTH: usize = 128;
1929
thread_local! {
    /// Number of `DecodeDepthGuard`s currently alive on this thread, i.e. the
    /// current nesting depth of `GeneralDeserializer::deserialize` calls.
    /// Incremented by `DecodeDepthGuard::enter` and decremented when the guard
    /// is dropped; bounded by [`MAX_DECODE_DEPTH`].
    static DECODE_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
    /// Cumulative `Value` nodes decoded so far in the current top-level decode,
    /// shared by every array and map in the datum and bounded by
    /// [`MAX_VALUE_NODES`]. Reset to `0` at each top-level entry (see
    /// [`DecodeDepthGuard::enter`]) so the budget composes across nesting
    /// instead of resetting per collection. Charged via [`charge_value_nodes`].
    static DECODE_NODES: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}
1939
1940struct DecodeDepthGuard;
1941impl DecodeDepthGuard {
1942    fn enter() -> Result<Self, AvroError> {
1943        DECODE_DEPTH.with(|d| {
1944            let new = d.get() + 1;
1945            if new > MAX_DECODE_DEPTH {
1946                return Err(AvroError::Decode(DecodeError::Custom(format!(
1947                    "Avro decode depth exceeds limit {MAX_DECODE_DEPTH}"
1948                ))));
1949            }
1950            d.set(new);
1951            // The `Value`-node budget is shared across every array/map in one
1952            // datum so nesting can't multiply the cap (see `MAX_VALUE_NODES`).
1953            // This is the top-level entry (depth 0 -> 1), so reset it: each datum
1954            // starts fresh even if a previous decode on this thread errored out
1955            // partway and left the counter non-zero.
1956            if new == 1 {
1957                DECODE_NODES.with(|n| n.set(0));
1958            }
1959            Ok(DecodeDepthGuard)
1960        })
1961    }
1962}
1963impl Drop for DecodeDepthGuard {
1964    fn drop(&mut self) {
1965        DECODE_DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
1966    }
1967}
1968
1969/// Charges `nodes` against the per-datum [`DECODE_NODES`] budget shared by every
1970/// array and map in a single top-level decode, rejecting once the cumulative
1971/// total exceeds [`MAX_VALUE_NODES`]. `kind` (`"array"` / `"map"`) only labels
1972/// the error.
1973///
1974/// The budget is shared — rather than tracked per collection instance — so the
1975/// cap composes across nesting; see [`MAX_VALUE_NODES`] for why a per-collection
1976/// budget would let nested zero-width collections amplify past it.
1977fn charge_value_nodes(kind: &str, nodes: usize) -> Result<(), AvroError> {
1978    DECODE_NODES.with(|n| {
1979        let total = n.get().saturating_add(nodes);
1980        if total > MAX_VALUE_NODES {
1981            return Err(AvroError::Decode(DecodeError::Custom(format!(
1982                "Avro {kind} decode exceeds cumulative limit {MAX_VALUE_NODES} decoded values"
1983            ))));
1984        }
1985        n.set(total);
1986        Ok(())
1987    })
1988}
1989
1990impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1991    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1992        let _guard = DecodeDepthGuard::enter()?;
1993        use ValueOrReader::Reader;
1994        match self.schema.inner {
1995            SchemaPiece::Null => d.scalar(Scalar::Null),
1996            SchemaPiece::Boolean => {
1997                let mut buf = [0u8; 1];
1998                r.read_exact(&mut buf[..])?;
1999                let val = match buf[0] {
2000                    0u8 => false,
2001                    1u8 => true,
2002                    other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
2003                };
2004                d.scalar(Scalar::Boolean(val))
2005            }
2006            SchemaPiece::Int => {
2007                let val = zag_i32(r)?;
2008                d.scalar(Scalar::Int(val))
2009            }
2010            SchemaPiece::Long => {
2011                let val = zag_i64(r)?;
2012                d.scalar(Scalar::Long(val))
2013            }
2014            SchemaPiece::Float => {
2015                let val = decode_float(r)?;
2016                d.scalar(Scalar::Float(val))
2017            }
2018            SchemaPiece::Double => {
2019                let val = decode_double(r)?;
2020                d.scalar(Scalar::Double(val))
2021            }
2022            SchemaPiece::Date => {
2023                let days = zag_i32(r)?;
2024                d.scalar(Scalar::Date(days))
2025            }
2026            SchemaPiece::TimestampMilli => {
2027                let total_millis = zag_i64(r)?;
2028                let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
2029                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
2030                    _ => unreachable!(),
2031                };
2032                d.scalar(scalar)
2033            }
2034            SchemaPiece::TimestampMicro => {
2035                let total_micros = zag_i64(r)?;
2036                let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
2037                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
2038                    _ => unreachable!(),
2039                };
2040                d.scalar(scalar)
2041            }
2042            SchemaPiece::Decimal {
2043                precision,
2044                scale,
2045                fixed_size,
2046            } => {
2047                let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
2048                d.decimal(*precision, *scale, Reader { len, r })
2049            }
2050            SchemaPiece::Bytes => {
2051                let len = decode_len(r)?;
2052                d.bytes(Reader { len, r })
2053            }
2054            SchemaPiece::String => {
2055                let len = decode_len(r)?;
2056                d.string(Reader { len, r })
2057            }
2058            SchemaPiece::Json => {
2059                let len = decode_len(r)?;
2060                d.json(Reader { len, r })
2061            }
2062            SchemaPiece::Uuid => {
2063                let len = decode_len(r)?;
2064                d.uuid(Reader { len, r })
2065            }
2066            SchemaPiece::Array(inner) => {
2067                // From the spec:
2068                // Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
2069                // If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
2070
2071                let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
2072                d.array(&mut a)
2073            }
2074            SchemaPiece::Map(inner) => {
2075                // See logic for `SchemaPiece::Array` above. Maps are encoded similarly.
2076                let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
2077                d.map(&mut m)
2078            }
2079            SchemaPiece::Union(inner) => {
2080                let index = decode_long_nonneg(r)? as usize;
2081                let variants = inner.variants();
2082                match variants.get(index) {
2083                    Some(variant) => {
2084                        let n_variants = variants.len();
2085                        let null_variant = variants
2086                            .iter()
2087                            .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
2088                        let dsr = GeneralDeserializer {
2089                            schema: self.schema.step(variant),
2090                        };
2091                        d.union_branch(index, n_variants, null_variant, dsr, r)
2092                    }
2093                    None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
2094                        index,
2095                        len: variants.len(),
2096                    })),
2097                }
2098            }
2099            SchemaPiece::ResolveIntLong => {
2100                let val = zag_i32(r)? as i64;
2101                d.scalar(Scalar::Long(val))
2102            }
2103            SchemaPiece::ResolveIntFloat => {
2104                let val = zag_i32(r)? as f32;
2105                d.scalar(Scalar::Float(val))
2106            }
2107            SchemaPiece::ResolveIntDouble => {
2108                let val = zag_i32(r)? as f64;
2109                d.scalar(Scalar::Double(val))
2110            }
2111            SchemaPiece::ResolveLongFloat => {
2112                let val = zag_i64(r)? as f32;
2113                d.scalar(Scalar::Float(val))
2114            }
2115            SchemaPiece::ResolveLongDouble => {
2116                let val = zag_i64(r)? as f64;
2117                d.scalar(Scalar::Double(val))
2118            }
2119            SchemaPiece::ResolveFloatDouble => {
2120                let val = decode_float(r)? as f64;
2121                d.scalar(Scalar::Double(val))
2122            }
2123            SchemaPiece::ResolveConcreteUnion {
2124                index,
2125                inner,
2126                n_reader_variants,
2127                reader_null_variant,
2128            } => {
2129                let dsr = GeneralDeserializer {
2130                    schema: self.schema.step(&**inner),
2131                };
2132                d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
2133            }
2134            SchemaPiece::ResolveUnionUnion {
2135                permutation,
2136                n_reader_variants,
2137                reader_null_variant,
2138            } => {
2139                let index = decode_long_nonneg(r)? as usize;
2140                if index >= permutation.len() {
2141                    return Err(AvroError::Decode(DecodeError::BadUnionIndex {
2142                        index,
2143                        len: permutation.len(),
2144                    }));
2145                }
2146                match &permutation[index] {
2147                    Err(e) => Err(e.clone()),
2148                    Ok((index, variant)) => {
2149                        let dsr = GeneralDeserializer {
2150                            schema: self.schema.step(variant),
2151                        };
2152                        d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
2153                    }
2154                }
2155            }
2156            SchemaPiece::ResolveUnionConcrete { index, inner } => {
2157                let found_index = decode_long_nonneg(r)? as usize;
2158                if *index != found_index {
2159                    Err(AvroError::Decode(DecodeError::WrongUnionIndex {
2160                        expected: *index,
2161                        actual: found_index,
2162                    }))
2163                } else {
2164                    let dsr = GeneralDeserializer {
2165                        schema: self.schema.step(inner.as_ref()),
2166                    };
2167                    // The reader is not expecting a union here, so don't call `D::union_branch`
2168                    dsr.deserialize(r, d)
2169                }
2170            }
2171            SchemaPiece::Record {
2172                doc: _,
2173                fields,
2174                lookup: _,
2175            } => {
2176                let mut a = SimpleRecordAccess::new(self.schema, r, fields);
2177                d.record(&mut a)
2178            }
2179            SchemaPiece::Enum {
2180                symbols,
2181                doc: _,
2182                default_idx: _,
2183            } => {
2184                let index = decode_int_nonneg(r)? as usize;
2185                match symbols.get(index) {
2186                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
2187                        index,
2188                        len: symbols.len(),
2189                    })),
2190                    Some(symbol) => d.enum_variant(symbol, index),
2191                }
2192            }
2193            SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
2194            // XXX - This does not deliver fields to the consumer in the same order they were
2195            // declared in the reader schema, which might cause headache for consumers...
2196            // Unfortunately, there isn't a good way to do so without pre-decoding the whole record
2197            // (which would require a lot of allocations)
2198            // and then sorting the fields. So, just let the consumer deal with re-ordering.
2199            SchemaPiece::ResolveRecord {
2200                defaults,
2201                fields,
2202                n_reader_fields: _,
2203            } => {
2204                let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
2205                d.record(&mut a)
2206            }
2207            SchemaPiece::ResolveEnum {
2208                doc: _,
2209                symbols,
2210                default,
2211            } => {
2212                let index = decode_int_nonneg(r)? as usize;
2213                match symbols.get(index) {
2214                    None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
2215                        index,
2216                        len: symbols.len(),
2217                    })),
2218                    Some(op) => match op {
2219                        Err(missing) => {
2220                            if let Some((reader_index, symbol)) = default.clone() {
2221                                d.enum_variant(&symbol, reader_index)
2222                            } else {
2223                                Err(AvroError::Decode(DecodeError::MissingEnumIndex {
2224                                    index,
2225                                    symbol: missing.clone(),
2226                                }))
2227                            }
2228                        }
2229                        Ok((index, name)) => d.enum_variant(name, *index),
2230                    },
2231                }
2232            }
2233            SchemaPiece::ResolveIntTsMilli => {
2234                let total_millis = zag_i32(r)?;
2235                let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
2236                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
2237                    _ => unreachable!(),
2238                };
2239                d.scalar(scalar)
2240            }
2241            SchemaPiece::ResolveIntTsMicro => {
2242                let total_micros = zag_i32(r)?;
2243                let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
2244                    Value::Timestamp(ts) => Scalar::Timestamp(ts),
2245                    _ => unreachable!(),
2246                };
2247                d.scalar(scalar)
2248            }
2249            SchemaPiece::ResolveDateTimestamp => {
2250                let days = zag_i32(r)?;
2251
2252                let date = NaiveDate::from_ymd_opt(1970, 1, 1)
2253                    .expect("naive date known valid")
2254                    .checked_add_signed(
2255                        chrono::Duration::try_days(days.into())
2256                            .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
2257                    )
2258                    .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
2259                let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
2260                d.scalar(Scalar::Timestamp(dt))
2261            }
2262        }
2263    }
2264}
2265/// Decode a `Value` from avro format given its `Schema`.
2266pub fn decode<'a, R: AvroRead>(
2267    schema: SchemaNode<'a>,
2268    reader: &'a mut R,
2269) -> Result<Value, AvroError> {
2270    let d = ValueDecoder;
2271    let dsr = GeneralDeserializer { schema };
2272    let val = dsr.deserialize(reader, d)?;
2273    Ok(val)
2274}