1use std::cmp;
25use std::collections::BTreeSet;
26use std::fmt::{self, Display};
27use std::fs::File;
28use std::io::{self, Cursor, Read, Seek, SeekFrom};
29
30use chrono::{DateTime, NaiveDate};
31use flate2::read::MultiGzDecoder;
32
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35 RecordField, ResolvedDefaultValueField, ResolvedRecordField, Schema, SchemaNode, SchemaPiece,
36 SchemaPieceOrNamed, SchemaPieceRefOrNamed,
37};
38use crate::types::{Scalar, Value};
39use crate::util::{TsUnit, safe_len, zag_i32, zag_i64};
40use crate::{TrivialDecoder, ValueDecoder};
41
/// A type that can be produced by an Avro decoder which is itself built from
/// some caller-supplied state.
pub trait StatefulAvroDecodable: Sized {
    /// The decoder type; its output is `Self`.
    type Decoder: AvroDecode<Out = Self>;
    /// The value required to construct a [`Self::Decoder`].
    type State;
    /// Builds a decoder for `Self` from `state`.
    fn new_decoder(state: Self::State) -> Self::Decoder;
}
/// A type that can be produced by an Avro decoder which needs no input to be
/// constructed.
pub trait AvroDecodable: Sized {
    /// The decoder type; its output is `Self`.
    type Decoder: AvroDecode<Out = Self>;

    /// Builds a decoder for `Self`.
    fn new_decoder() -> Self::Decoder;
}
52impl<T> AvroDecodable for T
53where
54 T: StatefulAvroDecodable,
55 T::State: Default,
56{
57 type Decoder = <Self as StatefulAvroDecodable>::Decoder;
58
59 fn new_decoder() -> Self::Decoder {
60 <Self as StatefulAvroDecodable>::new_decoder(Default::default())
61 }
62}
63#[inline]
64fn decode_long_nonneg<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
65 let u = match zag_i64(reader)? {
66 i if i >= 0 => i as u64,
67 i => return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(i))),
68 };
69 Ok(u)
70}
71
72fn decode_int_nonneg<R: Read>(reader: &mut R) -> Result<u32, AvroError> {
73 let u = match zag_i32(reader)? {
74 i if i >= 0 => i as u32,
75 i => {
76 return Err(AvroError::Decode(DecodeError::ExpectedNonnegInteger(
77 i as i64,
78 )));
79 }
80 };
81 Ok(u)
82}
83
84#[inline]
85fn decode_len<R: Read>(reader: &mut R) -> Result<usize, AvroError> {
86 zag_i64(reader).and_then(|i| safe_len(i as usize))
87}
88
89#[inline]
90fn decode_float<R: Read>(reader: &mut R) -> Result<f32, AvroError> {
91 let mut buf = [0u8; 4];
92 reader.read_exact(&mut buf[..])?;
93 Ok(f32::from_le_bytes(buf))
94}
95
96#[inline]
97fn decode_double<R: Read>(reader: &mut R) -> Result<f64, AvroError> {
98 let mut buf = [0u8; 8];
99 reader.read_exact(&mut buf[..])?;
100 Ok(f64::from_le_bytes(buf))
101}
102
103impl Display for TsUnit {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 match self {
106 TsUnit::Millis => write!(f, "ms"),
107 TsUnit::Micros => write!(f, "us"),
108 }
109 }
110}
111
#[cfg(test)]
mod tests {
    use chrono::DateTime;

    use crate::types::Value;
    use crate::util::TsUnit;

    use super::build_ts_value;

    /// Negative millisecond/microsecond inputs must land before the Unix
    /// epoch, with the sub-second part expressed as a non-negative nanosecond
    /// offset from the preceding whole second.
    #[mz_ore::test]
    fn test_negative_timestamps() {
        assert_eq!(
            build_ts_value(-1, TsUnit::Millis).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-1000, TsUnit::Millis).unwrap(),
            Value::Timestamp(DateTime::from_timestamp(-1, 0).unwrap().naive_utc())
        );
        assert_eq!(
            build_ts_value(-1000, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_000_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-1, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-1, 999_999_000)
                    .unwrap()
                    .naive_utc()
            )
        );
        assert_eq!(
            build_ts_value(-123_456_789_123, TsUnit::Micros).unwrap(),
            Value::Timestamp(
                DateTime::from_timestamp(-123_457, (1_000_000 - 789_123) * 1_000)
                    .unwrap()
                    .naive_utc()
            )
        );
    }

    /// The input holds only a block count of 8,000,000 `long`s and no element
    /// bytes; decoding must fail.
    #[mz_ore::test]
    fn array_block_len_bounded_by_remaining_input() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "long"}"#).unwrap();
        let mut body = Vec::new();
        // Block header only: the claimed elements are never written.
        zig_i64(8_000_000, &mut body);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_err(),
            "an array block longer than the remaining input must be rejected, not allocated"
        );
    }

    /// Arrays whose element type is `null` or a field-less record must still
    /// decode, for a block of ten elements followed by the terminator.
    #[mz_ore::test]
    fn zero_width_array_elements_decode() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        for (items, want) in [
            (r#""null""#, Value::Null),
            (
                r#"{"type": "record", "name": "Empty", "fields": []}"#,
                Value::Record(vec![]),
            ),
        ] {
            let schema =
                Schema::from_str(&format!(r#"{{"type": "array", "items": {items}}}"#)).unwrap();
            let mut body = Vec::new();
            // One block of ten elements ...
            zig_i64(10, &mut body);
            // ... followed by the zero block count that ends the array.
            body.push(0);
            let dsr = GeneralDeserializer {
                schema: schema.top_node(),
            };
            let mut reader: &[u8] = &body;
            let decoded = dsr
                .deserialize(&mut reader, ValueDecoder)
                .expect("a zero-width array element type must decode, not be rejected");
            assert_eq!(decoded, Value::Array(vec![want; 10]));
        }
    }

    /// An array of nulls produced by this crate's own encoder must decode
    /// without error.
    #[mz_ore::test]
    fn valid_null_array_falsely_rejected() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::encode::encode_to_vec;
        use crate::types::Value;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let value = Value::Array(vec![Value::Null, Value::Null]);
        let body = encode_to_vec(&value, &schema);

        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_ok(),
            "an encoder-produced array of nulls should round-trip, but got: {res:?}"
        );
    }

    /// A `null` array split into two blocks (4 + 6 elements) decodes into a
    /// single ten-element array.
    #[mz_ore::test]
    fn zero_width_array_elements_decode_across_blocks() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let mut body = Vec::new();
        zig_i64(4, &mut body);
        zig_i64(6, &mut body);
        // Zero block count: end of array.
        body.push(0);

        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("zero-width arrays may span multiple blocks below the cap");
        assert_eq!(decoded, Value::Array(vec![Value::Null; 10]));
    }

    /// Drives `SimpleArrayAccess` directly with the node counter already at
    /// `MAX_VALUE_NODES`: a further one-element block must be rejected with
    /// the cumulative-limit error.
    #[mz_ore::test]
    fn zero_width_array_total_len_bounded_across_blocks() {
        use std::str::FromStr;

        use super::{AvroArrayAccess, DECODE_NODES, MAX_VALUE_NODES, SimpleArrayAccess};
        use crate::util::zig_i64;
        use crate::{Schema, TrivialDecoder};

        let schema = Schema::from_str(r#""null""#).unwrap();
        let mut body = Vec::new();
        zig_i64(1, &mut body);

        let mut reader: &[u8] = &body;
        let mut access = SimpleArrayAccess::new(&mut reader, schema.top_node());
        // Pretend earlier blocks already consumed the whole budget.
        DECODE_NODES.with(|n| n.set(MAX_VALUE_NODES));

        let err = access
            .decode_next(TrivialDecoder)
            .expect_err("a new block past the cumulative node budget must be rejected");
        // Restore the counter so the manual override above does not linger.
        DECODE_NODES.with(|n| n.set(0));
        assert!(
            err.to_string().contains("exceeds cumulative limit"),
            "unexpected error: {err}"
        );
    }

    /// A block claiming 100,000,000 records that each hold only a `null`
    /// field must be rejected.
    #[mz_ore::test]
    fn zero_width_record_array_bounded() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "R", "fields": [{"name": "g0", "type": "null"}]}}"#,
        )
        .unwrap();
        let mut body = Vec::new();
        zig_i64(100_000_000, &mut body);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let res = dsr.deserialize(&mut reader, ValueDecoder);
        assert!(
            res.is_err(),
            "an array of zero-width records longer than the node cap must be rejected, not allocated"
        );
    }

    /// The same schema as above, but with only ten elements, must decode.
    #[mz_ore::test]
    fn small_zero_width_record_array_decodes() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "R", "fields": [{"name": "g0", "type": "null"}]}}"#,
        )
        .unwrap();
        let mut body = Vec::new();
        zig_i64(10, &mut body);
        // Zero block count: end of array.
        body.push(0);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("a below-cap array of zero-width records must decode, not be rejected");
        let want = Value::Record(vec![("g0".to_string(), Value::Null)]);
        assert_eq!(decoded, Value::Array(vec![want; 10]));
    }

    /// An inner array nested inside an outer array element must be rejected
    /// with the cumulative-limit error when it claims `MAX_VALUE_NODES / 2`
    /// single-field records on top of what the outer block already claimed.
    #[mz_ore::test]
    fn nested_zero_width_collection_shares_node_budget() {
        use std::str::FromStr;

        use super::{AvroDeserializer, GeneralDeserializer, MAX_VALUE_NODES};
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(
            r#"{"type": "array", "items":
                {"type": "record", "name": "Outer", "fields": [
                    {"name": "inner", "type":
                        {"type": "array", "items":
                            {"type": "record", "name": "Inner",
                             "fields": [{"name": "g0", "type": "null"}]}}}]}}"#,
        )
        .unwrap();
        let mut body = Vec::new();
        // Outer array: one element.
        zig_i64(1, &mut body);
        // Inner array block count; no element bytes follow.
        zig_i64((MAX_VALUE_NODES / 2) as i64, &mut body);
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let err = dsr.deserialize(&mut reader, ValueDecoder).expect_err(
            "a nested array claiming MAX_VALUE_NODES on top of the outer spend must be rejected",
        );
        assert!(
            err.to_string().contains("exceeds cumulative limit"),
            "unexpected error: {err}"
        );
    }

    /// A counter left at `MAX_VALUE_NODES` before decoding starts must not
    /// make a small top-level decode fail.
    #[mz_ore::test]
    fn top_level_decode_resets_stale_node_budget() {
        use std::str::FromStr;

        use super::{AvroDeserializer, DECODE_NODES, GeneralDeserializer, MAX_VALUE_NODES};
        use crate::types::Value;
        use crate::util::zig_i64;
        use crate::{Schema, ValueDecoder};

        let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap();
        let mut body = Vec::new();
        zig_i64(3, &mut body);
        // Zero block count: end of array.
        body.push(0);

        // Simulate a stale, fully spent budget.
        DECODE_NODES.with(|n| n.set(MAX_VALUE_NODES));
        let dsr = GeneralDeserializer {
            schema: schema.top_node(),
        };
        let mut reader: &[u8] = &body;
        let decoded = dsr
            .deserialize(&mut reader, ValueDecoder)
            .expect("the top-level entry must reset a stale node budget");
        assert_eq!(decoded, Value::Array(vec![Value::Null; 3]));
    }
}
454
455pub fn build_ts_value(value: i64, unit: TsUnit) -> Result<Value, AvroError> {
457 let result = match unit {
458 TsUnit::Millis => DateTime::from_timestamp_millis(value),
459 TsUnit::Micros => DateTime::from_timestamp_micros(value),
460 };
461 let ndt = result.ok_or(AvroError::Decode(DecodeError::BadTimestamp { unit, value }))?;
462 Ok(Value::Timestamp(ndt.naive_utc()))
463}
464
/// Shorthand bound for readers that support both [`Read`] and [`Skip`].
pub trait AvroRead: Read + Skip {}

// Blanket impl: anything that is `Read + Skip` is automatically `AvroRead`.
impl<T> AvroRead for T where T: Read + Skip {}
472
/// A reader that can discard input without handing it to the caller.
pub trait Skip: Read {
    /// Advances the reader by up to `len` bytes.
    ///
    /// This default implementation reads into a 512-byte scratch buffer and
    /// throws the data away. If the reader reports end of input (a read of
    /// zero bytes) before `len` bytes were consumed, skipping stops and
    /// `Ok(())` is still returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying `read` call.
    #[allow(clippy::unused_io_amount)]
    fn skip(&mut self, mut len: usize) -> Result<(), io::Error> {
        let mut scratch = [0u8; 512];

        while len > 0 {
            // Never ask for more than is still to be skipped.
            let want = len.min(scratch.len());
            match self.read(&mut scratch[..want])? {
                0 => break,
                n => len -= n,
            }
        }
        Ok(())
    }

    /// Number of bytes left in the input, if the reader knows it.
    ///
    /// The default implementation returns `None`.
    fn remaining_input(&self) -> Option<usize> {
        None
    }
}
520
521impl Skip for File {
522 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
523 self.seek(SeekFrom::Current(len as i64))?;
524 Ok(())
525 }
526}
527
528impl Skip for &[u8] {
529 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
530 let len = cmp::min(len, self.len());
531 *self = &self[len..];
532 Ok(())
533 }
534
535 fn remaining_input(&self) -> Option<usize> {
536 Some(self.len())
537 }
538}
539
540impl<S: Skip + ?Sized> Skip for Box<S> {
541 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
542 self.as_mut().skip(len)
543 }
544
545 fn remaining_input(&self) -> Option<usize> {
546 self.as_ref().remaining_input()
547 }
548}
549
550impl<T: AsRef<[u8]>> Skip for Cursor<T> {
551 fn skip(&mut self, len: usize) -> Result<(), io::Error> {
552 self.seek(SeekFrom::Current(len as i64))?;
553 Ok(())
554 }
555
556 fn remaining_input(&self) -> Option<usize> {
557 let total = self.get_ref().as_ref().len();
558 Some(total.saturating_sub(usize::try_from(self.position()).unwrap_or(usize::MAX)))
559 }
560}
561
// Empty impl: a gzip stream relies entirely on the default methods of `Skip`.
impl<R: Read> Skip for MultiGzDecoder<R> {}
563
/// Payload handed to the byte-like `AvroDecode` callbacks: either a value
/// that is already available, or a reader from which it can still be read.
pub enum ValueOrReader<'a, V, R: AvroRead> {
    /// The value itself.
    Value(V),
    /// A reader plus a length.
    // NOTE(review): presumably `len` is the number of bytes of `r` that make
    // up the value — confirm against the code that constructs this variant.
    Reader { len: usize, r: &'a mut R },
}
568
/// Where a field's value comes from: the input stream, decoded according to
/// a schema node, or a default value that is already materialized.
enum SchemaOrDefault<'b, R: AvroRead> {
    /// Decode from the reader using the given schema node.
    Schema(&'b mut R, SchemaNode<'b>),
    /// Use this value; nothing is read.
    Default(&'b Value),
}
573pub struct AvroFieldAccess<'b, R: AvroRead> {
574 schema: SchemaOrDefault<'b, R>,
575}
576
577impl<'b, R: AvroRead> AvroFieldAccess<'b, R> {
578 pub fn decode_field<D: AvroDecode>(self, d: D) -> Result<D::Out, AvroError> {
579 match self.schema {
580 SchemaOrDefault::Schema(r, schema) => {
581 let des = GeneralDeserializer { schema };
582 des.deserialize(r, d)
583 }
584 SchemaOrDefault::Default(value) => give_value(d, value),
585 }
586 }
587}
588
/// Sequential access to the fields of a record.
pub trait AvroRecordAccess<R: AvroRead> {
    /// Returns the next field as `(name, position, accessor)`, or `Ok(None)`
    /// once all fields have been handed out.
    fn next_field<'b>(
        &'b mut self,
    ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError>;
}
594
595struct SimpleRecordAccess<'a, R: AvroRead> {
596 schema: SchemaNode<'a>,
597 r: &'a mut R,
598 fields: &'a [RecordField],
599 i: usize,
600}
601
602impl<'a, R: AvroRead> SimpleRecordAccess<'a, R> {
603 fn new(schema: SchemaNode<'a>, r: &'a mut R, fields: &'a [RecordField]) -> Self {
604 Self {
605 schema,
606 r,
607 fields,
608 i: 0,
609 }
610 }
611}
612
613impl<'a, R: AvroRead> AvroRecordAccess<R> for SimpleRecordAccess<'a, R> {
614 fn next_field<'b>(
615 &'b mut self,
616 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
617 assert!(self.i <= self.fields.len());
618 if self.i == self.fields.len() {
619 Ok(None)
620 } else {
621 let f = &self.fields[self.i];
622 self.i += 1;
623 Ok(Some((
624 f.name.as_str(),
625 f.position,
626 AvroFieldAccess {
627 schema: SchemaOrDefault::Schema(self.r, self.schema.step(&f.schema)),
628 },
629 )))
630 }
631 }
632}
633
634struct ValueRecordAccess<'a> {
635 values: &'a [(String, Value)],
636 i: usize,
637}
638
639impl<'a> ValueRecordAccess<'a> {
640 fn new(values: &'a [(String, Value)]) -> Self {
641 Self { values, i: 0 }
642 }
643}
644
645impl<'a> AvroRecordAccess<&'a [u8]> for ValueRecordAccess<'a> {
646 fn next_field<'b>(
647 &'b mut self,
648 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, &'a [u8]>)>, AvroError> {
649 assert!(self.i <= self.values.len());
650 if self.i == self.values.len() {
651 Ok(None)
652 } else {
653 let (name, val) = &self.values[self.i];
654 self.i += 1;
655 Ok(Some((
656 name.as_str(),
657 self.i - 1,
658 AvroFieldAccess {
659 schema: SchemaOrDefault::Default(val),
660 },
661 )))
662 }
663 }
664}
665
666struct ValueMapAccess<'a> {
667 values: &'a [(String, Value)],
668 i: usize,
669}
670
671impl<'a> ValueMapAccess<'a> {
672 fn new(values: &'a [(String, Value)]) -> Self {
673 Self { values, i: 0 }
674 }
675}
676
677impl<'a> AvroMapAccess for ValueMapAccess<'a> {
678 type R = &'a [u8];
679 fn next_entry<'b>(
680 &'b mut self,
681 ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError> {
682 assert!(self.i <= self.values.len());
683 if self.i == self.values.len() {
684 Ok(None)
685 } else {
686 let (name, val) = &self.values[self.i];
687 self.i += 1;
688 Ok(Some((
689 name.clone(),
690 AvroFieldAccess {
691 schema: SchemaOrDefault::Default(val),
692 },
693 )))
694 }
695 }
696}
697
698struct ResolvedRecordAccess<'a, R: AvroRead> {
699 defaults: &'a [ResolvedDefaultValueField],
700 i_defaults: usize,
701 fields: &'a [ResolvedRecordField],
702 i_fields: usize,
703 r: &'a mut R,
704 schema: SchemaNode<'a>,
705}
706
707impl<'a, R: AvroRead> ResolvedRecordAccess<'a, R> {
708 fn new(
709 defaults: &'a [ResolvedDefaultValueField],
710 fields: &'a [ResolvedRecordField],
711 r: &'a mut R,
712 schema: SchemaNode<'a>,
713 ) -> Self {
714 Self {
715 defaults,
716 i_defaults: 0,
717 fields,
718 i_fields: 0,
719 r,
720 schema,
721 }
722 }
723}
724
725impl<'a, R: AvroRead> AvroRecordAccess<R> for ResolvedRecordAccess<'a, R> {
726 fn next_field<'b>(
727 &'b mut self,
728 ) -> Result<Option<(&'b str, usize, AvroFieldAccess<'b, R>)>, AvroError> {
729 assert!(self.i_defaults <= self.defaults.len() && self.i_fields <= self.fields.len());
730 if self.i_defaults < self.defaults.len() {
731 let default = &self.defaults[self.i_defaults];
732 self.i_defaults += 1;
733 Ok(Some((
734 default.name.as_str(),
735 default.position,
736 AvroFieldAccess {
737 schema: SchemaOrDefault::Default(&default.default),
738 },
739 )))
740 } else {
741 while self.i_fields < self.fields.len() {
742 let field = &self.fields[self.i_fields];
743 self.i_fields += 1;
744 match field {
745 ResolvedRecordField::Absent(absent_schema) => {
746 let d = GeneralDeserializer {
748 schema: absent_schema.top_node(),
749 };
750 d.deserialize(self.r, TrivialDecoder)?;
751 continue;
752 }
753 ResolvedRecordField::Present(field) => {
754 return Ok(Some((
755 field.name.as_str(),
756 field.position,
757 AvroFieldAccess {
758 schema: SchemaOrDefault::Schema(
759 self.r,
760 self.schema.step(&field.schema),
761 ),
762 },
763 )));
764 }
765 }
766 }
767 Ok(None)
768 }
769 }
770}
771
/// Sequential access to the elements of an array.
pub trait AvroArrayAccess {
    /// Decodes the next element with `d`, or returns `Ok(None)` when the
    /// array has no more elements.
    fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError>;
}
775
/// Sequential access to the entries of a map.
pub trait AvroMapAccess {
    /// The reader type the entry accessors read from.
    type R: AvroRead;
    /// Returns the next entry as `(key, accessor)`, or `Ok(None)` when the
    /// map has no more entries.
    fn next_entry<'b>(
        &'b mut self,
    ) -> Result<Option<(String, AvroFieldAccess<'b, Self::R>)>, AvroError>;
}
782
783pub struct SimpleMapAccess<'a, R: AvroRead> {
784 entry_schema: SchemaNode<'a>,
785 r: &'a mut R,
786 done: bool,
787 remaining: usize,
788 entry_nodes: usize,
792}
793
794impl<'a, R: AvroRead> SimpleMapAccess<'a, R> {
795 fn new(entry_schema: SchemaNode<'a>, r: &'a mut R) -> Self {
796 Self {
797 entry_schema,
798 r,
799 done: false,
800 remaining: 0,
801 entry_nodes: 1usize.saturating_add(min_value_nodes(entry_schema)),
803 }
804 }
805}
806
807impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> {
808 type R = R;
809 fn next_entry<'b>(&'b mut self) -> Result<Option<(String, AvroFieldAccess<'b, R>)>, AvroError> {
810 if self.done {
811 return Ok(None);
812 }
813 if self.remaining == 0 {
814 let (len, _len_in_bytes) = match zag_i64(self.r)? {
816 len if len > 0 => (len as usize, None),
817 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
818 0 => {
819 self.done = true;
820 return Ok(None);
821 }
822 _ => unreachable!(),
823 };
824 let block_nodes = len.saturating_mul(self.entry_nodes);
829 if block_nodes > MAX_VALUE_NODES {
830 return Err(AvroError::Decode(DecodeError::Custom(format!(
831 "Avro map block length {len} exceeds limit {MAX_VALUE_NODES} decoded values"
832 ))));
833 }
834 charge_value_nodes("map", block_nodes)?;
838 if let Some(remaining) = self.r.remaining_input() {
845 if len > remaining {
846 return Err(AvroError::Decode(DecodeError::Custom(format!(
847 "Avro map block length {len} exceeds remaining input ({remaining} bytes)"
848 ))));
849 }
850 }
851 self.remaining = len;
852 }
853 assert!(self.remaining > 0);
854 self.remaining -= 1;
855
856 let key_len = decode_len(self.r)?;
859 let mut key_buf = vec![];
860 key_buf.resize_with(key_len, Default::default);
861 self.r.read_exact(&mut key_buf)?;
862 let key = String::from_utf8(key_buf)
863 .map_err(|_e| AvroError::Decode(DecodeError::MapKeyUtf8Error))?;
864
865 let a = AvroFieldAccess {
866 schema: SchemaOrDefault::Schema(self.r, self.entry_schema),
867 };
868 Ok(Some((key, a)))
869 }
870}
871
872struct SimpleArrayAccess<'a, R: AvroRead> {
873 r: &'a mut R,
874 schema: SchemaNode<'a>,
875 remaining: usize,
876 element_nodes: usize,
880 done: bool,
881}
882
883impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> {
884 fn new(r: &'a mut R, schema: SchemaNode<'a>) -> Self {
885 Self {
886 r,
887 schema,
888 remaining: 0,
889 element_nodes: min_value_nodes(schema),
890 done: false,
891 }
892 }
893}
894
895struct ValueArrayAccess<'a> {
896 values: &'a [Value],
897 i: usize,
898}
899
900impl<'a> ValueArrayAccess<'a> {
901 fn new(values: &'a [Value]) -> Self {
902 Self { values, i: 0 }
903 }
904}
905
906impl<'a> AvroArrayAccess for ValueArrayAccess<'a> {
907 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
908 assert!(self.i <= self.values.len());
909 if self.i == self.values.len() {
910 Ok(None)
911 } else {
912 let val = give_value(d, &self.values[self.i])?;
913 self.i += 1;
914 Ok(Some(val))
915 }
916 }
917}
918
/// Upper bound (2^22 = 4,194,304) on the number of value nodes a single
/// array or map block may claim; the block readers in this file reject any
/// block whose node estimate exceeds it.
const MAX_VALUE_NODES: usize = 1 << 22;
953
954fn min_encoded_len(schema: SchemaNode) -> usize {
971 let mut visited = BTreeSet::new();
972 min_encoded_len_piece(schema.root, schema.inner, &mut visited)
973}
974
975fn min_encoded_len_or_named(
978 root: &Schema,
979 node: SchemaPieceRefOrNamed,
980 visited: &mut BTreeSet<usize>,
981) -> usize {
982 match node {
983 SchemaPieceRefOrNamed::Piece(piece) => min_encoded_len_piece(root, piece, visited),
984 SchemaPieceRefOrNamed::Named(idx) => {
985 if !visited.insert(idx) {
988 return 0;
989 }
990 let len = min_encoded_len_piece(root, &root.lookup(idx).piece, visited);
991 visited.remove(&idx);
992 len
993 }
994 }
995}
996
997fn min_encoded_len_piece(
998 root: &Schema,
999 piece: &SchemaPiece,
1000 visited: &mut BTreeSet<usize>,
1001) -> usize {
1002 match piece {
1003 SchemaPiece::Null => 0,
1005 SchemaPiece::Boolean
1007 | SchemaPiece::Int
1008 | SchemaPiece::Long
1009 | SchemaPiece::Date
1010 | SchemaPiece::TimestampMilli
1011 | SchemaPiece::TimestampMicro => 1,
1012 SchemaPiece::Float => 4,
1013 SchemaPiece::Double => 8,
1014 SchemaPiece::Decimal {
1017 fixed_size: Some(size),
1018 ..
1019 } => *size,
1020 SchemaPiece::Decimal {
1021 fixed_size: None, ..
1022 }
1023 | SchemaPiece::Bytes
1024 | SchemaPiece::String
1025 | SchemaPiece::Json
1026 | SchemaPiece::Uuid => 1,
1027 SchemaPiece::Array(_) | SchemaPiece::Map(_) => 1,
1030 SchemaPiece::Union(_) => 1,
1032 SchemaPiece::Enum { .. } => 1,
1034 SchemaPiece::Fixed { size } => *size,
1035 SchemaPiece::Record { fields, .. } => fields.iter().fold(0, |acc, field| {
1039 acc.saturating_add(min_encoded_len_or_named(
1040 root,
1041 field.schema.as_ref(),
1042 visited,
1043 ))
1044 }),
1045 _ => 0,
1048 }
1049}
1050
1051fn min_value_nodes(schema: SchemaNode) -> usize {
1070 let mut visited = BTreeSet::new();
1071 min_value_nodes_piece(schema.root, schema.inner, &mut visited)
1072}
1073
1074fn min_value_nodes_or_named(
1077 root: &Schema,
1078 node: SchemaPieceRefOrNamed,
1079 visited: &mut BTreeSet<usize>,
1080) -> usize {
1081 match node {
1082 SchemaPieceRefOrNamed::Piece(piece) => min_value_nodes_piece(root, piece, visited),
1083 SchemaPieceRefOrNamed::Named(idx) => {
1084 if !visited.insert(idx) {
1088 return 1;
1089 }
1090 let nodes = min_value_nodes_piece(root, &root.lookup(idx).piece, visited);
1091 visited.remove(&idx);
1092 nodes
1093 }
1094 }
1095}
1096
1097fn min_value_nodes_piece(
1098 root: &Schema,
1099 piece: &SchemaPiece,
1100 visited: &mut BTreeSet<usize>,
1101) -> usize {
1102 match piece {
1103 SchemaPiece::Record { fields, .. } => fields.iter().fold(1, |acc, field| {
1107 acc.saturating_add(min_value_nodes_or_named(
1108 root,
1109 field.schema.as_ref(),
1110 visited,
1111 ))
1112 }),
1113 _ => 1,
1119 }
1120}
1121
1122impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> {
1123 fn decode_next<D: AvroDecode>(&mut self, d: D) -> Result<Option<D::Out>, AvroError> {
1124 if self.done {
1125 return Ok(None);
1126 }
1127 if self.remaining == 0 {
1128 let (len, _len_in_bytes) = match zag_i64(self.r)? {
1130 len if len > 0 => (len as usize, None),
1131 neglen if neglen < 0 => (neglen.unsigned_abs() as usize, Some(decode_len(self.r)?)),
1132 0 => {
1133 self.done = true;
1134 return Ok(None);
1135 }
1136 _ => unreachable!(),
1137 };
1138 let block_nodes = len.saturating_mul(self.element_nodes);
1143 if block_nodes > MAX_VALUE_NODES {
1144 return Err(AvroError::Decode(DecodeError::Custom(format!(
1145 "Avro array block length {len} exceeds limit {MAX_VALUE_NODES} \
1146 decoded values"
1147 ))));
1148 }
1149 charge_value_nodes("array", block_nodes)?;
1153 if let Some(remaining) = self.r.remaining_input() {
1165 let min_elem = min_encoded_len(self.schema);
1166 if min_elem > 0 && len.saturating_mul(min_elem) > remaining {
1167 return Err(AvroError::Decode(DecodeError::Custom(format!(
1168 "Avro array block length {len} exceeds remaining input ({remaining} bytes)"
1169 ))));
1170 }
1171 }
1172 self.remaining = len;
1173 }
1174 assert!(self.remaining > 0);
1175 self.remaining -= 1;
1176 let des = GeneralDeserializer {
1177 schema: self.schema,
1178 };
1179 des.deserialize(self.r, d).map(Some)
1180 }
1181}
1182
/// Generates `AvroDecode` method implementations that reject a kind of
/// value.
///
/// Invoke it inside an `impl AvroDecode for ...` block with one or more of
/// `record`, `union_branch`, `array`, `map`, `enum_variant`, `scalar`,
/// `decimal`, `bytes`, `string`, `json`, `uuid`, `fixed` (comma-separated).
/// Each generated method ignores its arguments and returns the matching
/// `DecodeError::Unexpected*` error.
///
/// All crate items are referenced through `$crate`, so the macro does not
/// depend on what the call site has imported. The `json` arm names
/// `serde_json::Value` directly, so that path must resolve at the call site.
#[macro_export]
macro_rules! define_unexpected {
    (record) => {
        fn record<R: $crate::AvroRead, A: $crate::AvroRecordAccess<R>>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedRecord))
        }
    };
    (union_branch) => {
        fn union_branch<'avro_macro_lifetime, R: $crate::AvroRead, D: $crate::AvroDeserializer>(
            self,
            _idx: usize,
            _n_variants: usize,
            _null_variant: Option<usize>,
            _deserializer: D,
            _reader: &'avro_macro_lifetime mut R,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUnion))
        }
    };
    (array) => {
        fn array<A: $crate::AvroArrayAccess>(
            self,
            _a: &mut A,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedArray,
            ))
        }
    };
    (map) => {
        fn map<M: $crate::AvroMapAccess>(
            self,
            _m: &mut M,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedMap,
            ))
        }
    };
    (enum_variant) => {
        fn enum_variant(
            self,
            _symbol: &str,
            _idx: usize,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode(
                $crate::error::DecodeError::UnexpectedEnum,
            ))
        }
    };
    (scalar) => {
        fn scalar(self, _scalar: $crate::types::Scalar) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedScalar))
        }
    };
    (decimal) => {
        fn decimal<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _precision: usize,
            _scale: usize,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedDecimal))
        }
    };
    (bytes) => {
        fn bytes<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedBytes))
        }
    };
    (string) => {
        fn string<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime str, R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedString))
        }
    };
    (json) => {
        fn json<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<
                'avro_macro_lifetime,
                &'avro_macro_lifetime serde_json::Value,
                R,
            >,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedJson))
        }
    };
    (uuid) => {
        fn uuid<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedUuid))
        }
    };
    (fixed) => {
        fn fixed<'avro_macro_lifetime, R: $crate::AvroRead>(
            self,
            _r: $crate::ValueOrReader<'avro_macro_lifetime, &'avro_macro_lifetime [u8], R>,
        ) -> Result<Self::Out, $crate::error::Error> {
            Err($crate::error::Error::Decode($crate::error::DecodeError::UnexpectedFixed))
        }
    };
    // A comma-separated list expands to one invocation per kind.
    ($($kind:ident),+) => {
        $($crate::define_unexpected!{$kind})+
    }
}
1299
/// A consumer of decoded Avro data, with one method per kind of value.
///
/// Every method takes the decoder by value and produces [`Self::Out`] or an
/// error.
// NOTE(review): presumably an `AvroDeserializer` picks the method to call
// based on the schema of the value being read — confirm against
// `GeneralDeserializer`.
pub trait AvroDecode: Sized {
    /// The type this decoder produces.
    type Out;
    /// Handles a record; its fields are obtained from `_a`.
    fn record<R: AvroRead, A: AvroRecordAccess<R>>(
        self,
        _a: &mut A,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a union value.
    // NOTE(review): `_idx` looks like the selected branch out of
    // `_n_variants`, `_null_variant` like the index of the null branch (if
    // any), and `_deserializer`/`_reader` like the means to decode the
    // branch's payload — confirm against the caller.
    fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
        self,
        _idx: usize,
        _n_variants: usize,
        _null_variant: Option<usize>,
        _deserializer: D,
        _reader: &'a mut R,
    ) -> Result<Self::Out, AvroError>;

    /// Handles an array; its elements are obtained from `_a`.
    fn array<A: AvroArrayAccess>(self, _a: &mut A) -> Result<Self::Out, AvroError>;

    /// Handles a map; its entries are obtained from `_m`.
    fn map<M: AvroMapAccess>(self, _m: &mut M) -> Result<Self::Out, AvroError>;

    /// Handles an enum value given as a symbol and an index.
    fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<Self::Out, AvroError>;

    /// Handles a scalar value.
    fn scalar(self, _scalar: Scalar) -> Result<Self::Out, AvroError>;

    /// Handles a decimal with the given precision and scale; the digits are
    /// supplied as bytes or as a reader to take them from.
    fn decimal<'a, R: AvroRead>(
        self,
        _precision: usize,
        _scale: usize,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;

    /// Handles a bytes value, supplied directly or via a reader.
    fn bytes<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a string value, supplied directly or via a reader.
    fn string<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a str, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a JSON value, supplied directly or via a reader.
    fn json<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a serde_json::Value, R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a UUID value, supplied as bytes or via a reader.
    fn uuid<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Handles a fixed value, supplied as bytes or via a reader.
    fn fixed<'a, R: AvroRead>(
        self,
        _r: ValueOrReader<'a, &'a [u8], R>,
    ) -> Result<Self::Out, AvroError>;
    /// Wraps this decoder in a [`public_decoders::MappingDecoder`] built
    /// from `self` and the fallible conversion `f`.
    fn map_decoder<T, F: FnMut(Self::Out) -> Result<T, AvroError>>(
        self,
        f: F,
    ) -> public_decoders::MappingDecoder<T, Self::Out, Self, F> {
        public_decoders::MappingDecoder::new(self, f)
    }
}
1358
1359pub mod public_decoders {
1360
1361 use std::collections::BTreeMap;
1362
1363 use crate::error::{DecodeError, Error as AvroError};
1364 use crate::types::{DecimalValue, Scalar, Value};
1365 use crate::{
1366 AvroArrayAccess, AvroDecode, AvroDeserializer, AvroRead, AvroRecordAccess, ValueOrReader,
1367 };
1368
1369 use super::{AvroDecodable, AvroMapAccess, StatefulAvroDecodable};
1370
/// Defines a unit-struct decoder `$name` whose output is `$out`.
///
/// The generated decoder accepts only the listed `Scalar` variants
/// (separated by `;`), converting the payload with `try_into`; any other
/// scalar kind is rejected with `UnexpectedScalarKind`, and every
/// non-scalar callback is delegated to `define_unexpected!`. The macro also
/// makes `$out` stateless-decodable through `$name`.
macro_rules! define_simple_decoder {
    ($name:ident, $out:ty, $($scalar_branch:ident);*) => {
        pub struct $name;

        impl AvroDecode for $name {
            type Out = $out;

            fn scalar(self, scalar: Scalar) -> Result<$out, AvroError> {
                match scalar {
                    $(Scalar::$scalar_branch(raw) => Ok(raw.try_into()?),)*
                    unexpected => Err(AvroError::Decode(
                        DecodeError::UnexpectedScalarKind(unexpected.into()),
                    )),
                }
            }

            define_unexpected! {
                array, record, union_branch, map,
                enum_variant, decimal, bytes, string,
                json, uuid, fixed
            }
        }

        impl StatefulAvroDecodable for $out {
            type Decoder = $name;
            type State = ();

            fn new_decoder(_state: ()) -> $name {
                $name
            }
        }
    }
}
1403
// Integer decoders. Each one accepts both `Int` and `Long` scalars and
// converts the payload with `try_into`, so a value that does not fit the
// target type is reported as an error rather than truncated.
define_simple_decoder!(I32Decoder, i32, Int;Long);
define_simple_decoder!(I64Decoder, i64, Int;Long);
define_simple_decoder!(U64Decoder, u64, Int;Long);
define_simple_decoder!(UsizeDecoder, usize, Int;Long);
define_simple_decoder!(IsizeDecoder, isize, Int;Long);
1409
1410 pub struct MappingDecoder<
1411 T,
1412 InnerOut,
1413 Inner: AvroDecode<Out = InnerOut>,
1414 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1415 > {
1416 inner: Inner,
1417 conv: Conv,
1418 }
1419
1420 impl<
1421 T,
1422 InnerOut,
1423 Inner: AvroDecode<Out = InnerOut>,
1424 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1425 > MappingDecoder<T, InnerOut, Inner, Conv>
1426 {
1427 pub fn new(inner: Inner, conv: Conv) -> Self {
1428 Self { inner, conv }
1429 }
1430 }
1431
1432 impl<
1433 T,
1434 InnerOut,
1435 Inner: AvroDecode<Out = InnerOut>,
1436 Conv: FnMut(InnerOut) -> Result<T, AvroError>,
1437 > AvroDecode for MappingDecoder<T, InnerOut, Inner, Conv>
1438 {
1439 type Out = T;
1440
1441 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1442 mut self,
1443 a: &mut A,
1444 ) -> Result<Self::Out, AvroError> {
1445 (self.conv)(self.inner.record(a)?)
1446 }
1447
1448 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1449 mut self,
1450 idx: usize,
1451 n_variants: usize,
1452 null_variant: Option<usize>,
1453 deserializer: D,
1454 reader: &'a mut R,
1455 ) -> Result<Self::Out, AvroError> {
1456 (self.conv)(self.inner.union_branch(
1457 idx,
1458 n_variants,
1459 null_variant,
1460 deserializer,
1461 reader,
1462 )?)
1463 }
1464
1465 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1466 (self.conv)(self.inner.array(a)?)
1467 }
1468
1469 fn map<M: AvroMapAccess>(mut self, m: &mut M) -> Result<Self::Out, AvroError> {
1470 (self.conv)(self.inner.map(m)?)
1471 }
1472
1473 fn enum_variant(mut self, symbol: &str, idx: usize) -> Result<Self::Out, AvroError> {
1474 (self.conv)(self.inner.enum_variant(symbol, idx)?)
1475 }
1476
1477 fn scalar(mut self, scalar: Scalar) -> Result<Self::Out, AvroError> {
1478 (self.conv)(self.inner.scalar(scalar)?)
1479 }
1480
1481 fn decimal<'a, R: AvroRead>(
1482 mut self,
1483 precision: usize,
1484 scale: usize,
1485 r: ValueOrReader<'a, &'a [u8], R>,
1486 ) -> Result<Self::Out, AvroError> {
1487 (self.conv)(self.inner.decimal(precision, scale, r)?)
1488 }
1489
1490 fn bytes<'a, R: AvroRead>(
1491 mut self,
1492 r: ValueOrReader<'a, &'a [u8], R>,
1493 ) -> Result<Self::Out, AvroError> {
1494 (self.conv)(self.inner.bytes(r)?)
1495 }
1496
1497 fn string<'a, R: AvroRead>(
1498 mut self,
1499 r: ValueOrReader<'a, &'a str, R>,
1500 ) -> Result<Self::Out, AvroError> {
1501 (self.conv)(self.inner.string(r)?)
1502 }
1503
1504 fn json<'a, R: AvroRead>(
1505 mut self,
1506 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1507 ) -> Result<Self::Out, AvroError> {
1508 (self.conv)(self.inner.json(r)?)
1509 }
1510
1511 fn uuid<'a, R: AvroRead>(
1512 mut self,
1513 r: ValueOrReader<'a, &'a [u8], R>,
1514 ) -> Result<Self::Out, AvroError> {
1515 (self.conv)(self.inner.uuid(r)?)
1516 }
1517
1518 fn fixed<'a, R: AvroRead>(
1519 mut self,
1520 r: ValueOrReader<'a, &'a [u8], R>,
1521 ) -> Result<Self::Out, AvroError> {
1522 (self.conv)(self.inner.fixed(r)?)
1523 }
1524 }
1525 pub struct ArrayAsVecDecoder<
1526 InnerOut,
1527 Inner: AvroDecode<Out = InnerOut>,
1528 Ctor: FnMut() -> Inner,
1529 > {
1530 ctor: Ctor,
1531 buf: Vec<InnerOut>,
1532 }
1533
1534 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner>
1535 ArrayAsVecDecoder<InnerOut, Inner, Ctor>
1536 {
1537 pub fn new(ctor: Ctor) -> Self {
1538 Self { ctor, buf: vec![] }
1539 }
1540 }
1541 impl<InnerOut, Inner: AvroDecode<Out = InnerOut>, Ctor: FnMut() -> Inner> AvroDecode
1542 for ArrayAsVecDecoder<InnerOut, Inner, Ctor>
1543 {
1544 type Out = Vec<InnerOut>;
1545 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1546 while let Some(next) = a.decode_next((self.ctor)())? {
1547 self.buf.push(next);
1548 }
1549 Ok(self.buf)
1550 }
1551 define_unexpected! {
1552 record, union_branch, map, enum_variant,
1553 scalar, decimal, bytes, string, json, uuid,
1554 fixed
1555 }
1556 }
1557
/// Decoder that collects an Avro array into a `Vec<T>`, using `T`'s default
/// decoder (`AvroDecodable::new_decoder`) for each element.
pub struct DefaultArrayAsVecDecoder<T> {
    // Elements decoded so far.
    buf: Vec<T>,
}
1561 impl<T> Default for DefaultArrayAsVecDecoder<T> {
1562 fn default() -> Self {
1563 Self { buf: vec![] }
1564 }
1565 }
1566 impl<T: AvroDecodable> AvroDecode for DefaultArrayAsVecDecoder<T> {
1567 type Out = Vec<T>;
1568 fn array<A: AvroArrayAccess>(mut self, a: &mut A) -> Result<Self::Out, AvroError> {
1569 while let Some(next) = {
1570 let inner = T::new_decoder();
1571 a.decode_next(inner)?
1572 } {
1573 self.buf.push(next);
1574 }
1575 Ok(self.buf)
1576 }
1577 define_unexpected! {
1578 record, union_branch, map, enum_variant,
1579 scalar, decimal, bytes, string, json, uuid,
1580 fixed
1581 }
1582 }
1583 impl<T: AvroDecodable> StatefulAvroDecodable for Vec<T> {
1584 type Decoder = DefaultArrayAsVecDecoder<T>;
1585 type State = ();
1586
1587 fn new_decoder(_state: Self::State) -> Self::Decoder {
1588 DefaultArrayAsVecDecoder::<T>::default()
1589 }
1590 }
1591 pub struct TrivialDecoder;
1592
1593 impl TrivialDecoder {
1594 fn maybe_skip<'a, V, R: AvroRead>(
1595 self,
1596 r: ValueOrReader<'a, V, R>,
1597 ) -> Result<(), AvroError> {
1598 if let ValueOrReader::Reader { len, r } = r {
1599 Ok(r.skip(len)?)
1600 } else {
1601 Ok(())
1602 }
1603 }
1604 }
1605
1606 impl AvroDecode for TrivialDecoder {
1607 type Out = ();
1608 fn record<R: AvroRead, A: AvroRecordAccess<R>>(self, a: &mut A) -> Result<(), AvroError> {
1609 while let Some((_, _, f)) = a.next_field()? {
1610 f.decode_field(TrivialDecoder)?;
1611 }
1612 Ok(())
1613 }
1614 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1615 self,
1616 _idx: usize,
1617 _n_variants: usize,
1618 _null_variant: Option<usize>,
1619 deserializer: D,
1620 reader: &'a mut R,
1621 ) -> Result<(), AvroError> {
1622 deserializer.deserialize(reader, self)
1623 }
1624
1625 fn enum_variant(self, _symbol: &str, _idx: usize) -> Result<(), AvroError> {
1626 Ok(())
1627 }
1628 fn scalar(self, _scalar: Scalar) -> Result<(), AvroError> {
1629 Ok(())
1630 }
1631 fn decimal<'a, R: AvroRead>(
1632 self,
1633 _precision: usize,
1634 _scale: usize,
1635 r: ValueOrReader<'a, &'a [u8], R>,
1636 ) -> Result<(), AvroError> {
1637 self.maybe_skip(r)
1638 }
1639 fn bytes<'a, R: AvroRead>(
1640 self,
1641 r: ValueOrReader<'a, &'a [u8], R>,
1642 ) -> Result<(), AvroError> {
1643 self.maybe_skip(r)
1644 }
1645 fn string<'a, R: AvroRead>(
1646 self,
1647 r: ValueOrReader<'a, &'a str, R>,
1648 ) -> Result<(), AvroError> {
1649 self.maybe_skip(r)
1650 }
1651 fn json<'a, R: AvroRead>(
1652 self,
1653 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1654 ) -> Result<(), AvroError> {
1655 self.maybe_skip(r)
1656 }
1657 fn uuid<'a, R: AvroRead>(self, r: ValueOrReader<'a, &'a [u8], R>) -> Result<(), AvroError> {
1658 self.maybe_skip(r)
1659 }
1660 fn fixed<'a, R: AvroRead>(
1661 self,
1662 r: ValueOrReader<'a, &'a [u8], R>,
1663 ) -> Result<(), AvroError> {
1664 self.maybe_skip(r)
1665 }
1666 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<(), AvroError> {
1667 while a.decode_next(TrivialDecoder)?.is_some() {}
1668 Ok(())
1669 }
1670
1671 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<(), AvroError> {
1672 while let Some((_n, entry)) = m.next_entry()? {
1673 entry.decode_field(TrivialDecoder)?
1674 }
1675 Ok(())
1676 }
1677 }
1678 pub struct ValueDecoder;
1679 impl AvroDecode for ValueDecoder {
1680 type Out = Value;
1681 fn record<R: AvroRead, A: AvroRecordAccess<R>>(
1682 self,
1683 a: &mut A,
1684 ) -> Result<Value, AvroError> {
1685 let mut fields = vec![];
1686 while let Some((name, idx, f)) = a.next_field()? {
1687 let next = ValueDecoder;
1688 let val = f.decode_field(next)?;
1689 fields.push((idx, (name.to_string(), val)));
1690 }
1691 fields.sort_by_key(|(idx, _)| *idx);
1692
1693 Ok(Value::Record(
1694 fields
1695 .into_iter()
1696 .map(|(_idx, (name, val))| (name, val))
1697 .collect(),
1698 ))
1699 }
1700 fn union_branch<'a, R: AvroRead, D: AvroDeserializer>(
1701 self,
1702 index: usize,
1703 n_variants: usize,
1704 null_variant: Option<usize>,
1705 deserializer: D,
1706 reader: &'a mut R,
1707 ) -> Result<Value, AvroError> {
1708 let next = ValueDecoder;
1709 let inner = Box::new(deserializer.deserialize(reader, next)?);
1710 Ok(Value::Union {
1711 index,
1712 inner,
1713 n_variants,
1714 null_variant,
1715 })
1716 }
1717 fn array<A: AvroArrayAccess>(self, a: &mut A) -> Result<Value, AvroError> {
1718 let mut items = vec![];
1719 loop {
1720 let next = ValueDecoder;
1721
1722 if let Some(value) = a.decode_next(next)? {
1723 items.push(value)
1724 } else {
1725 break;
1726 }
1727 }
1728 Ok(Value::Array(items))
1729 }
1730 fn enum_variant(self, symbol: &str, idx: usize) -> Result<Value, AvroError> {
1731 Ok(Value::Enum(idx, symbol.to_string()))
1732 }
1733 fn scalar(self, scalar: Scalar) -> Result<Value, AvroError> {
1734 Ok(scalar.into())
1735 }
1736 fn decimal<'a, R: AvroRead>(
1737 self,
1738 precision: usize,
1739 scale: usize,
1740 r: ValueOrReader<'a, &'a [u8], R>,
1741 ) -> Result<Value, AvroError> {
1742 let unscaled = match r {
1743 ValueOrReader::Value(buf) => buf.to_vec(),
1744 ValueOrReader::Reader { len, r } => {
1745 let mut buf = vec![];
1746 buf.resize_with(len, Default::default);
1747 r.read_exact(&mut buf)?;
1748 buf
1749 }
1750 };
1751 Ok(Value::Decimal(DecimalValue {
1752 unscaled,
1753 precision,
1754 scale,
1755 }))
1756 }
1757 fn bytes<'a, R: AvroRead>(
1758 self,
1759 r: ValueOrReader<'a, &'a [u8], R>,
1760 ) -> Result<Value, AvroError> {
1761 let buf = match r {
1762 ValueOrReader::Value(buf) => buf.to_vec(),
1763 ValueOrReader::Reader { len, r } => {
1764 let mut buf = vec![];
1765 buf.resize_with(len, Default::default);
1766 r.read_exact(&mut buf)?;
1767 buf
1768 }
1769 };
1770 Ok(Value::Bytes(buf))
1771 }
1772 fn string<'a, R: AvroRead>(
1773 self,
1774 r: ValueOrReader<'a, &'a str, R>,
1775 ) -> Result<Value, AvroError> {
1776 let s = match r {
1777 ValueOrReader::Value(s) => s.to_string(),
1778 ValueOrReader::Reader { len, r } => {
1779 let mut buf = vec![];
1780 buf.resize_with(len, Default::default);
1781 r.read_exact(&mut buf)?;
1782 String::from_utf8(buf)
1783 .map_err(|_e| AvroError::Decode(DecodeError::StringUtf8Error))?
1784 }
1785 };
1786 Ok(Value::String(s))
1787 }
1788 fn json<'a, R: AvroRead>(
1789 self,
1790 r: ValueOrReader<'a, &'a serde_json::Value, R>,
1791 ) -> Result<Value, AvroError> {
1792 let val = match r {
1793 ValueOrReader::Value(val) => val.clone(),
1794 ValueOrReader::Reader { len, r } => {
1795 let mut buf = vec![];
1796 buf.resize_with(len, Default::default);
1797 r.read_exact(&mut buf)?;
1798 serde_json::from_slice(&buf).map_err(|e| {
1799 AvroError::Decode(DecodeError::BadJson {
1800 category: e.classify(),
1801 bytes: buf.to_owned(),
1802 })
1803 })?
1804 }
1805 };
1806 Ok(Value::Json(val))
1807 }
1808 fn uuid<'a, R: AvroRead>(
1809 self,
1810 r: ValueOrReader<'a, &'a [u8], R>,
1811 ) -> Result<Value, AvroError> {
1812 let buf = match r {
1813 ValueOrReader::Value(val) => val.to_vec(),
1814 ValueOrReader::Reader { len, r } => {
1815 let mut buf = vec![];
1816 buf.resize_with(len, Default::default);
1817 r.read_exact(&mut buf)?;
1818 buf
1819 }
1820 };
1821 let s = std::str::from_utf8(&buf)
1822 .map_err(|_| AvroError::Decode(DecodeError::UuidUtf8Error))?;
1823 let val =
1824 uuid::Uuid::parse_str(s).map_err(|e| AvroError::Decode(DecodeError::BadUuid(e)))?;
1825 Ok(Value::Uuid(val))
1826 }
1827 fn fixed<'a, R: AvroRead>(
1828 self,
1829 r: ValueOrReader<'a, &'a [u8], R>,
1830 ) -> Result<Value, AvroError> {
1831 let buf = match r {
1832 ValueOrReader::Value(buf) => buf.to_vec(),
1833 ValueOrReader::Reader { len, r } => {
1834 let mut buf = vec![];
1835 buf.resize_with(len, Default::default);
1836 r.read_exact(&mut buf)?;
1837 buf
1838 }
1839 };
1840 Ok(Value::Fixed(buf.len(), buf))
1841 }
1842 fn map<M: AvroMapAccess>(self, m: &mut M) -> Result<Value, AvroError> {
1843 let mut entries = BTreeMap::new();
1844 while let Some((name, a)) = m.next_entry()? {
1845 let d = ValueDecoder;
1846 let val = a.decode_field(d)?;
1847 entries.insert(name, val);
1848 }
1849 Ok(Value::Map(entries))
1850 }
1851 }
1852}
1853
1854impl<'a> AvroDeserializer for &'a Value {
1855 fn deserialize<R: AvroRead, D: AvroDecode>(
1856 self,
1857 _r: &mut R,
1858 d: D,
1859 ) -> Result<D::Out, AvroError> {
1860 give_value(d, self)
1861 }
1862}
1863
1864pub fn give_value<D: AvroDecode>(d: D, v: &Value) -> Result<D::Out, AvroError> {
1865 use ValueOrReader::Value as V;
1866 match v {
1867 Value::Null => d.scalar(Scalar::Null),
1868 Value::Boolean(val) => d.scalar(Scalar::Boolean(*val)),
1869 Value::Int(val) => d.scalar(Scalar::Int(*val)),
1870 Value::Long(val) => d.scalar(Scalar::Long(*val)),
1871 Value::Float(val) => d.scalar(Scalar::Float(*val)),
1872 Value::Double(val) => d.scalar(Scalar::Double(*val)),
1873 Value::Date(val) => d.scalar(Scalar::Date(*val)),
1874 Value::Timestamp(val) => d.scalar(Scalar::Timestamp(*val)),
1875 Value::Decimal(val) => d.decimal::<&[u8]>(val.precision, val.scale, V(&val.unscaled)),
1878 Value::Bytes(val) => d.bytes::<&[u8]>(V(val)),
1879 Value::String(val) => d.string::<&[u8]>(V(val)),
1880 Value::Fixed(_len, val) => d.fixed::<&[u8]>(V(val)),
1881 Value::Enum(idx, symbol) => d.enum_variant(symbol, *idx),
1882 Value::Union {
1883 index,
1884 inner,
1885 n_variants,
1886 null_variant,
1887 } => {
1888 let mut empty_reader: &[u8] = &[];
1889 d.union_branch(
1890 *index,
1891 *n_variants,
1892 *null_variant,
1893 &**inner,
1894 &mut empty_reader,
1895 )
1896 }
1897 Value::Array(val) => {
1898 let mut a = ValueArrayAccess::new(val);
1899 d.array(&mut a)
1900 }
1901 Value::Map(val) => {
1902 let vals: Vec<_> = val.clone().into_iter().collect();
1903 let mut m = ValueMapAccess::new(vals.as_slice());
1904 d.map(&mut m)
1905 }
1906 Value::Record(val) => {
1907 let mut a = ValueRecordAccess::new(val);
1908 d.record(&mut a)
1909 }
1910 Value::Json(val) => d.json::<&[u8]>(V(val)),
1911 Value::Uuid(val) => d.uuid::<&[u8]>(V(val.to_string().as_bytes())),
1912 }
1913}
1914
/// Something that can drive an [`AvroDecode`] decoder: it consumes whatever
/// it needs from the reader `r` and reports the datum to `d` through the
/// decoder's callbacks, returning the decoder's output.
pub trait AvroDeserializer {
    fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError>;
}
1918
/// Schema-driven deserializer: decodes a datum from a reader according to
/// the schema node it holds.
#[derive(Clone, Copy)]
pub struct GeneralDeserializer<'a> {
    /// Schema node describing the datum to decode.
    pub schema: SchemaNode<'a>,
}
1923
/// Maximum number of nested `GeneralDeserializer::deserialize` calls allowed
/// on one thread; going deeper is reported as a decode error.
const MAX_DECODE_DEPTH: usize = 128;
1929
thread_local! {
    // Current nesting depth of decode calls on this thread; incremented by
    // `DecodeDepthGuard::enter` and decremented when the guard is dropped.
    static DECODE_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
    // Running count of values charged through `charge_value_nodes`; reset to
    // zero whenever a top-level decode (depth 1) begins.
    static DECODE_NODES: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}
1939
1940struct DecodeDepthGuard;
1941impl DecodeDepthGuard {
1942 fn enter() -> Result<Self, AvroError> {
1943 DECODE_DEPTH.with(|d| {
1944 let new = d.get() + 1;
1945 if new > MAX_DECODE_DEPTH {
1946 return Err(AvroError::Decode(DecodeError::Custom(format!(
1947 "Avro decode depth exceeds limit {MAX_DECODE_DEPTH}"
1948 ))));
1949 }
1950 d.set(new);
1951 if new == 1 {
1957 DECODE_NODES.with(|n| n.set(0));
1958 }
1959 Ok(DecodeDepthGuard)
1960 })
1961 }
1962}
1963impl Drop for DecodeDepthGuard {
1964 fn drop(&mut self) {
1965 DECODE_DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
1966 }
1967}
1968
1969fn charge_value_nodes(kind: &str, nodes: usize) -> Result<(), AvroError> {
1978 DECODE_NODES.with(|n| {
1979 let total = n.get().saturating_add(nodes);
1980 if total > MAX_VALUE_NODES {
1981 return Err(AvroError::Decode(DecodeError::Custom(format!(
1982 "Avro {kind} decode exceeds cumulative limit {MAX_VALUE_NODES} decoded values"
1983 ))));
1984 }
1985 n.set(total);
1986 Ok(())
1987 })
1988}
1989
1990impl<'a> AvroDeserializer for GeneralDeserializer<'a> {
1991 fn deserialize<R: AvroRead, D: AvroDecode>(self, r: &mut R, d: D) -> Result<D::Out, AvroError> {
1992 let _guard = DecodeDepthGuard::enter()?;
1993 use ValueOrReader::Reader;
1994 match self.schema.inner {
1995 SchemaPiece::Null => d.scalar(Scalar::Null),
1996 SchemaPiece::Boolean => {
1997 let mut buf = [0u8; 1];
1998 r.read_exact(&mut buf[..])?;
1999 let val = match buf[0] {
2000 0u8 => false,
2001 1u8 => true,
2002 other => return Err(AvroError::Decode(DecodeError::BadBoolean(other))),
2003 };
2004 d.scalar(Scalar::Boolean(val))
2005 }
2006 SchemaPiece::Int => {
2007 let val = zag_i32(r)?;
2008 d.scalar(Scalar::Int(val))
2009 }
2010 SchemaPiece::Long => {
2011 let val = zag_i64(r)?;
2012 d.scalar(Scalar::Long(val))
2013 }
2014 SchemaPiece::Float => {
2015 let val = decode_float(r)?;
2016 d.scalar(Scalar::Float(val))
2017 }
2018 SchemaPiece::Double => {
2019 let val = decode_double(r)?;
2020 d.scalar(Scalar::Double(val))
2021 }
2022 SchemaPiece::Date => {
2023 let days = zag_i32(r)?;
2024 d.scalar(Scalar::Date(days))
2025 }
2026 SchemaPiece::TimestampMilli => {
2027 let total_millis = zag_i64(r)?;
2028 let scalar = match build_ts_value(total_millis, TsUnit::Millis)? {
2029 Value::Timestamp(ts) => Scalar::Timestamp(ts),
2030 _ => unreachable!(),
2031 };
2032 d.scalar(scalar)
2033 }
2034 SchemaPiece::TimestampMicro => {
2035 let total_micros = zag_i64(r)?;
2036 let scalar = match build_ts_value(total_micros, TsUnit::Micros)? {
2037 Value::Timestamp(ts) => Scalar::Timestamp(ts),
2038 _ => unreachable!(),
2039 };
2040 d.scalar(scalar)
2041 }
2042 SchemaPiece::Decimal {
2043 precision,
2044 scale,
2045 fixed_size,
2046 } => {
2047 let len = fixed_size.map(Ok).unwrap_or_else(|| decode_len(r))?;
2048 d.decimal(*precision, *scale, Reader { len, r })
2049 }
2050 SchemaPiece::Bytes => {
2051 let len = decode_len(r)?;
2052 d.bytes(Reader { len, r })
2053 }
2054 SchemaPiece::String => {
2055 let len = decode_len(r)?;
2056 d.string(Reader { len, r })
2057 }
2058 SchemaPiece::Json => {
2059 let len = decode_len(r)?;
2060 d.json(Reader { len, r })
2061 }
2062 SchemaPiece::Uuid => {
2063 let len = decode_len(r)?;
2064 d.uuid(Reader { len, r })
2065 }
2066 SchemaPiece::Array(inner) => {
2067 let mut a = SimpleArrayAccess::new(r, self.schema.step(inner));
2072 d.array(&mut a)
2073 }
2074 SchemaPiece::Map(inner) => {
2075 let mut m = SimpleMapAccess::new(self.schema.step(inner), r);
2077 d.map(&mut m)
2078 }
2079 SchemaPiece::Union(inner) => {
2080 let index = decode_long_nonneg(r)? as usize;
2081 let variants = inner.variants();
2082 match variants.get(index) {
2083 Some(variant) => {
2084 let n_variants = variants.len();
2085 let null_variant = variants
2086 .iter()
2087 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
2088 let dsr = GeneralDeserializer {
2089 schema: self.schema.step(variant),
2090 };
2091 d.union_branch(index, n_variants, null_variant, dsr, r)
2092 }
2093 None => Err(AvroError::Decode(DecodeError::BadUnionIndex {
2094 index,
2095 len: variants.len(),
2096 })),
2097 }
2098 }
2099 SchemaPiece::ResolveIntLong => {
2100 let val = zag_i32(r)? as i64;
2101 d.scalar(Scalar::Long(val))
2102 }
2103 SchemaPiece::ResolveIntFloat => {
2104 let val = zag_i32(r)? as f32;
2105 d.scalar(Scalar::Float(val))
2106 }
2107 SchemaPiece::ResolveIntDouble => {
2108 let val = zag_i32(r)? as f64;
2109 d.scalar(Scalar::Double(val))
2110 }
2111 SchemaPiece::ResolveLongFloat => {
2112 let val = zag_i64(r)? as f32;
2113 d.scalar(Scalar::Float(val))
2114 }
2115 SchemaPiece::ResolveLongDouble => {
2116 let val = zag_i64(r)? as f64;
2117 d.scalar(Scalar::Double(val))
2118 }
2119 SchemaPiece::ResolveFloatDouble => {
2120 let val = decode_float(r)? as f64;
2121 d.scalar(Scalar::Double(val))
2122 }
2123 SchemaPiece::ResolveConcreteUnion {
2124 index,
2125 inner,
2126 n_reader_variants,
2127 reader_null_variant,
2128 } => {
2129 let dsr = GeneralDeserializer {
2130 schema: self.schema.step(&**inner),
2131 };
2132 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
2133 }
2134 SchemaPiece::ResolveUnionUnion {
2135 permutation,
2136 n_reader_variants,
2137 reader_null_variant,
2138 } => {
2139 let index = decode_long_nonneg(r)? as usize;
2140 if index >= permutation.len() {
2141 return Err(AvroError::Decode(DecodeError::BadUnionIndex {
2142 index,
2143 len: permutation.len(),
2144 }));
2145 }
2146 match &permutation[index] {
2147 Err(e) => Err(e.clone()),
2148 Ok((index, variant)) => {
2149 let dsr = GeneralDeserializer {
2150 schema: self.schema.step(variant),
2151 };
2152 d.union_branch(*index, *n_reader_variants, *reader_null_variant, dsr, r)
2153 }
2154 }
2155 }
2156 SchemaPiece::ResolveUnionConcrete { index, inner } => {
2157 let found_index = decode_long_nonneg(r)? as usize;
2158 if *index != found_index {
2159 Err(AvroError::Decode(DecodeError::WrongUnionIndex {
2160 expected: *index,
2161 actual: found_index,
2162 }))
2163 } else {
2164 let dsr = GeneralDeserializer {
2165 schema: self.schema.step(inner.as_ref()),
2166 };
2167 dsr.deserialize(r, d)
2169 }
2170 }
2171 SchemaPiece::Record {
2172 doc: _,
2173 fields,
2174 lookup: _,
2175 } => {
2176 let mut a = SimpleRecordAccess::new(self.schema, r, fields);
2177 d.record(&mut a)
2178 }
2179 SchemaPiece::Enum {
2180 symbols,
2181 doc: _,
2182 default_idx: _,
2183 } => {
2184 let index = decode_int_nonneg(r)? as usize;
2185 match symbols.get(index) {
2186 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
2187 index,
2188 len: symbols.len(),
2189 })),
2190 Some(symbol) => d.enum_variant(symbol, index),
2191 }
2192 }
2193 SchemaPiece::Fixed { size } => d.fixed(Reader { len: *size, r }),
2194 SchemaPiece::ResolveRecord {
2200 defaults,
2201 fields,
2202 n_reader_fields: _,
2203 } => {
2204 let mut a = ResolvedRecordAccess::new(defaults, fields, r, self.schema);
2205 d.record(&mut a)
2206 }
2207 SchemaPiece::ResolveEnum {
2208 doc: _,
2209 symbols,
2210 default,
2211 } => {
2212 let index = decode_int_nonneg(r)? as usize;
2213 match symbols.get(index) {
2214 None => Err(AvroError::Decode(DecodeError::BadEnumIndex {
2215 index,
2216 len: symbols.len(),
2217 })),
2218 Some(op) => match op {
2219 Err(missing) => {
2220 if let Some((reader_index, symbol)) = default.clone() {
2221 d.enum_variant(&symbol, reader_index)
2222 } else {
2223 Err(AvroError::Decode(DecodeError::MissingEnumIndex {
2224 index,
2225 symbol: missing.clone(),
2226 }))
2227 }
2228 }
2229 Ok((index, name)) => d.enum_variant(name, *index),
2230 },
2231 }
2232 }
2233 SchemaPiece::ResolveIntTsMilli => {
2234 let total_millis = zag_i32(r)?;
2235 let scalar = match build_ts_value(total_millis as i64, TsUnit::Millis)? {
2236 Value::Timestamp(ts) => Scalar::Timestamp(ts),
2237 _ => unreachable!(),
2238 };
2239 d.scalar(scalar)
2240 }
2241 SchemaPiece::ResolveIntTsMicro => {
2242 let total_micros = zag_i32(r)?;
2243 let scalar = match build_ts_value(total_micros as i64, TsUnit::Micros)? {
2244 Value::Timestamp(ts) => Scalar::Timestamp(ts),
2245 _ => unreachable!(),
2246 };
2247 d.scalar(scalar)
2248 }
2249 SchemaPiece::ResolveDateTimestamp => {
2250 let days = zag_i32(r)?;
2251
2252 let date = NaiveDate::from_ymd_opt(1970, 1, 1)
2253 .expect("naive date known valid")
2254 .checked_add_signed(
2255 chrono::Duration::try_days(days.into())
2256 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?,
2257 )
2258 .ok_or(AvroError::Decode(DecodeError::BadDate(days)))?;
2259 let dt = date.and_hms_opt(0, 0, 0).expect("HMS known valid");
2260 d.scalar(Scalar::Timestamp(dt))
2261 }
2262 }
2263 }
2264}
2265pub fn decode<'a, R: AvroRead>(
2267 schema: SchemaNode<'a>,
2268 reader: &'a mut R,
2269) -> Result<Value, AvroError> {
2270 let d = ValueDecoder;
2271 let dsr = GeneralDeserializer { schema };
2272 let val = dsr.deserialize(reader, d)?;
2273 Ok(val)
2274}